1mod logger;
9
10use scx_loader::dbus::LoaderClientProxy;
11use scx_loader::*;
12
13use std::process::ExitStatus;
14use std::process::Stdio;
15use std::sync::Arc;
16
17use anyhow::Context;
18use anyhow::Result;
19use clap::Parser;
20use sysinfo::System;
21use tokio::process::Child;
22use tokio::process::Command;
23use tokio::sync::mpsc::UnboundedReceiver;
24use tokio::sync::mpsc::UnboundedSender;
25use tokio::time::Duration;
26use tokio::time::Instant;
27use zbus::interface;
28use zbus::Connection;
29
30#[derive(Debug, PartialEq)]
31enum ScxMessage {
32 Quit,
34 StopSched,
36 StartSched((SupportedSched, SchedMode)),
38 StartSchedArgs((SupportedSched, Vec<String>)),
40 SwitchSched((SupportedSched, SchedMode)),
42 SwitchSchedArgs((SupportedSched, Vec<String>)),
44}
45
46#[derive(Debug, PartialEq)]
47enum RunnerMessage {
48 Switch((SupportedSched, Vec<String>)),
50 Start((SupportedSched, Vec<String>)),
52 Stop,
54}
55
56struct ScxLoader {
57 current_scx: Option<SupportedSched>,
58 current_mode: SchedMode,
59 channel: UnboundedSender<ScxMessage>,
60}
61
62#[derive(Parser, Debug)]
63#[clap(author, version, about, long_about = None)]
64struct Args {
65 #[clap(long, short, action)]
66 auto: bool,
67}
68
69#[interface(name = "org.scx.Loader")]
70impl ScxLoader {
71 #[zbus(property)]
73 async fn current_scheduler(&self) -> String {
74 if let Some(current_scx) = &self.current_scx {
75 let current_scx: &str = current_scx.clone().into();
76 log::info!("called {current_scx:?}");
77 return current_scx.to_owned();
78 }
79 "unknown".to_owned()
80 }
81
82 #[zbus(property)]
84 async fn scheduler_mode(&self) -> SchedMode {
85 self.current_mode.clone()
86 }
87
88 #[zbus(property)]
90 async fn supported_schedulers(&self) -> Vec<&str> {
91 vec![
92 "scx_bpfland",
93 "scx_flash",
94 "scx_lavd",
95 "scx_p2dq",
96 "scx_tickless",
97 "scx_rustland",
98 "scx_rusty",
99 ]
100 }
101
102 async fn start_scheduler(
103 &mut self,
104 scx_name: SupportedSched,
105 sched_mode: SchedMode,
106 ) -> zbus::fdo::Result<()> {
107 log::info!("starting {scx_name:?} with mode {sched_mode:?}..");
108
109 let _ = self.channel.send(ScxMessage::StartSched((
110 scx_name.clone(),
111 sched_mode.clone(),
112 )));
113 self.current_scx = Some(scx_name);
114 self.current_mode = sched_mode;
115
116 Ok(())
117 }
118
119 async fn start_scheduler_with_args(
120 &mut self,
121 scx_name: SupportedSched,
122 scx_args: Vec<String>,
123 ) -> zbus::fdo::Result<()> {
124 log::info!("starting {scx_name:?} with args {scx_args:?}..");
125
126 let _ = self
127 .channel
128 .send(ScxMessage::StartSchedArgs((scx_name.clone(), scx_args)));
129 self.current_scx = Some(scx_name);
130 self.current_mode = SchedMode::Auto;
132
133 Ok(())
134 }
135
136 async fn switch_scheduler(
137 &mut self,
138 scx_name: SupportedSched,
139 sched_mode: SchedMode,
140 ) -> zbus::fdo::Result<()> {
141 log::info!("switching {scx_name:?} with mode {sched_mode:?}..");
142
143 let _ = self.channel.send(ScxMessage::SwitchSched((
144 scx_name.clone(),
145 sched_mode.clone(),
146 )));
147 self.current_scx = Some(scx_name);
148 self.current_mode = sched_mode;
149
150 Ok(())
151 }
152
153 async fn switch_scheduler_with_args(
154 &mut self,
155 scx_name: SupportedSched,
156 scx_args: Vec<String>,
157 ) -> zbus::fdo::Result<()> {
158 log::info!("switching {scx_name:?} with args {scx_args:?}..");
159
160 let _ = self
161 .channel
162 .send(ScxMessage::SwitchSchedArgs((scx_name.clone(), scx_args)));
163 self.current_scx = Some(scx_name);
164 self.current_mode = SchedMode::Auto;
166
167 Ok(())
168 }
169
170 async fn stop_scheduler(&mut self) -> zbus::fdo::Result<()> {
171 if let Some(current_scx) = &self.current_scx {
172 let scx_name: &str = current_scx.clone().into();
173
174 log::info!("stopping {scx_name:?}..");
175 let _ = self.channel.send(ScxMessage::StopSched);
176 self.current_scx = None;
177 }
178
179 Ok(())
180 }
181}
182
183async fn monitor_cpu_util() -> Result<()> {
185 let mut system = System::new_all();
186 let mut running_sched: Option<Child> = None;
187 let mut cpu_above_threshold_since: Option<Instant> = None;
188 let mut cpu_below_threshold_since: Option<Instant> = None;
189
190 let high_utilization_threshold = 90.0;
191 let low_utilization_threshold_duration = Duration::from_secs(30);
192 let high_utilization_trigger_duration = Duration::from_secs(5);
193
194 loop {
195 system.refresh_cpu_all();
196
197 let any_cpu_above_threshold = system
198 .cpus()
199 .iter()
200 .any(|cpu| cpu.cpu_usage() > high_utilization_threshold);
201
202 if any_cpu_above_threshold {
203 if cpu_above_threshold_since.is_none() {
204 cpu_above_threshold_since = Some(Instant::now());
205 }
206
207 if cpu_above_threshold_since.unwrap().elapsed() > high_utilization_trigger_duration {
208 if running_sched.is_none() {
209 log::info!("CPU Utilization exceeded 90% for 5 seconds, starting scx_lavd");
210
211 let scx_name: &str = SupportedSched::Lavd.into();
212 running_sched = Some(
213 Command::new(scx_name)
214 .spawn()
215 .expect("Failed to start scx_lavd"),
216 );
217 }
218
219 cpu_below_threshold_since = None;
220 }
221 } else {
222 cpu_above_threshold_since = None;
223
224 if cpu_below_threshold_since.is_none() {
225 cpu_below_threshold_since = Some(Instant::now());
226 }
227
228 if cpu_below_threshold_since.unwrap().elapsed() > low_utilization_threshold_duration {
229 if let Some(mut running_sched_loc) = running_sched.take() {
230 log::info!(
231 "CPU utilization dropped below 90% for more than 30 seconds, exiting latency-aware scheduler"
232 );
233 running_sched_loc
234 .kill()
235 .await
236 .expect("Failed to kill scx_lavd");
237 let lavd_exit_status = running_sched_loc
238 .wait()
239 .await
240 .expect("Failed to wait on scx_lavd");
241 log::info!("scx_lavd exited with status: {}", lavd_exit_status);
242 }
243 }
244 }
245
246 tokio::time::sleep(Duration::from_secs(1)).await;
247 }
248}
249
250#[tokio::main]
251async fn main() -> Result<()> {
252 logger::init_logger().expect("Failed to initialize logger");
254
255 let args = Args::parse();
256
257 let config = config::init_config().context("Failed to initialize config")?;
259
260 if args.auto {
264 log::info!("Starting scx_loader monitor as standard process without dbus interface");
265 monitor_cpu_util().await?;
266 return Ok(());
267 }
268
269 log::info!("Starting as dbus interface");
270 let (channel, rx) = tokio::sync::mpsc::unbounded_channel::<ScxMessage>();
272
273 let channel_clone = channel.clone();
274 ctrlc::set_handler(move || {
275 log::info!("shutting down..");
276 let _ = channel_clone.send(ScxMessage::Quit);
277 })
278 .context("Error setting Ctrl-C handler")?;
279
280 let connection = Connection::system().await?;
282 connection
283 .object_server()
284 .at(
285 "/org/scx/Loader",
286 ScxLoader {
287 current_scx: None,
288 current_mode: SchedMode::Auto,
289 channel: channel.clone(),
290 },
291 )
292 .await?;
293
294 connection.request_name("org.scx.Loader").await?;
295
296 if let Some(default_sched) = &config.default_sched {
298 log::info!("Starting default scheduler: {default_sched:?}");
299
300 let default_mode = config.default_mode.clone().unwrap_or(SchedMode::Auto);
301
302 let loader_client = LoaderClientProxy::new(&connection).await?;
303 loader_client
304 .switch_scheduler(default_sched.clone(), default_mode)
305 .await?;
306 }
307
308 worker_loop(config, rx).await?;
310
311 Ok(())
312}
313
314async fn worker_loop(
315 config: config::Config,
316 mut receiver: UnboundedReceiver<ScxMessage>,
317) -> Result<()> {
318 let (runner_tx, runner_rx) = tokio::sync::mpsc::channel::<RunnerMessage>(1);
320
321 let run_sched_future = tokio::spawn(async move { handle_child_process(runner_rx).await });
322
323 tokio::pin!(run_sched_future);
325
326 loop {
327 let msg = tokio::select! {
329 msg = receiver.recv() => {
330 match msg {
331 None => return Ok(()),
332 Some(m) => m,
333 }
334 }
335 res = &mut run_sched_future => {
336 log::info!("Sched future finished");
337 let _ = res?;
338 continue;
339 }
340 };
341 log::debug!("Got msg : {msg:?}");
342
343 match msg {
344 ScxMessage::Quit => return Ok(()),
345 ScxMessage::StopSched => {
346 log::info!("Got event to stop scheduler!");
347
348 runner_tx.send(RunnerMessage::Stop).await?;
350 }
351 ScxMessage::StartSched((scx_sched, sched_mode)) => {
352 log::info!("Got event to start scheduler!");
353
354 let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
356
357 runner_tx
359 .send(RunnerMessage::Start((scx_sched, args)))
360 .await?;
361 }
362 ScxMessage::StartSchedArgs((scx_sched, sched_args)) => {
363 log::info!("Got event to start scheduler with args!");
364
365 runner_tx
367 .send(RunnerMessage::Start((scx_sched, sched_args)))
368 .await?;
369 }
370 ScxMessage::SwitchSched((scx_sched, sched_mode)) => {
371 log::info!("Got event to switch scheduler!");
372
373 let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
375
376 runner_tx
378 .send(RunnerMessage::Switch((scx_sched, args)))
379 .await?;
380 }
381 ScxMessage::SwitchSchedArgs((scx_sched, sched_args)) => {
382 log::info!("Got event to switch scheduler with args!");
383
384 runner_tx
386 .send(RunnerMessage::Switch((scx_sched, sched_args)))
387 .await?;
388 }
389 }
390 }
391}
392
393async fn handle_child_process(mut rx: tokio::sync::mpsc::Receiver<RunnerMessage>) -> Result<()> {
394 let mut task: Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> = None;
395 let mut cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
396
397 while let Some(message) = rx.recv().await {
398 match message {
399 RunnerMessage::Switch((scx_sched, sched_args)) => {
400 stop_scheduler(&mut task, &mut cancel_token).await;
402
403 match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
405 Ok(handle) => {
406 task = Some(handle);
407 log::debug!("Scheduler started");
408 }
409 Err(err) => {
410 log::error!("Failed to start scheduler: {err}");
411 }
412 }
413 }
414 RunnerMessage::Start((scx_sched, sched_args)) => {
415 if task.is_some() {
417 log::error!("Scheduler wasn't finished yet. Stop already running scheduler!");
418 continue;
419 }
420 match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
422 Ok(handle) => {
423 task = Some(handle);
424 log::debug!("Scheduler started");
425 }
426 Err(err) => {
427 log::error!("Failed to start scheduler: {err}");
428 }
429 }
430 }
431 RunnerMessage::Stop => {
432 stop_scheduler(&mut task, &mut cancel_token).await;
433 }
434 }
435 }
436
437 Ok(())
438}
439
440async fn start_scheduler(
442 scx_crate: SupportedSched,
443 args: Vec<String>,
444 cancel_token: Arc<tokio_util::sync::CancellationToken>,
445) -> Result<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> {
446 let handle = tokio::spawn(async move {
448 let mut retries = 0u32;
449 let max_retries = 5u32;
450
451 let mut last_status: Option<ExitStatus> = None;
452
453 while retries < max_retries {
454 let child = spawn_scheduler(scx_crate.clone(), args.clone()).await;
455
456 let mut failed = false;
457 if let Ok(mut child) = child {
458 tokio::select! {
459 status = child.wait() => {
460 let status = status.expect("child process encountered an error");
461 last_status = Some(status);
462 if !status.success() {
463 failed = true;
464 }
465 log::debug!("Child process exited with status: {status:?}");
466 }
467
468 _ = cancel_token.cancelled() => {
469 log::debug!("Received cancellation signal");
470 if let Some(child_id) = child.id() {
472 nix::sys::signal::kill(
473 nix::unistd::Pid::from_raw(child_id as i32),
474 nix::sys::signal::SIGINT,
475 ).context("Failed to send termination signal to the child")?;
476 }
477 let status = child.wait().await.expect("child process encountered an error");
478 last_status = Some(status);
479 break;
480 }
481 };
482 } else {
483 log::debug!("Failed to spawn child process");
484 failed = true;
485 }
486
487 if !failed {
489 break;
490 }
491
492 retries += 1;
493 log::error!(
494 "Failed to start scheduler (attempt {}/{})",
495 retries,
496 max_retries,
497 );
498 }
499
500 Ok(last_status)
501 });
502
503 Ok(handle)
504}
505
506async fn spawn_scheduler(scx_crate: SupportedSched, args: Vec<String>) -> Result<Child> {
509 let sched_bin_name: &str = scx_crate.into();
510 log::info!("starting {sched_bin_name} command");
511
512 let mut cmd = Command::new(sched_bin_name);
513 cmd.args(args);
515
516 cmd.stdin(Stdio::null());
520
521 let child = cmd.spawn().expect("failed to spawn command");
523
524 Ok(child)
525}
526
527async fn stop_scheduler(
528 task: &mut Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>>,
529 cancel_token: &mut Arc<tokio_util::sync::CancellationToken>,
530) {
531 if let Some(task) = task.take() {
532 log::debug!("Stopping already running scheduler..");
533 cancel_token.cancel();
534 let status = task.await;
535 log::debug!("Scheduler was stopped with status: {:?}", status);
536 *cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
538 }
539}