1mod logger;
9
10use scx_loader::dbus::LoaderClientProxy;
11use scx_loader::*;
12
13use std::process::ExitStatus;
14use std::process::Stdio;
15use std::sync::Arc;
16
17use anyhow::Context;
18use anyhow::Result;
19use clap::Parser;
20use sysinfo::System;
21use tokio::process::Child;
22use tokio::process::Command;
23use tokio::sync::mpsc::UnboundedReceiver;
24use tokio::sync::mpsc::UnboundedSender;
25use tokio::time::Duration;
26use tokio::time::Instant;
27use zbus::interface;
28use zbus::Connection;
29
30#[derive(Debug, PartialEq)]
31enum ScxMessage {
32 Quit,
34 StopSched,
36 StartSched((SupportedSched, SchedMode)),
38 StartSchedArgs((SupportedSched, Vec<String>)),
40 SwitchSched((SupportedSched, SchedMode)),
42 SwitchSchedArgs((SupportedSched, Vec<String>)),
44 RestartSched((SupportedSched, Option<Vec<String>>, SchedMode)),
46}
47
48#[derive(Debug, PartialEq)]
49enum RunnerMessage {
50 Switch((SupportedSched, Vec<String>)),
52 Start((SupportedSched, Vec<String>)),
54 Stop,
56 Restart((SupportedSched, Vec<String>)),
58}
59
60struct ScxLoader {
61 current_scx: Option<SupportedSched>,
62 current_mode: SchedMode,
63 current_args: Option<Vec<String>>,
64 channel: UnboundedSender<ScxMessage>,
65}
66
// Command-line options for scx_loader.
// NOTE(review): plain `//` comments on purpose — `///` doc comments on this
// struct or its fields would be picked up by clap and change `--help` output.
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
    // `-a` / `--auto`: `main` runs the standalone CPU-utilization monitor
    // (`monitor_cpu_util`) instead of serving the D-Bus interface.
    #[clap(long, short, action)]
    auto: bool,
}
73
74#[interface(name = "org.scx.Loader")]
75impl ScxLoader {
76 #[zbus(property)]
78 async fn current_scheduler(&self) -> String {
79 if let Some(current_scx) = &self.current_scx {
80 let current_scx: &str = current_scx.clone().into();
81 log::info!("called {current_scx:?}");
82 return current_scx.to_owned();
83 }
84 "unknown".to_owned()
85 }
86
87 #[zbus(property)]
89 async fn scheduler_mode(&self) -> SchedMode {
90 self.current_mode.clone()
91 }
92
93 #[zbus(property)]
95 async fn current_scheduler_args(&self) -> Vec<String> {
96 self.current_args.clone().unwrap_or_default()
97 }
98
99 #[zbus(property)]
101 async fn supported_schedulers(&self) -> Vec<&str> {
102 vec![
103 "scx_bpfland",
104 "scx_cosmos",
105 "scx_flash",
106 "scx_lavd",
107 "scx_p2dq",
108 "scx_tickless",
109 "scx_rustland",
110 "scx_rusty",
111 ]
112 }
113 async fn start_scheduler(
114 &mut self,
115 scx_name: SupportedSched,
116 sched_mode: SchedMode,
117 ) -> zbus::fdo::Result<()> {
118 log::info!("starting {scx_name:?} with mode {sched_mode:?}..");
119
120 let _ = self.channel.send(ScxMessage::StartSched((
121 scx_name.clone(),
122 sched_mode.clone(),
123 )));
124 self.current_scx = Some(scx_name);
125 self.current_mode = sched_mode;
126 self.current_args = None;
127
128 Ok(())
129 }
130
131 async fn start_scheduler_with_args(
132 &mut self,
133 scx_name: SupportedSched,
134 scx_args: Vec<String>,
135 ) -> zbus::fdo::Result<()> {
136 log::info!("starting {scx_name:?} with args {scx_args:?}..");
137
138 let _ = self.channel.send(ScxMessage::StartSchedArgs((
139 scx_name.clone(),
140 scx_args.clone(),
141 )));
142 self.current_scx = Some(scx_name);
143 self.current_mode = SchedMode::Auto;
145 self.current_args = Some(scx_args);
146
147 Ok(())
148 }
149
150 async fn switch_scheduler(
151 &mut self,
152 scx_name: SupportedSched,
153 sched_mode: SchedMode,
154 ) -> zbus::fdo::Result<()> {
155 log::info!("switching {scx_name:?} with mode {sched_mode:?}..");
156
157 let _ = self.channel.send(ScxMessage::SwitchSched((
158 scx_name.clone(),
159 sched_mode.clone(),
160 )));
161 self.current_scx = Some(scx_name);
162 self.current_mode = sched_mode;
163 self.current_args = None;
164
165 Ok(())
166 }
167
168 async fn switch_scheduler_with_args(
169 &mut self,
170 scx_name: SupportedSched,
171 scx_args: Vec<String>,
172 ) -> zbus::fdo::Result<()> {
173 log::info!("switching {scx_name:?} with args {scx_args:?}..");
174
175 let _ = self.channel.send(ScxMessage::SwitchSchedArgs((
176 scx_name.clone(),
177 scx_args.clone(),
178 )));
179 self.current_scx = Some(scx_name);
180 self.current_mode = SchedMode::Auto;
182 self.current_args = Some(scx_args);
183
184 Ok(())
185 }
186
187 async fn stop_scheduler(&mut self) -> zbus::fdo::Result<()> {
188 if let Some(current_scx) = &self.current_scx {
189 let scx_name: &str = current_scx.clone().into();
190
191 log::info!("stopping {scx_name:?}..");
192 let _ = self.channel.send(ScxMessage::StopSched);
193 self.current_scx = None;
194 self.current_args = None;
195 }
196
197 Ok(())
198 }
199
200 async fn restart_scheduler(&mut self) -> zbus::fdo::Result<()> {
201 if let Some(current_scx) = &self.current_scx {
202 let scx_name: &str = current_scx.clone().into();
203
204 log::info!("restarting {scx_name:?}..");
205 let _ = self.channel.send(ScxMessage::RestartSched((
206 current_scx.clone(),
207 self.current_args.clone(),
208 self.current_mode.clone(),
209 )));
210
211 Ok(())
212 } else {
213 Err(zbus::fdo::Error::Failed(
214 "No scheduler is currently running to restart".to_string(),
215 ))
216 }
217 }
218}
219
220async fn monitor_cpu_util() -> Result<()> {
222 let mut system = System::new_all();
223 let mut running_sched: Option<Child> = None;
224 let mut cpu_above_threshold_since: Option<Instant> = None;
225 let mut cpu_below_threshold_since: Option<Instant> = None;
226
227 let high_utilization_threshold = 90.0;
228 let low_utilization_threshold_duration = Duration::from_secs(30);
229 let high_utilization_trigger_duration = Duration::from_secs(5);
230
231 loop {
232 system.refresh_cpu_all();
233
234 let any_cpu_above_threshold = system
235 .cpus()
236 .iter()
237 .any(|cpu| cpu.cpu_usage() > high_utilization_threshold);
238
239 if any_cpu_above_threshold {
240 if cpu_above_threshold_since.is_none() {
241 cpu_above_threshold_since = Some(Instant::now());
242 }
243
244 if cpu_above_threshold_since.unwrap().elapsed() > high_utilization_trigger_duration {
245 if running_sched.is_none() {
246 log::info!("CPU Utilization exceeded 90% for 5 seconds, starting scx_lavd");
247
248 let scx_name: &str = SupportedSched::Lavd.into();
249 running_sched = Some(
250 Command::new(scx_name)
251 .spawn()
252 .expect("Failed to start scx_lavd"),
253 );
254 }
255
256 cpu_below_threshold_since = None;
257 }
258 } else {
259 cpu_above_threshold_since = None;
260
261 if cpu_below_threshold_since.is_none() {
262 cpu_below_threshold_since = Some(Instant::now());
263 }
264
265 if cpu_below_threshold_since.unwrap().elapsed() > low_utilization_threshold_duration {
266 if let Some(mut running_sched_loc) = running_sched.take() {
267 log::info!(
268 "CPU utilization dropped below 90% for more than 30 seconds, exiting latency-aware scheduler"
269 );
270 running_sched_loc
271 .kill()
272 .await
273 .expect("Failed to kill scx_lavd");
274 let lavd_exit_status = running_sched_loc
275 .wait()
276 .await
277 .expect("Failed to wait on scx_lavd");
278 log::info!("scx_lavd exited with status: {}", lavd_exit_status);
279 }
280 }
281 }
282
283 tokio::time::sleep(Duration::from_secs(1)).await;
284 }
285}
286
287#[tokio::main]
288async fn main() -> Result<()> {
289 logger::init_logger().expect("Failed to initialize logger");
291
292 let args = Args::parse();
293
294 let config = config::init_config().context("Failed to initialize config")?;
296
297 if args.auto {
301 log::info!("Starting scx_loader monitor as standard process without dbus interface");
302 monitor_cpu_util().await?;
303 return Ok(());
304 }
305
306 log::info!("Starting as dbus interface");
307 let (channel, rx) = tokio::sync::mpsc::unbounded_channel::<ScxMessage>();
309
310 let channel_clone = channel.clone();
311 ctrlc::set_handler(move || {
312 log::info!("shutting down..");
313 let _ = channel_clone.send(ScxMessage::Quit);
314 })
315 .context("Error setting Ctrl-C handler")?;
316
317 let connection = Connection::system().await?;
319 connection
320 .object_server()
321 .at(
322 "/org/scx/Loader",
323 ScxLoader {
324 current_scx: None,
325 current_mode: SchedMode::Auto,
326 current_args: None,
327 channel: channel.clone(),
328 },
329 )
330 .await?;
331
332 connection.request_name("org.scx.Loader").await?;
333
334 if let Some(default_sched) = &config.default_sched {
336 log::info!("Starting default scheduler: {default_sched:?}");
337
338 let default_mode = config.default_mode.clone().unwrap_or(SchedMode::Auto);
339
340 let loader_client = LoaderClientProxy::new(&connection).await?;
341 loader_client
342 .switch_scheduler(default_sched.clone(), default_mode)
343 .await?;
344 }
345
346 worker_loop(config, rx).await?;
348
349 Ok(())
350}
351
352async fn worker_loop(
353 config: config::Config,
354 mut receiver: UnboundedReceiver<ScxMessage>,
355) -> Result<()> {
356 let (runner_tx, runner_rx) = tokio::sync::mpsc::channel::<RunnerMessage>(1);
358
359 let run_sched_future = tokio::spawn(async move { handle_child_process(runner_rx).await });
360
361 tokio::pin!(run_sched_future);
363
364 loop {
365 let msg = tokio::select! {
367 msg = receiver.recv() => {
368 match msg {
369 None => return Ok(()),
370 Some(m) => m,
371 }
372 }
373 res = &mut run_sched_future => {
374 log::info!("Sched future finished");
375 let _ = res?;
376 continue;
377 }
378 };
379 log::debug!("Got msg : {msg:?}");
380
381 match msg {
382 ScxMessage::Quit => return Ok(()),
383 ScxMessage::StopSched => {
384 log::info!("Got event to stop scheduler!");
385
386 runner_tx.send(RunnerMessage::Stop).await?;
388 }
389 ScxMessage::StartSched((scx_sched, sched_mode)) => {
390 log::info!("Got event to start scheduler!");
391
392 let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
394
395 runner_tx
397 .send(RunnerMessage::Start((scx_sched, args)))
398 .await?;
399 }
400 ScxMessage::StartSchedArgs((scx_sched, sched_args)) => {
401 log::info!("Got event to start scheduler with args!");
402
403 runner_tx
405 .send(RunnerMessage::Start((scx_sched, sched_args)))
406 .await?;
407 }
408 ScxMessage::SwitchSched((scx_sched, sched_mode)) => {
409 log::info!("Got event to switch scheduler!");
410
411 let args = config::get_scx_flags_for_mode(&config, &scx_sched, sched_mode);
413
414 runner_tx
416 .send(RunnerMessage::Switch((scx_sched, args)))
417 .await?;
418 }
419 ScxMessage::SwitchSchedArgs((scx_sched, sched_args)) => {
420 log::info!("Got event to switch scheduler with args!");
421
422 runner_tx
424 .send(RunnerMessage::Switch((scx_sched, sched_args)))
425 .await?;
426 }
427 ScxMessage::RestartSched((scx_sched, current_args, current_mode)) => {
428 log::info!("Got event to restart scheduler!");
429
430 let args = if let Some(args) = current_args {
432 args
434 } else {
435 config::get_scx_flags_for_mode(&config, &scx_sched, current_mode)
437 };
438
439 runner_tx
441 .send(RunnerMessage::Restart((scx_sched, args)))
442 .await?;
443 }
444 }
445 }
446}
447
448async fn handle_child_process(mut rx: tokio::sync::mpsc::Receiver<RunnerMessage>) -> Result<()> {
449 let mut task: Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> = None;
450 let mut cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
451
452 while let Some(message) = rx.recv().await {
453 match message {
454 RunnerMessage::Switch((scx_sched, sched_args)) => {
455 stop_scheduler(&mut task, &mut cancel_token).await;
457
458 match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
460 Ok(handle) => {
461 task = Some(handle);
462 log::debug!("Scheduler started");
463 }
464 Err(err) => {
465 log::error!("Failed to start scheduler: {err}");
466 }
467 }
468 }
469 RunnerMessage::Start((scx_sched, sched_args)) => {
470 if task.is_some() {
472 log::error!("Scheduler wasn't finished yet. Stop already running scheduler!");
473 continue;
474 }
475 match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
477 Ok(handle) => {
478 task = Some(handle);
479 log::debug!("Scheduler started");
480 }
481 Err(err) => {
482 log::error!("Failed to start scheduler: {err}");
483 }
484 }
485 }
486 RunnerMessage::Stop => {
487 stop_scheduler(&mut task, &mut cancel_token).await;
488 }
489 RunnerMessage::Restart((scx_sched, sched_args)) => {
490 log::info!("Got event to restart scheduler!");
491
492 stop_scheduler(&mut task, &mut cancel_token).await;
494
495 match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await {
497 Ok(handle) => {
498 task = Some(handle);
499 log::debug!("Scheduler restarted");
500 }
501 Err(err) => {
502 log::error!("Failed to restart scheduler: {err}");
503 }
504 }
505 }
506 }
507 }
508
509 Ok(())
510}
511
512async fn start_scheduler(
514 scx_crate: SupportedSched,
515 args: Vec<String>,
516 cancel_token: Arc<tokio_util::sync::CancellationToken>,
517) -> Result<tokio::task::JoinHandle<Result<Option<ExitStatus>>>> {
518 let handle = tokio::spawn(async move {
520 let mut retries = 0u32;
521 let max_retries = 5u32;
522
523 let mut last_status: Option<ExitStatus> = None;
524
525 while retries < max_retries {
526 let child = spawn_scheduler(scx_crate.clone(), args.clone()).await;
527
528 let mut failed = false;
529 if let Ok(mut child) = child {
530 tokio::select! {
531 status = child.wait() => {
532 let status = status.expect("child process encountered an error");
533 last_status = Some(status);
534 if !status.success() {
535 failed = true;
536 }
537 log::debug!("Child process exited with status: {status:?}");
538 }
539
540 _ = cancel_token.cancelled() => {
541 log::debug!("Received cancellation signal");
542 if let Some(child_id) = child.id() {
544 nix::sys::signal::kill(
545 nix::unistd::Pid::from_raw(child_id as i32),
546 nix::sys::signal::SIGINT,
547 ).context("Failed to send termination signal to the child")?;
548 }
549 let status = child.wait().await.expect("child process encountered an error");
550 last_status = Some(status);
551 break;
552 }
553 };
554 } else {
555 log::debug!("Failed to spawn child process");
556 failed = true;
557 }
558
559 if !failed {
561 break;
562 }
563
564 retries += 1;
565 log::error!(
566 "Failed to start scheduler (attempt {}/{})",
567 retries,
568 max_retries,
569 );
570 }
571
572 Ok(last_status)
573 });
574
575 Ok(handle)
576}
577
578async fn spawn_scheduler(scx_crate: SupportedSched, args: Vec<String>) -> Result<Child> {
581 let sched_bin_name: &str = scx_crate.into();
582 log::info!("starting {sched_bin_name} command");
583
584 let mut cmd = Command::new(sched_bin_name);
585 cmd.args(args);
587
588 cmd.stdin(Stdio::null());
592
593 let child = cmd.spawn().expect("failed to spawn command");
595
596 Ok(child)
597}
598
599async fn stop_scheduler(
600 task: &mut Option<tokio::task::JoinHandle<Result<Option<ExitStatus>>>>,
601 cancel_token: &mut Arc<tokio_util::sync::CancellationToken>,
602) {
603 if let Some(task) = task.take() {
604 log::debug!("Stopping already running scheduler..");
605 cancel_token.cancel();
606 let status = task.await;
607 log::debug!("Scheduler was stopped with status: {:?}", status);
608 *cancel_token = Arc::new(tokio_util::sync::CancellationToken::new());
610 }
611}