scx_loader/
main.rs

1// SPDX-License-Identifier: GPL-2.0
2//
3// Copyright (c) 2024 Vladislav Nepogodin <vnepogodin@cachyos.org>
4
5// This software may be used and distributed according to the terms of the
6// GNU General Public License version 2.
7
8mod logger;
9
10use scx_loader::dbus::LoaderClientProxy;
11use scx_loader::*;
12
13use std::process::ExitStatus;
14use std::process::Stdio;
15use std::sync::Arc;
16
17use anyhow::Context;
18use anyhow::Result;
19use clap::Parser;
20use sysinfo::System;
21use tokio::process::Child;
22use tokio::process::Command;
23use tokio::sync::mpsc::UnboundedReceiver;
24use tokio::sync::mpsc::UnboundedSender;
25use tokio::time::Duration;
26use tokio::time::Instant;
27use zbus::interface;
28use zbus::Connection;
29
30#[derive(Debug, PartialEq)]
31enum ScxMessage {
32    /// Quit the scx_loader
33    Quit,
34    /// Stop the scheduler, if any
35    StopSched,
36    /// Start the scheduler with the given mode
37    StartSched((SupportedSched, SchedMode)),
38    /// Start the scheduler with the given scx arguments
39    StartSchedArgs((SupportedSched, Vec<String>)),
40    /// Switch to another scheduler with the given mode
41    SwitchSched((SupportedSched, SchedMode)),
42    /// Switch to another scheduler with the given scx arguments
43    SwitchSchedArgs((SupportedSched, Vec<String>)),
44    /// Restart the currently running scheduler with original configuration
45    RestartSched((SupportedSched, Option<Vec<String>>, SchedMode)),
46}
47
48#[derive(Debug, PartialEq)]
49enum RunnerMessage {
50    /// Switch to another scheduler with the given scx arguments
51    Switch((SupportedSched, Vec<String>)),
52    /// Start the scheduler with the given scx arguments
53    Start((SupportedSched, Vec<String>)),
54    /// Stop the scheduler, if any
55    Stop,
56    /// Restart the currently running scheduler with same arguments
57    Restart((SupportedSched, Vec<String>)),
58}
59
60struct ScxLoader {
61    current_scx: Option<SupportedSched>,
62    current_mode: SchedMode,
63    current_args: Option<Vec<String>>,
64    channel: UnboundedSender<ScxMessage>,
65}
66
67#[derive(Parser, Debug)]
68#[clap(author, version, about, long_about = None)]
69struct Args {
70    #[clap(long, short, action)]
71    auto: bool,
72}
73
74#[interface(name = "org.scx.Loader")]
75impl ScxLoader {
76    /// Get currently running scheduler, in case non is running return "unknown"
77    #[zbus(property)]
78    async fn current_scheduler(&self) -> String {
79        if let Some(current_scx) = &self.current_scx {
80            let current_scx: &str = current_scx.clone().into();
81            log::info!("called {current_scx:?}");
82            return current_scx.to_owned();
83        }
84        "unknown".to_owned()
85    }
86
87    /// Get scheduler mode
88    #[zbus(property)]
89    async fn scheduler_mode(&self) -> SchedMode {
90        self.current_mode.clone()
91    }
92
93    /// Get arguments used for currently running scheduler
94    #[zbus(property)]
95    async fn current_scheduler_args(&self) -> Vec<String> {
96        self.current_args.clone().unwrap_or_default()
97    }
98
99    /// Get list of supported schedulers
100    #[zbus(property)]
101    async fn supported_schedulers(&self) -> Vec<&str> {
102        vec![
103            "scx_bpfland",
104            "scx_cosmos",
105            "scx_flash",
106            "scx_lavd",
107            "scx_p2dq",
108            "scx_tickless",
109            "scx_rustland",
110            "scx_rusty",
111        ]
112    }
113    async fn start_scheduler(
114        &mut self,
115        scx_name: SupportedSched,
116        sched_mode: SchedMode,
117    ) -> zbus::fdo::Result<()> {
118        log::info!("starting {scx_name:?} with mode {sched_mode:?}..");
119
120        let _ = self.channel.send(ScxMessage::StartSched((
121            scx_name.clone(),
122            sched_mode.clone(),
123        )));
124        self.current_scx = Some(scx_name);
125        self.current_mode = sched_mode;
126        self.current_args = None;
127
128        Ok(())
129    }
130
131    async fn start_scheduler_with_args(
132        &mut self,
133        scx_name: SupportedSched,
134        scx_args: Vec<String>,
135    ) -> zbus::fdo::Result<()> {
136        log::info!("starting {scx_name:?} with args {scx_args:?}..");
137
138        let _ = self.channel.send(ScxMessage::StartSchedArgs((
139            scx_name.clone(),
140            scx_args.clone(),
141        )));
142        self.current_scx = Some(scx_name);
143        // reset mode to auto
144        self.current_mode = SchedMode::Auto;
145        self.current_args = Some(scx_args);
146
147        Ok(())
148    }
149
150    async fn switch_scheduler(
151        &mut self,
152        scx_name: SupportedSched,
153        sched_mode: SchedMode,
154    ) -> zbus::fdo::Result<()> {
155        log::info!("switching {scx_name:?} with mode {sched_mode:?}..");
156
157        let _ = self.channel.send(ScxMessage::SwitchSched((
158            scx_name.clone(),
159            sched_mode.clone(),
160        )));
161        self.current_scx = Some(scx_name);
162        self.current_mode = sched_mode;
163        self.current_args = None;
164
165        Ok(())
166    }
167
168    async fn switch_scheduler_with_args(
169        &mut self,
170        scx_name: SupportedSched,
171        scx_args: Vec<String>,
172    ) -> zbus::fdo::Result<()> {
173        log::info!("switching {scx_name:?} with args {scx_args:?}..");
174
175        let _ = self.channel.send(ScxMessage::SwitchSchedArgs((
176            scx_name.clone(),
177            scx_args.clone(),
178        )));
179        self.current_scx = Some(scx_name);
180        // reset mode to auto
181        self.current_mode = SchedMode::Auto;
182        self.current_args = Some(scx_args);
183
184        Ok(())
185    }
186
187    async fn stop_scheduler(&mut self) -> zbus::fdo::Result<()> {
188        if let Some(current_scx) = &self.current_scx {
189            let scx_name: &str = current_scx.clone().into();
190
191            log::info!("stopping {scx_name:?}..");
192            let _ = self.channel.send(ScxMessage::StopSched);
193            self.current_scx = None;
194            self.current_args = None;
195        }
196
197        Ok(())
198    }
199
200    async fn restart_scheduler(&mut self) -> zbus::fdo::Result<()> {
201        if let Some(current_scx) = &self.current_scx {
202            let scx_name: &str = current_scx.clone().into();
203
204            log::info!("restarting {scx_name:?}..");
205            let _ = self.channel.send(ScxMessage::RestartSched((
206                current_scx.clone(),
207                self.current_args.clone(),
208                self.current_mode.clone(),
209            )));
210
211            Ok(())
212        } else {
213            Err(zbus::fdo::Error::Failed(
214                "No scheduler is currently running to restart".to_string(),
215            ))
216        }
217    }
218}
219
220// Monitors CPU utilization and enables scx_lavd when utilization of any CPUs is > 90%
221async fn monitor_cpu_util() -> Result<()> {
222    let mut system = System::new_all();
223    let mut running_sched: Option<Child> = None;
224    let mut cpu_above_threshold_since: Option<Instant> = None;
225    let mut cpu_below_threshold_since: Option<Instant> = None;
226
227    let high_utilization_threshold = 90.0;
228    let low_utilization_threshold_duration = Duration::from_secs(30);
229    let high_utilization_trigger_duration = Duration::from_secs(5);
230
231    loop {
232        system.refresh_cpu_all();
233
234        let any_cpu_above_threshold = system
235            .cpus()
236            .iter()
237            .any(|cpu| cpu.cpu_usage() > high_utilization_threshold);
238
239        if any_cpu_above_threshold {
240            if cpu_above_threshold_since.is_none() {
241                cpu_above_threshold_since = Some(Instant::now());
242            }
243
244            if cpu_above_threshold_since.unwrap().elapsed() > high_utilization_trigger_duration {
245                if running_sched.is_none() {
246                    log::info!("CPU Utilization exceeded 90% for 5 seconds, starting scx_lavd");
247
248                    let scx_name: &str = SupportedSched::Lavd.into();
249                    running_sched = Some(
250                        Command::new(scx_name)
251                            .spawn()
252                            .expect("Failed to start scx_lavd"),
253                    );
254                }
255
256                cpu_below_threshold_since = None;
257            }
258        } else {
259            cpu_above_threshold_since = None;
260
261            if cpu_below_threshold_since.is_none() {
262                cpu_below_threshold_since = Some(Instant::now());
263            }
264
265            if cpu_below_threshold_since.unwrap().elapsed() > low_utilization_threshold_duration {
266                if let Some(mut running_sched_loc) = running_sched.take() {
267                    log::info!(
268                        "CPU utilization dropped below 90% for more than 30 seconds, exiting latency-aware scheduler"
269                    );
270                    running_sched_loc
271                        .kill()
272                        .await
273                        .expect("Failed to kill scx_lavd");
274                    let lavd_exit_status = running_sched_loc
275                        .wait()
276                        .await
277                        .expect("Failed to wait on scx_lavd");
278                    log::info!("scx_lavd exited with status: {}", lavd_exit_status);
279                }
280            }
281        }
282
283        tokio::time::sleep(Duration::from_secs(1)).await;
284    }
285}
286
287#[tokio::main]
288async fn main() -> Result<()> {
289    // initialize the logger
290    logger::init_logger().expect("Failed to initialize logger");
291
292    let args = Args::parse();
293
294    // initialize the config
295    let config = config::init_config().context("Failed to initialize config")?;
296
297    // If --auto is passed, start scx_loader as a standard background process
298    // that swaps schedulers out automatically
299    // based on CPU utilization without registering a dbus interface.
300    if args.auto {
301        log::info!("Starting scx_loader monitor as standard process without dbus interface");
302        monitor_cpu_util().await?;
303        return Ok(());
304    }
305
306    log::info!("Starting as dbus interface");
307    // setup channel
308    let (channel, rx) = tokio::sync::mpsc::unbounded_channel::<ScxMessage>();
309
310    let channel_clone = channel.clone();
311    ctrlc::set_handler(move || {
312        log::info!("shutting down..");
313        let _ = channel_clone.send(ScxMessage::Quit);
314    })
315    .context("Error setting Ctrl-C handler")?;
316
317    // register dbus interface
318    let connection = Connection::system().await?;
319    connection
320        .object_server()
321        .at(
322            "/org/scx/Loader",
323            ScxLoader {
324                current_scx: None,
325                current_mode: SchedMode::Auto,
326                current_args: None,
327                channel: channel.clone(),
328            },
329        )
330        .await?;
331
332    connection.request_name("org.scx.Loader").await?;
333
334    // if user set default scheduler, then start it
335    if let Some(default_sched) = &config.default_sched {
336        log::info!("Starting default scheduler: {default_sched:?}");
337
338        let default_mode = config.default_mode.clone().unwrap_or(SchedMode::Auto);
339
340        let loader_client = LoaderClientProxy::new(&connection).await?;
341        loader_client
342            .switch_scheduler(default_sched.clone(), default_mode)
343            .await?;
344    }
345
346    // run worker/receiver loop
347    worker_loop(config, rx).await?;
348
349    Ok(())
350}
351
352async fn worker_loop(
353    config: config::Config,
354    mut receiver: UnboundedReceiver<ScxMessage>,
355) -> Result<()> {
356    // setup channel for scheduler runner
357    let (runner_tx, runner_rx) = tokio::sync::mpsc::channel::<RunnerMessage>(1);
358
359    let run_sched_future = tokio::spawn(async move { handle_child_process(runner_rx).await });
360
361    // prepare future for tokio
362    tokio::pin!(run_sched_future);
363
364    loop {
365        // handle each future here
366        let msg = tokio::select! {
367            msg = receiver.recv() => {
368                match msg {
369                    None => return Ok(()),
370                    Some(m) => m,
371                }
372            }
373            res = &mut run_sched_future => {
374                log::info!("Sched future finished");
375                let _ = res?;
376                continue;
377            }
378        };
379        log::debug!("Got msg : {msg:?}");
380
381        match msg {
382            ScxMessage::Quit => return Ok(()),
383            ScxMessage::StopSched => {
384                log::info!("Got event to stop scheduler!");
385
386                // send stop message to the runner
387                runner_tx.send(RunnerMessage::Stop).await?;
388            }
389            ScxMessage::StartSched((scx_sched, sched_mode)) => {
390                log::info!("Got event to start scheduler!");
391
392                // get scheduler args for the mode
393                let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
394
395                // send message with scheduler and asociated args to the runner
396                runner_tx
397                    .send(RunnerMessage::Start((scx_sched, args)))
398                    .await?;
399            }
400            ScxMessage::StartSchedArgs((scx_sched, sched_args)) => {
401                log::info!("Got event to start scheduler with args!");
402
403                // send message with scheduler and asociated args to the runner
404                runner_tx
405                    .send(RunnerMessage::Start((scx_sched, sched_args)))
406                    .await?;
407            }
408            ScxMessage::SwitchSched((scx_sched, sched_mode)) => {
409                log::info!("Got event to switch scheduler!");
410
411                // get scheduler args for the mode
412                let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
413
414                // send message with scheduler and asociated args to the runner
415                runner_tx
416                    .send(RunnerMessage::Switch((scx_sched, args)))
417                    .await?;
418            }
419            ScxMessage::SwitchSchedArgs((scx_sched, sched_args)) => {
420                log::info!("Got event to switch scheduler with args!");
421
422                // send message with scheduler and asociated args to the runner
423                runner_tx
424                    .send(RunnerMessage::Switch((scx_sched, sched_args)))
425                    .await?;
426            }
427            ScxMessage::RestartSched((scx_sched, current_args, current_mode)) => {
428                log::info!("Got event to restart scheduler!");
429
430                // Determine the arguments to use for restart
431                let args = if let Some(args) = current_args {
432                    // Use custom arguments if they were set
433                    args
434                } else {
435                    // Use mode-based arguments
436                    config::get_scx_flags_for_mode(&config, &scx_sched, current_mode)
437                };
438
439                // send restart message to the runner
440                runner_tx
441                    .send(RunnerMessage::Restart((scx_sched, args)))
442                    .await?;
443            }
444        }
445    }
446}
447
448async fn handle_child_process(mut rx: tokio::sync::mpsc::Receiver<RunnerMessage>) -> Result<()> {
449    let mut task: Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> = None;
450    let mut cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
451
452    while let Some(message) = rx.recv().await {
453        match message {
454            RunnerMessage::Switch((scx_sched, sched_args)) => {
455                // stop the sched if its running
456                stop_scheduler(&mut task, &mut cancel_token).await;
457
458                // overwise start scheduler
459                match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
460                    Ok(handle) => {
461                        task = Some(handle);
462                        log::debug!("Scheduler started");
463                    }
464                    Err(err) => {
465                        log::error!("Failed to start scheduler: {err}");
466                    }
467                }
468            }
469            RunnerMessage::Start((scx_sched, sched_args)) => {
470                // check if sched is running or not
471                if task.is_some() {
472                    log::error!("Scheduler wasn't finished yet. Stop already running scheduler!");
473                    continue;
474                }
475                // overwise start scheduler
476                match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
477                    Ok(handle) => {
478                        task = Some(handle);
479                        log::debug!("Scheduler started");
480                    }
481                    Err(err) => {
482                        log::error!("Failed to start scheduler: {err}");
483                    }
484                }
485            }
486            RunnerMessage::Stop => {
487                stop_scheduler(&mut task, &mut cancel_token).await;
488            }
489            RunnerMessage::Restart((scx_sched, sched_args)) => {
490                log::info!("Got event to restart scheduler!");
491
492                // stop the sched if its running
493                stop_scheduler(&mut task, &mut cancel_token).await;
494
495                // restart scheduler with the same configuration
496                match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
497                    Ok(handle) => {
498                        task = Some(handle);
499                        log::debug!("Scheduler restarted");
500                    }
501                    Err(err) => {
502                        log::error!("Failed to restart scheduler: {err}");
503                    }
504                }
505            }
506        }
507    }
508
509    Ok(())
510}
511
512/// Start the scheduler with the given arguments
513async fn start_scheduler(
514    scx_crate: SupportedSched,
515    args: Vec<String>,
516    cancel_token: Arc<tokio_util::sync::CancellationToken>,
517) -> Result<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> {
518    // Ensure the child process exit is handled correctly in the runtime
519    let handle = tokio::spawn(async move {
520        let mut retries = 0u32;
521        let max_retries = 5u32;
522
523        let mut last_status: Option<ExitStatus> = None;
524
525        while retries < max_retries {
526            let child = spawn_scheduler(scx_crate.clone(), args.clone()).await;
527
528            let mut failed = false;
529            if let Ok(mut child) = child {
530                tokio::select! {
531                    status = child.wait() => {
532                        let status = status.expect("child process encountered an error");
533                        last_status = Some(status);
534                        if !status.success() {
535                            failed = true;
536                        }
537                        log::debug!("Child process exited with status: {status:?}");
538                    }
539
540                    _ = cancel_token.cancelled() => {
541                        log::debug!("Received cancellation signal");
542                        // Send SIGINT
543                        if let Some(child_id) = child.id() {
544                            nix::sys::signal::kill(
545                                nix::unistd::Pid::from_raw(child_id as i32),
546                                nix::sys::signal::SIGINT,
547                            ).context("Failed to send termination signal to the child")?;
548                        }
549                        let status = child.wait().await.expect("child process encountered an error");
550                        last_status = Some(status);
551                        break;
552                    }
553                };
554            } else {
555                log::debug!("Failed to spawn child process");
556                failed = true;
557            }
558
559            // retrying if failed, otherwise exit
560            if !failed {
561                break;
562            }
563
564            retries += 1;
565            log::error!(
566                "Failed to start scheduler (attempt {}/{})",
567                retries,
568                max_retries,
569            );
570        }
571
572        Ok(last_status)
573    });
574
575    Ok(handle)
576}
577
578/// Starts the scheduler as a child process and returns child object to manage lifecycle by the
579/// caller.
580async fn spawn_scheduler(scx_crate: SupportedSched, args: Vec<String>) -> Result<Child> {
581    let sched_bin_name: &str = scx_crate.into();
582    log::info!("starting {sched_bin_name} command");
583
584    let mut cmd = Command::new(sched_bin_name);
585    // set arguments
586    cmd.args(args);
587
588    // by default child IO handles are inherited from parent process
589
590    // pipe stdin of child proc to /dev/null
591    cmd.stdin(Stdio::null());
592
593    // spawn process
594    let child = cmd.spawn().expect("failed to spawn command");
595
596    Ok(child)
597}
598
599async fn stop_scheduler(
600    task: &mut Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>>,
601    cancel_token: &mut Arc<tokio_util::sync::CancellationToken>,
602) {
603    if let Some(task) = task.take() {
604        log::debug!("Stopping already running scheduler..");
605        cancel_token.cancel();
606        let status = task.await;
607        log::debug!("Scheduler was stopped with status: {:?}", status);
608        // Create a new cancellation token
609        *cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
610    }
611}