mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;

mod stats;
use std::collections::HashSet;
use std::ffi::c_int;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::{debug, info, warn};
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::try_set_rlimit_infinity;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::CoreType;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;
use stats::Metrics;

const SCHEDULER_NAME: &str = "scx_cosmos";

#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_cosmos",
    version,
    disable_version_flag = true,
    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
)]
struct Opts {
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    #[clap(short = 's', long, default_value = "10")]
    slice_us: u64,

    #[clap(short = 'l', long, default_value = "20000")]
    slice_lag_us: u64,

    #[clap(short = 'c', long, default_value = "75")]
    cpu_busy_thresh: u64,

    #[clap(short = 'p', long, default_value = "250")]
    polling_ms: u64,

    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
    enable_numa: bool,

    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    disable_cpufreq: bool,

    #[clap(short = 'i', long, action = clap::ArgAction::SetTrue)]
    flat_idle_scan: bool,

    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    #[clap(long, action = clap::ArgAction::SetTrue)]
    disable_smt: bool,

    #[clap(short = 'S', long, action = clap::ArgAction::SetTrue)]
    avoid_smt: bool,

    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
    no_wake_sync: bool,

    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
    no_deferred_wakeup: bool,

    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
    mm_affinity: bool,

    #[clap(long)]
    stats: Option<f64>,

    #[clap(long)]
    monitor: Option<f64>,

    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    #[clap(long)]
    help_stats: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

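/// CPU power/performance classes used to resolve symbolic primary-domain names
/// ("turbo", "performance", "powersave", "all") into sets of CPUs.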
#[derive(PartialEq)]
enum Powermode {
    Turbo,
    Performance,
    Powersave,
    Any,
}

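/// Collect the CPU IDs that match the requested power mode, based on the host topology.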
fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
    let cpus: Vec<usize> = Topology::new()
        .unwrap()
        .all_cores
        .values()
        .flat_map(|core| &core.cpus)
        .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
            // Turbo: only turbo-boosted big cores.
            (Powermode::Turbo, CoreType::Big { turbo: true }) |
            // Performance: all big cores.
            (Powermode::Performance, CoreType::Big { .. }) |
            // Powersave: only little (efficiency) cores.
            (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
            (Powermode::Any, ..) => Some(*cpu_id),
            _ => None,
        })
        .collect();

    Ok(cpus)
}

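/// Parse a CPU list argument into a vector of unique CPU IDs.
///
/// The argument is either a symbolic power mode ("powersave", "performance",
/// "turbo", "all") or an explicit comma-separated list of CPUs and inclusive
/// ranges. For example, "0-2,5" yields [0, 1, 2, 5] (assuming the system
/// exposes at least 6 CPU IDs); duplicates are ignored.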
pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
    let mut cpus = Vec::new();
    let mut seen = HashSet::new();

    // Symbolic names are resolved through the host topology.
    if let Some(mode) = match optarg {
        "powersave" => Some(Powermode::Powersave),
        "performance" => Some(Powermode::Performance),
        "turbo" => Some(Powermode::Turbo),
        "all" => Some(Powermode::Any),
        _ => None,
    } {
        return get_primary_cpus(mode).map_err(|e| e.to_string());
    }

    // Only digits, ranges, commas and whitespace are accepted.
    if optarg
        .chars()
        .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
    {
        return Err("Invalid character in CPU list".to_string());
    }

    for token in optarg.split(',') {
        let token = token.trim();

        if token.is_empty() {
            continue;
        }

        if let Some((start_str, end_str)) = token.split_once('-') {
            let start = start_str
                .trim()
                .parse::<usize>()
                .map_err(|_| "Invalid range start")?;
            let end = end_str
                .trim()
                .parse::<usize>()
                .map_err(|_| "Invalid range end")?;

            if start > end {
                return Err(format!("Invalid CPU range: {}-{}", start, end));
            }

            for i in start..=end {
                if cpus.len() >= *NR_CPU_IDS {
                    return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
                }
                if seen.insert(i) {
                    cpus.push(i);
                }
            }
        } else {
            let cpu = token
                .parse::<usize>()
                .map_err(|_| format!("Invalid CPU: {}", token))?;
            if cpus.len() >= *NR_CPU_IDS {
                return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
            }
            if seen.insert(cpu) {
                cpus.push(cpu);
            }
        }
    }

    Ok(cpus)
}

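/// Aggregate CPU time counters sampled from the first "cpu" line of /proc/stat.
/// `total` is the sum of the first eight time fields (user through steal).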
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    user: u64,
    nice: u64,
    total: u64,
}

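/// User-space side of the scheduler: the loaded BPF skeleton, the CLI options,
/// the attached struct_ops link, and the stats server used by --stats/--monitor.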
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    opts: &'a Opts,
    struct_ops: Option<libbpf_rs::Link>,
    stats_server: StatsServer<(), Metrics>,
}

impl<'a> Scheduler<'a> {
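    /// Open, configure, load and attach the BPF scheduler, then start the stats server.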
    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        try_set_rlimit_infinity();

        let topo = Topology::new().unwrap();
        let smt_enabled = !opts.disable_smt && topo.smt_enabled;

        info!(
            "{} {} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            if smt_enabled { "SMT on" } else { "SMT off" }
        );

        info!(
            "scheduler options: {}",
            std::env::args().collect::<Vec<_>>().join(" ")
        );

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose);
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops, open_opts)?;

        skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = opts.slice_us * 1000;
        rodata.slice_lag = opts.slice_lag_us * 1000;
        rodata.cpufreq_enabled = !opts.disable_cpufreq;
        rodata.deferred_wakeups = !opts.no_deferred_wakeup;
        rodata.flat_idle_scan = opts.flat_idle_scan;
        rodata.smt_enabled = smt_enabled;
        rodata.numa_enabled = opts.enable_numa;
        rodata.no_wake_sync = opts.no_wake_sync;
        rodata.avoid_smt = opts.avoid_smt;
        rodata.mm_affinity = opts.mm_affinity;

        // Convert the busy threshold from a percentage (0..100) to the
        // fixed-point 0..1024 scale used on the BPF side.
        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;

        // Rank CPUs by capacity (highest first) to define the preferred idle scan order.
        if opts.preferred_idle_scan {
            let mut cpus: Vec<_> = topo.all_cpus.values().collect();
            cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
            for (i, cpu) in cpus.iter().enumerate() {
                rodata.preferred_cpus[i] = cpu.id as u64;
            }
            info!(
                "Preferred CPUs: {:?}",
                &rodata.preferred_cpus[0..cpus.len()]
            );
        }
        rodata.preferred_idle_scan = opts.preferred_idle_scan;

        // Resolve the primary scheduling domain, defaulting to all CPUs.
        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
            match parse_cpu_list(domain) {
                Ok(cpus) => cpus,
                Err(e) => bail!("Error parsing primary domain: {}", e),
            }
        } else {
            (0..*NR_CPU_IDS).collect()
        };
        if primary_cpus.len() < *NR_CPU_IDS {
            info!("Primary CPUs: {:?}", primary_cpus);
            rodata.primary_all = false;
        } else {
            rodata.primary_all = true;
        }

        skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
            | *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP
            | if opts.enable_numa {
                *compat::SCX_OPS_BUILTIN_IDLE_PER_NODE
            } else {
                0
            };
        info!(
            "scheduler flags: {:#x}",
            skel.struct_ops.cosmos_ops_mut().flags
        );

        let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;

        // Register each CPU of a restricted primary domain with the BPF side.
        if primary_cpus.len() < *NR_CPU_IDS {
            for cpu in primary_cpus {
                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
                }
            }
        }

        let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            opts,
            struct_ops,
            stats_server,
        })
    }

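    /// Add a CPU to the primary scheduling domain by invoking the
    /// `enable_primary_cpu` BPF program through `test_run()`.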
    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_primary_cpu;
        let mut args = cpu_arg {
            cpu_id: cpu as c_int,
        };
        // Pass the CPU id to the BPF program as raw bytes via the test_run context.
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }

    fn get_metrics(&self) -> Metrics {
        Metrics {
            cpu_thresh: self.skel.maps.rodata_data.as_ref().unwrap().busy_threshold,
            cpu_util: self.skel.maps.bss_data.as_ref().unwrap().cpu_util,
        }
    }

    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }

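    /// Compute the user (user + nice) share of CPU time between two samples,
    /// scaled to the 0..1024 range expected by the BPF side. Returns None if
    /// no time has elapsed between the samples.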
    fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
        let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
        let total_diff = curr.total.saturating_sub(prev.total);

        if total_diff > 0 {
            let user_ratio = user_diff as f64 / total_diff as f64;
            Some((user_ratio * 1024.0).round() as u64)
        } else {
            None
        }
    }

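    /// Read the aggregate "cpu" line from /proc/stat, returning the user and
    /// nice counters plus the total across the first eight time fields.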
    fn read_cpu_times() -> Option<CpuTimes> {
        let file = File::open("/proc/stat").ok()?;
        let reader = BufReader::new(file);

        for line in reader.lines() {
            let line = line.ok()?;
            if line.starts_with("cpu ") {
                let fields: Vec<&str> = line.split_whitespace().collect();
                if fields.len() < 5 {
                    return None;
                }

                let user: u64 = fields[1].parse().ok()?;
                let nice: u64 = fields[2].parse().ok()?;

                let total: u64 = fields
                    .iter()
                    .skip(1)
                    .take(8)
                    .filter_map(|v| v.parse::<u64>().ok())
                    .sum();

                return Some(CpuTimes { user, nice, total });
            }
        }

        None
    }

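    /// Main loop: periodically sample /proc/stat to refresh the cpu_util value
    /// consumed by the BPF side, and serve stats requests until shutdown is
    /// requested or the BPF scheduler exits.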
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
        let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
        let mut last_update = Instant::now();

        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
            // Refresh the user CPU utilization estimate at most once per polling interval.
            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
                if let Some(curr_cputime) = Self::read_cpu_times() {
                    if let Some(util) = Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime) {
                        self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util;
                    }
                    prev_cputime = curr_cputime;
                }
                last_update = Instant::now();
            }

            let timeout = if polling_time.is_zero() {
                Duration::from_secs(1)
            } else {
                polling_time
            };
            match req_ch.recv_timeout(timeout) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "{} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

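    // With --stats or --monitor, spawn the stats monitor thread; in monitor-only
    // mode the process exits once that thread terminates.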
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    let mut open_object = MaybeUninit::uninit();
    // Keep re-initializing and running the scheduler for as long as it requests a restart.
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}