scx_loader/
main.rs

1// SPDX-License-Identifier: GPL-2.0
2//
3// Copyright (c) 2024 Vladislav Nepogodin <vnepogodin@cachyos.org>
4
5// This software may be used and distributed according to the terms of the
6// GNU General Public License version 2.
7
8mod logger;
9
10use scx_loader::dbus::LoaderClientProxy;
11use scx_loader::*;
12
13use std::process::ExitStatus;
14use std::process::Stdio;
15use std::sync::Arc;
16
17use anyhow::Context;
18use anyhow::Result;
19use clap::Parser;
20use sysinfo::System;
21use tokio::process::Child;
22use tokio::process::Command;
23use tokio::sync::mpsc::UnboundedReceiver;
24use tokio::sync::mpsc::UnboundedSender;
25use tokio::time::Duration;
26use tokio::time::Instant;
27use zbus::interface;
28use zbus::Connection;
29
30#[derive(Debug, PartialEq)]
31enum ScxMessage {
32    /// Quit the scx_loader
33    Quit,
34    /// Stop the scheduler, if any
35    StopSched,
36    /// Start the scheduler with the given mode
37    StartSched((SupportedSched, SchedMode)),
38    /// Start the scheduler with the given scx arguments
39    StartSchedArgs((SupportedSched, Vec<String>)),
40    /// Switch to another scheduler with the given mode
41    SwitchSched((SupportedSched, SchedMode)),
42    /// Switch to another scheduler with the given scx arguments
43    SwitchSchedArgs((SupportedSched, Vec<String>)),
44}
45
46#[derive(Debug, PartialEq)]
47enum RunnerMessage {
48    /// Switch to another scheduler with the given scx arguments
49    Switch((SupportedSched, Vec<String>)),
50    /// Start the scheduler with the given scx arguments
51    Start((SupportedSched, Vec<String>)),
52    /// Stop the scheduler, if any
53    Stop,
54}
55
56struct ScxLoader {
57    current_scx: Option<SupportedSched>,
58    current_mode: SchedMode,
59    channel: UnboundedSender<ScxMessage>,
60}
61
62#[derive(Parser, Debug)]
63#[clap(author, version, about, long_about = None)]
64struct Args {
65    #[clap(long, short, action)]
66    auto: bool,
67}
68
69#[interface(name = "org.scx.Loader")]
70impl ScxLoader {
71    /// Get currently running scheduler, in case non is running return "unknown"
72    #[zbus(property)]
73    async fn current_scheduler(&self) -> String {
74        if let Some(current_scx) = &self.current_scx {
75            let current_scx: &str = current_scx.clone().into();
76            log::info!("called {current_scx:?}");
77            return current_scx.to_owned();
78        }
79        "unknown".to_owned()
80    }
81
82    /// Get scheduler mode
83    #[zbus(property)]
84    async fn scheduler_mode(&self) -> SchedMode {
85        self.current_mode.clone()
86    }
87
88    /// Get list of supported schedulers
89    #[zbus(property)]
90    async fn supported_schedulers(&self) -> Vec<&str> {
91        vec![
92            "scx_bpfland",
93            "scx_flash",
94            "scx_lavd",
95            "scx_p2dq",
96            "scx_tickless",
97            "scx_rustland",
98            "scx_rusty",
99        ]
100    }
101
102    async fn start_scheduler(
103        &mut self,
104        scx_name: SupportedSched,
105        sched_mode: SchedMode,
106    ) -> zbus::fdo::Result<()> {
107        log::info!("starting {scx_name:?} with mode {sched_mode:?}..");
108
109        let _ = self.channel.send(ScxMessage::StartSched((
110            scx_name.clone(),
111            sched_mode.clone(),
112        )));
113        self.current_scx = Some(scx_name);
114        self.current_mode = sched_mode;
115
116        Ok(())
117    }
118
119    async fn start_scheduler_with_args(
120        &mut self,
121        scx_name: SupportedSched,
122        scx_args: Vec<String>,
123    ) -> zbus::fdo::Result<()> {
124        log::info!("starting {scx_name:?} with args {scx_args:?}..");
125
126        let _ = self
127            .channel
128            .send(ScxMessage::StartSchedArgs((scx_name.clone(), scx_args)));
129        self.current_scx = Some(scx_name);
130        // reset mode to auto
131        self.current_mode = SchedMode::Auto;
132
133        Ok(())
134    }
135
136    async fn switch_scheduler(
137        &mut self,
138        scx_name: SupportedSched,
139        sched_mode: SchedMode,
140    ) -> zbus::fdo::Result<()> {
141        log::info!("switching {scx_name:?} with mode {sched_mode:?}..");
142
143        let _ = self.channel.send(ScxMessage::SwitchSched((
144            scx_name.clone(),
145            sched_mode.clone(),
146        )));
147        self.current_scx = Some(scx_name);
148        self.current_mode = sched_mode;
149
150        Ok(())
151    }
152
153    async fn switch_scheduler_with_args(
154        &mut self,
155        scx_name: SupportedSched,
156        scx_args: Vec<String>,
157    ) -> zbus::fdo::Result<()> {
158        log::info!("switching {scx_name:?} with args {scx_args:?}..");
159
160        let _ = self
161            .channel
162            .send(ScxMessage::SwitchSchedArgs((scx_name.clone(), scx_args)));
163        self.current_scx = Some(scx_name);
164        // reset mode to auto
165        self.current_mode = SchedMode::Auto;
166
167        Ok(())
168    }
169
170    async fn stop_scheduler(&mut self) -> zbus::fdo::Result<()> {
171        if let Some(current_scx) = &self.current_scx {
172            let scx_name: &str = current_scx.clone().into();
173
174            log::info!("stopping {scx_name:?}..");
175            let _ = self.channel.send(ScxMessage::StopSched);
176            self.current_scx = None;
177        }
178
179        Ok(())
180    }
181}
182
183// Monitors CPU utilization and enables scx_lavd when utilization of any CPUs is > 90%
184async fn monitor_cpu_util() -> Result<()> {
185    let mut system = System::new_all();
186    let mut running_sched: Option<Child> = None;
187    let mut cpu_above_threshold_since: Option<Instant> = None;
188    let mut cpu_below_threshold_since: Option<Instant> = None;
189
190    let high_utilization_threshold = 90.0;
191    let low_utilization_threshold_duration = Duration::from_secs(30);
192    let high_utilization_trigger_duration = Duration::from_secs(5);
193
194    loop {
195        system.refresh_cpu_all();
196
197        let any_cpu_above_threshold = system
198            .cpus()
199            .iter()
200            .any(|cpu| cpu.cpu_usage() > high_utilization_threshold);
201
202        if any_cpu_above_threshold {
203            if cpu_above_threshold_since.is_none() {
204                cpu_above_threshold_since = Some(Instant::now());
205            }
206
207            if cpu_above_threshold_since.unwrap().elapsed() > high_utilization_trigger_duration {
208                if running_sched.is_none() {
209                    log::info!("CPU Utilization exceeded 90% for 5 seconds, starting scx_lavd");
210
211                    let scx_name: &str = SupportedSched::Lavd.into();
212                    running_sched = Some(
213                        Command::new(scx_name)
214                            .spawn()
215                            .expect("Failed to start scx_lavd"),
216                    );
217                }
218
219                cpu_below_threshold_since = None;
220            }
221        } else {
222            cpu_above_threshold_since = None;
223
224            if cpu_below_threshold_since.is_none() {
225                cpu_below_threshold_since = Some(Instant::now());
226            }
227
228            if cpu_below_threshold_since.unwrap().elapsed() > low_utilization_threshold_duration {
229                if let Some(mut running_sched_loc) = running_sched.take() {
230                    log::info!(
231                        "CPU utilization dropped below 90% for more than 30 seconds, exiting latency-aware scheduler"
232                    );
233                    running_sched_loc
234                        .kill()
235                        .await
236                        .expect("Failed to kill scx_lavd");
237                    let lavd_exit_status = running_sched_loc
238                        .wait()
239                        .await
240                        .expect("Failed to wait on scx_lavd");
241                    log::info!("scx_lavd exited with status: {}", lavd_exit_status);
242                }
243            }
244        }
245
246        tokio::time::sleep(Duration::from_secs(1)).await;
247    }
248}
249
250#[tokio::main]
251async fn main() -> Result<()> {
252    // initialize the logger
253    logger::init_logger().expect("Failed to initialize logger");
254
255    let args = Args::parse();
256
257    // initialize the config
258    let config = config::init_config().context("Failed to initialize config")?;
259
260    // If --auto is passed, start scx_loader as a standard background process
261    // that swaps schedulers out automatically
262    // based on CPU utilization without registering a dbus interface.
263    if args.auto {
264        log::info!("Starting scx_loader monitor as standard process without dbus interface");
265        monitor_cpu_util().await?;
266        return Ok(());
267    }
268
269    log::info!("Starting as dbus interface");
270    // setup channel
271    let (channel, rx) = tokio::sync::mpsc::unbounded_channel::<ScxMessage>();
272
273    let channel_clone = channel.clone();
274    ctrlc::set_handler(move || {
275        log::info!("shutting down..");
276        let _ = channel_clone.send(ScxMessage::Quit);
277    })
278    .context("Error setting Ctrl-C handler")?;
279
280    // register dbus interface
281    let connection = Connection::system().await?;
282    connection
283        .object_server()
284        .at(
285            "/org/scx/Loader",
286            ScxLoader {
287                current_scx: None,
288                current_mode: SchedMode::Auto,
289                channel: channel.clone(),
290            },
291        )
292        .await?;
293
294    connection.request_name("org.scx.Loader").await?;
295
296    // if user set default scheduler, then start it
297    if let Some(default_sched) = &config.default_sched {
298        log::info!("Starting default scheduler: {default_sched:?}");
299
300        let default_mode = config.default_mode.clone().unwrap_or(SchedMode::Auto);
301
302        let loader_client = LoaderClientProxy::new(&connection).await?;
303        loader_client
304            .switch_scheduler(default_sched.clone(), default_mode)
305            .await?;
306    }
307
308    // run worker/receiver loop
309    worker_loop(config, rx).await?;
310
311    Ok(())
312}
313
314async fn worker_loop(
315    config: config::Config,
316    mut receiver: UnboundedReceiver<ScxMessage>,
317) -> Result<()> {
318    // setup channel for scheduler runner
319    let (runner_tx, runner_rx) = tokio::sync::mpsc::channel::<RunnerMessage>(1);
320
321    let run_sched_future = tokio::spawn(async move { handle_child_process(runner_rx).await });
322
323    // prepare future for tokio
324    tokio::pin!(run_sched_future);
325
326    loop {
327        // handle each future here
328        let msg = tokio::select! {
329            msg = receiver.recv() => {
330                match msg {
331                    None => return Ok(()),
332                    Some(m) => m,
333                }
334            }
335            res = &mut run_sched_future => {
336                log::info!("Sched future finished");
337                let _ = res?;
338                continue;
339            }
340        };
341        log::debug!("Got msg : {msg:?}");
342
343        match msg {
344            ScxMessage::Quit => return Ok(()),
345            ScxMessage::StopSched => {
346                log::info!("Got event to stop scheduler!");
347
348                // send stop message to the runner
349                runner_tx.send(RunnerMessage::Stop).await?;
350            }
351            ScxMessage::StartSched((scx_sched, sched_mode)) => {
352                log::info!("Got event to start scheduler!");
353
354                // get scheduler args for the mode
355                let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
356
357                // send message with scheduler and asociated args to the runner
358                runner_tx
359                    .send(RunnerMessage::Start((scx_sched, args)))
360                    .await?;
361            }
362            ScxMessage::StartSchedArgs((scx_sched, sched_args)) => {
363                log::info!("Got event to start scheduler with args!");
364
365                // send message with scheduler and asociated args to the runner
366                runner_tx
367                    .send(RunnerMessage::Start((scx_sched, sched_args)))
368                    .await?;
369            }
370            ScxMessage::SwitchSched((scx_sched, sched_mode)) => {
371                log::info!("Got event to switch scheduler!");
372
373                // get scheduler args for the mode
374                let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
375
376                // send message with scheduler and asociated args to the runner
377                runner_tx
378                    .send(RunnerMessage::Switch((scx_sched, args)))
379                    .await?;
380            }
381            ScxMessage::SwitchSchedArgs((scx_sched, sched_args)) => {
382                log::info!("Got event to switch scheduler with args!");
383
384                // send message with scheduler and asociated args to the runner
385                runner_tx
386                    .send(RunnerMessage::Switch((scx_sched, sched_args)))
387                    .await?;
388            }
389        }
390    }
391}
392
393async fn handle_child_process(mut rx: tokio::sync::mpsc::Receiver<RunnerMessage>) -> Result<()> {
394    let mut task: Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> = None;
395    let mut cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
396
397    while let Some(message) = rx.recv().await {
398        match message {
399            RunnerMessage::Switch((scx_sched, sched_args)) => {
400                // stop the sched if its running
401                stop_scheduler(&mut task, &mut cancel_token).await;
402
403                // overwise start scheduler
404                match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
405                    Ok(handle) => {
406                        task = Some(handle);
407                        log::debug!("Scheduler started");
408                    }
409                    Err(err) => {
410                        log::error!("Failed to start scheduler: {err}");
411                    }
412                }
413            }
414            RunnerMessage::Start((scx_sched, sched_args)) => {
415                // check if sched is running or not
416                if task.is_some() {
417                    log::error!("Scheduler wasn't finished yet. Stop already running scheduler!");
418                    continue;
419                }
420                // overwise start scheduler
421                match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
422                    Ok(handle) => {
423                        task = Some(handle);
424                        log::debug!("Scheduler started");
425                    }
426                    Err(err) => {
427                        log::error!("Failed to start scheduler: {err}");
428                    }
429                }
430            }
431            RunnerMessage::Stop => {
432                stop_scheduler(&mut task, &mut cancel_token).await;
433            }
434        }
435    }
436
437    Ok(())
438}
439
440/// Start the scheduler with the given arguments
441async fn start_scheduler(
442    scx_crate: SupportedSched,
443    args: Vec<String>,
444    cancel_token: Arc<tokio_util::sync::CancellationToken>,
445) -> Result<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> {
446    // Ensure the child process exit is handled correctly in the runtime
447    let handle = tokio::spawn(async move {
448        let mut retries = 0u32;
449        let max_retries = 5u32;
450
451        let mut last_status: Option<ExitStatus> = None;
452
453        while retries < max_retries {
454            let child = spawn_scheduler(scx_crate.clone(), args.clone()).await;
455
456            let mut failed = false;
457            if let Ok(mut child) = child {
458                tokio::select! {
459                    status = child.wait() => {
460                        let status = status.expect("child process encountered an error");
461                        last_status = Some(status);
462                        if !status.success() {
463                            failed = true;
464                        }
465                        log::debug!("Child process exited with status: {status:?}");
466                    }
467
468                    _ = cancel_token.cancelled() => {
469                        log::debug!("Received cancellation signal");
470                        // Send SIGINT
471                        if let Some(child_id) = child.id() {
472                            nix::sys::signal::kill(
473                                nix::unistd::Pid::from_raw(child_id as i32),
474                                nix::sys::signal::SIGINT,
475                            ).context("Failed to send termination signal to the child")?;
476                        }
477                        let status = child.wait().await.expect("child process encountered an error");
478                        last_status = Some(status);
479                        break;
480                    }
481                };
482            } else {
483                log::debug!("Failed to spawn child process");
484                failed = true;
485            }
486
487            // retrying if failed, otherwise exit
488            if !failed {
489                break;
490            }
491
492            retries += 1;
493            log::error!(
494                "Failed to start scheduler (attempt {}/{})",
495                retries,
496                max_retries,
497            );
498        }
499
500        Ok(last_status)
501    });
502
503    Ok(handle)
504}
505
506/// Starts the scheduler as a child process and returns child object to manage lifecycle by the
507/// caller.
508async fn spawn_scheduler(scx_crate: SupportedSched, args: Vec<String>) -> Result<Child> {
509    let sched_bin_name: &str = scx_crate.into();
510    log::info!("starting {sched_bin_name} command");
511
512    let mut cmd = Command::new(sched_bin_name);
513    // set arguments
514    cmd.args(args);
515
516    // by default child IO handles are inherited from parent process
517
518    // pipe stdin of child proc to /dev/null
519    cmd.stdin(Stdio::null());
520
521    // spawn process
522    let child = cmd.spawn().expect("failed to spawn command");
523
524    Ok(child)
525}
526
527async fn stop_scheduler(
528    task: &mut Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>>,
529    cancel_token: &mut Arc<tokio_util::sync::CancellationToken>,
530) {
531    if let Some(task) = task.take() {
532        log::debug!("Stopping already running scheduler..");
533        cancel_token.cancel();
534        let status = task.await;
535        log::debug!("Scheduler was stopped with status: {:?}", status);
536        // Create a new cancellation token
537        *cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
538    }
539}