1mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::HashSet;
15use std::ffi::c_int;
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::mem::MaybeUninit;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use anyhow::bail;
25use anyhow::Context;
26use anyhow::Result;
27use clap::Parser;
28use crossbeam::channel::RecvTimeoutError;
29use libbpf_rs::OpenObject;
30use libbpf_rs::ProgramInput;
31use log::{debug, info, warn};
32use scx_stats::prelude::*;
33use scx_utils::build_id;
34use scx_utils::compat;
35use scx_utils::scx_ops_attach;
36use scx_utils::scx_ops_load;
37use scx_utils::scx_ops_open;
38use scx_utils::set_rlimit_infinity;
39use scx_utils::uei_exited;
40use scx_utils::uei_report;
41use scx_utils::CoreType;
42use scx_utils::Topology;
43use scx_utils::UserExitInfo;
44use scx_utils::NR_CPU_IDS;
45use stats::Metrics;
46
/// Scheduler name used in the startup banner, the `--version` output and the
/// message logged when the scheduler object is dropped.
const SCHEDULER_NAME: &str = "scx_cosmos";
48
49#[derive(Debug, clap::Parser)]
50#[command(
51 name = "scx_cosmos",
52 version,
53 disable_version_flag = true,
54 about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
55)]
56struct Opts {
57 #[clap(long, default_value = "0")]
59 exit_dump_len: u32,
60
61 #[clap(short = 's', long, default_value = "10")]
63 slice_us: u64,
64
65 #[clap(short = 'l', long, default_value = "20000")]
67 slice_lag_us: u64,
68
69 #[clap(short = 'c', long, default_value = "75")]
71 cpu_busy_thresh: u64,
72
73 #[clap(short = 'p', long, default_value = "250")]
77 polling_ms: u64,
78
79 #[clap(short = 'm', long)]
91 primary_domain: Option<String>,
92
93 #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
95 enable_numa: bool,
96
97 #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
99 disable_cpufreq: bool,
100
101 #[arg(short = 'i', long, action = clap::ArgAction::SetTrue)]
106 flat_idle_scan: bool,
107
108 #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
113 preferred_idle_scan: bool,
114
115 #[clap(long, action = clap::ArgAction::SetTrue)]
120 disable_smt: bool,
121
122 #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
129 no_wake_sync: bool,
130
131 #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
136 no_deferred_wakeup: bool,
137
138 #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
145 mm_affinity: bool,
146
147 #[clap(long)]
149 stats: Option<f64>,
150
151 #[clap(long)]
154 monitor: Option<f64>,
155
156 #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
158 verbose: bool,
159
160 #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
162 version: bool,
163
164 #[clap(long)]
166 help_stats: bool,
167}
168
/// CPU selection policy used when the primary domain is given as a keyword
/// instead of an explicit CPU list.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Powermode {
    /// Select only big cores that are flagged as turbo-capable.
    Turbo,
    /// Select every big core, turbo-capable or not.
    Performance,
    /// Select only little cores.
    Powersave,
    /// Select every CPU regardless of its core type.
    Any,
}
176
177fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
182 let cpus: Vec<usize> = Topology::new()
183 .unwrap()
184 .all_cores
185 .values()
186 .flat_map(|core| &core.cpus)
187 .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
188 (Powermode::Turbo, CoreType::Big { turbo: true }) |
190 (Powermode::Performance, CoreType::Big { .. }) |
192 (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
194 (Powermode::Any, ..) => Some(*cpu_id),
195 _ => None,
196 })
197 .collect();
198
199 Ok(cpus)
200}
201
202pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
203 let mut cpus = Vec::new();
204 let mut seen = HashSet::new();
205
206 if let Some(mode) = match optarg {
208 "powersave" => Some(Powermode::Powersave),
209 "performance" => Some(Powermode::Performance),
210 "turbo" => Some(Powermode::Turbo),
211 "all" => Some(Powermode::Any),
212 _ => None,
213 } {
214 return get_primary_cpus(mode).map_err(|e| e.to_string());
215 }
216
217 if optarg
219 .chars()
220 .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
221 {
222 return Err("Invalid character in CPU list".to_string());
223 }
224
225 let cleaned = optarg.replace(' ', "\t");
227
228 for token in cleaned.split(',') {
229 let token = token.trim_matches(|c: char| c.is_whitespace());
230
231 if token.is_empty() {
232 continue;
233 }
234
235 if let Some((start_str, end_str)) = token.split_once('-') {
236 let start = start_str
237 .trim()
238 .parse::<usize>()
239 .map_err(|_| "Invalid range start")?;
240 let end = end_str
241 .trim()
242 .parse::<usize>()
243 .map_err(|_| "Invalid range end")?;
244
245 if start > end {
246 return Err(format!("Invalid CPU range: {}-{}", start, end));
247 }
248
249 for i in start..=end {
250 if cpus.len() >= *NR_CPU_IDS {
251 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
252 }
253 if seen.insert(i) {
254 cpus.push(i);
255 }
256 }
257 } else {
258 let cpu = token
259 .parse::<usize>()
260 .map_err(|_| format!("Invalid CPU: {}", token))?;
261 if cpus.len() >= *NR_CPU_IDS {
262 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
263 }
264 if seen.insert(cpu) {
265 cpus.push(cpu);
266 }
267 }
268 }
269
270 Ok(cpus)
271}
272
/// Snapshot of the aggregate CPU time counters taken from the `cpu ` line of
/// `/proc/stat`.
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    // First numeric column of the `cpu ` line.
    user: u64,
    // Second numeric column of the `cpu ` line.
    nice: u64,
    // Sum of the leading numeric columns of the `cpu ` line (at most eight).
    total: u64,
}
279
/// Userspace state of a loaded scheduler instance.
struct Scheduler<'a> {
    // Loaded BPF skeleton; gives access to its maps, programs and exit info.
    skel: BpfSkel<'a>,
    // Command-line options this instance was created with.
    opts: &'a Opts,
    // Link returned by attaching the struct_ops; taken (and dropped) at the
    // end of `run()`.
    struct_ops: Option<libbpf_rs::Link>,
    // Stats server whose channels are used to answer metrics requests.
    stats_server: StatsServer<(), Metrics>,
}
286
287impl<'a> Scheduler<'a> {
288 fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
289 set_rlimit_infinity();
290
291 let topo = Topology::new().unwrap();
293
294 let smt_enabled = !opts.disable_smt && topo.smt_enabled;
296
297 info!(
298 "{} {} {}",
299 SCHEDULER_NAME,
300 build_id::full_version(env!("CARGO_PKG_VERSION")),
301 if smt_enabled { "SMT on" } else { "SMT off" }
302 );
303
304 info!(
306 "scheduler options: {}",
307 std::env::args().collect::<Vec<_>>().join(" ")
308 );
309
310 let mut skel_builder = BpfSkelBuilder::default();
312 skel_builder.obj_builder.debug(opts.verbose);
313 let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops)?;
314
315 skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;
316
317 let rodata = skel.maps.rodata_data.as_mut().unwrap();
319 rodata.slice_ns = opts.slice_us * 1000;
320 rodata.slice_lag = opts.slice_lag_us * 1000;
321 rodata.cpufreq_enabled = !opts.disable_cpufreq;
322 rodata.deferred_wakeups = !opts.no_deferred_wakeup;
323 rodata.flat_idle_scan = opts.flat_idle_scan;
324 rodata.smt_enabled = smt_enabled;
325 rodata.numa_enabled = opts.enable_numa;
326 rodata.no_wake_sync = opts.no_wake_sync;
327 rodata.mm_affinity = opts.mm_affinity;
328
329 rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
331
332 if opts.preferred_idle_scan {
334 let mut cpus: Vec<_> = topo.all_cpus.values().collect();
335 cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
336 for (i, cpu) in cpus.iter().enumerate() {
337 rodata.preferred_cpus[i] = cpu.id as u64;
338 }
339 info!(
340 "Preferred CPUs: {:?}",
341 &rodata.preferred_cpus[0..cpus.len()]
342 );
343 }
344 rodata.preferred_idle_scan = opts.preferred_idle_scan;
345
346 let primary_cpus = if let Some(ref domain) = opts.primary_domain {
348 match parse_cpu_list(domain) {
349 Ok(cpus) => cpus,
350 Err(e) => bail!("Error parsing primary domain: {}", e),
351 }
352 } else {
353 (0..*NR_CPU_IDS).collect()
354 };
355 if primary_cpus.len() < *NR_CPU_IDS {
356 info!("Primary CPUs: {:?}", primary_cpus);
357 rodata.primary_all = false;
358 } else {
359 rodata.primary_all = true;
360 }
361
362 skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
364 | *compat::SCX_OPS_ENQ_LAST
365 | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
366 | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP
367 | if opts.enable_numa {
368 *compat::SCX_OPS_BUILTIN_IDLE_PER_NODE
369 } else {
370 0
371 };
372 info!(
373 "scheduler flags: {:#x}",
374 skel.struct_ops.cosmos_ops_mut().flags
375 );
376
377 let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;
379
380 if primary_cpus.len() < *NR_CPU_IDS {
382 for cpu in primary_cpus {
383 if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
384 bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
385 }
386 }
387 }
388
389 let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
391 let stats_server = StatsServer::new(stats::server_data()).launch()?;
392
393 Ok(Self {
394 skel,
395 opts,
396 struct_ops,
397 stats_server,
398 })
399 }
400
401 fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
402 let prog = &mut skel.progs.enable_primary_cpu;
403 let mut args = cpu_arg {
404 cpu_id: cpu as c_int,
405 };
406 let input = ProgramInput {
407 context_in: Some(unsafe {
408 std::slice::from_raw_parts_mut(
409 &mut args as *mut _ as *mut u8,
410 std::mem::size_of_val(&args),
411 )
412 }),
413 ..Default::default()
414 };
415 let out = prog.test_run(input).unwrap();
416 if out.return_value != 0 {
417 return Err(out.return_value);
418 }
419
420 Ok(())
421 }
422
423 fn get_metrics(&self) -> Metrics {
424 Metrics {
425 cpu_thresh: self.skel.maps.rodata_data.as_ref().unwrap().busy_threshold,
426 cpu_util: self.skel.maps.bss_data.as_ref().unwrap().cpu_util,
427 }
428 }
429
/// Returns the result of the `uei_exited!` check on the skeleton's `uei`
/// (user exit info); `run()` leaves its main loop once this is `true`.
// NOTE(review): presumably true once the BPF scheduler has recorded an exit -
// confirm against the `uei_exited!` macro in scx_utils.
pub fn exited(&mut self) -> bool {
    uei_exited!(&self.skel, uei)
}
433
434 fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
435 let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
437 let total_diff = curr.total.saturating_sub(prev.total);
438
439 if total_diff > 0 {
440 let user_ratio = user_diff as f64 / total_diff as f64;
441 Some((user_ratio * 1024.0).round() as u64)
442 } else {
443 None
444 }
445 }
446
447 fn read_cpu_times() -> Option<CpuTimes> {
448 let file = File::open("/proc/stat").ok()?;
449 let reader = BufReader::new(file);
450
451 for line in reader.lines() {
452 let line = line.ok()?;
453 if line.starts_with("cpu ") {
454 let fields: Vec<&str> = line.split_whitespace().collect();
455 if fields.len() < 5 {
456 return None;
457 }
458
459 let user: u64 = fields[1].parse().ok()?;
460 let nice: u64 = fields[2].parse().ok()?;
461
462 let total: u64 = fields
464 .iter()
465 .skip(1)
466 .take(8)
467 .filter_map(|v| v.parse::<u64>().ok())
468 .sum();
469
470 return Some(CpuTimes { user, nice, total });
471 }
472 }
473
474 None
475 }
476
477 fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
478 let (res_ch, req_ch) = self.stats_server.channels();
479
480 let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
486 let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
487 let mut last_update = Instant::now();
488
489 while !shutdown.load(Ordering::Relaxed) && !self.exited() {
490 if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
492 if let Some(curr_cputime) = Self::read_cpu_times() {
493 Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime)
494 .map(|util| self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util);
495 prev_cputime = curr_cputime;
496 }
497 last_update = Instant::now();
498 }
499
500 let timeout = if polling_time.is_zero() {
502 Duration::from_secs(1)
503 } else {
504 polling_time
505 };
506 match req_ch.recv_timeout(timeout) {
507 Ok(()) => res_ch.send(self.get_metrics())?,
508 Err(RecvTimeoutError::Timeout) => {}
509 Err(e) => Err(e)?,
510 }
511 }
512
513 let _ = self.struct_ops.take();
514 uei_report!(&self.skel, uei)
515 }
516}
517
// Logs a message naming the scheduler whenever a `Scheduler` value is
// dropped; no other work is done here.
impl Drop for Scheduler<'_> {
    /// Emits an info-level "Unregister ... scheduler" log line.
    fn drop(&mut self) {
        info!("Unregister {} scheduler", SCHEDULER_NAME);
    }
}
523
524fn main() -> Result<()> {
525 let opts = Opts::parse();
526
527 if opts.version {
528 println!(
529 "{} {}",
530 SCHEDULER_NAME,
531 build_id::full_version(env!("CARGO_PKG_VERSION"))
532 );
533 return Ok(());
534 }
535
536 if opts.help_stats {
537 stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
538 return Ok(());
539 }
540
541 let loglevel = simplelog::LevelFilter::Info;
542
543 let mut lcfg = simplelog::ConfigBuilder::new();
544 lcfg.set_time_offset_to_local()
545 .expect("Failed to set local time offset")
546 .set_time_level(simplelog::LevelFilter::Error)
547 .set_location_level(simplelog::LevelFilter::Off)
548 .set_target_level(simplelog::LevelFilter::Off)
549 .set_thread_level(simplelog::LevelFilter::Off);
550 simplelog::TermLogger::init(
551 loglevel,
552 lcfg.build(),
553 simplelog::TerminalMode::Stderr,
554 simplelog::ColorChoice::Auto,
555 )?;
556
557 let shutdown = Arc::new(AtomicBool::new(false));
558 let shutdown_clone = shutdown.clone();
559 ctrlc::set_handler(move || {
560 shutdown_clone.store(true, Ordering::Relaxed);
561 })
562 .context("Error setting Ctrl-C handler")?;
563
564 if let Some(intv) = opts.monitor.or(opts.stats) {
565 let shutdown_copy = shutdown.clone();
566 let jh = std::thread::spawn(move || {
567 match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
568 Ok(_) => {
569 debug!("stats monitor thread finished successfully")
570 }
571 Err(error_object) => {
572 warn!(
573 "stats monitor thread finished because of an error {}",
574 error_object
575 )
576 }
577 }
578 });
579 if opts.monitor.is_some() {
580 let _ = jh.join();
581 return Ok(());
582 }
583 }
584
585 let mut open_object = MaybeUninit::uninit();
586 loop {
587 let mut sched = Scheduler::init(&opts, &mut open_object)?;
588 if !sched.run(shutdown.clone())?.should_restart() {
589 break;
590 }
591 }
592
593 Ok(())
594}