// scx_cosmos/main.rs

1// SPDX-License-Identifier: GPL-2.0
2//
3// Copyright (c) 2025 Andrea Righi <arighi@nvidia.com>
4
5// This software may be used and distributed according to the terms of the
6// GNU General Public License version 2.
7
8mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::HashSet;
15use std::ffi::c_int;
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::mem::MaybeUninit;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use anyhow::bail;
25use anyhow::Context;
26use anyhow::Result;
27use clap::Parser;
28use crossbeam::channel::RecvTimeoutError;
29use libbpf_rs::OpenObject;
30use libbpf_rs::ProgramInput;
31use log::{debug, info, warn};
32use scx_stats::prelude::*;
33use scx_utils::build_id;
34use scx_utils::compat;
35use scx_utils::scx_ops_attach;
36use scx_utils::scx_ops_load;
37use scx_utils::scx_ops_open;
38use scx_utils::set_rlimit_infinity;
39use scx_utils::uei_exited;
40use scx_utils::uei_report;
41use scx_utils::CoreType;
42use scx_utils::Topology;
43use scx_utils::UserExitInfo;
44use scx_utils::NR_CPU_IDS;
45use stats::Metrics;
46
47const SCHEDULER_NAME: &str = "scx_cosmos";
48
#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_cosmos",
    version,
    disable_version_flag = true,
    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
)]
struct Opts {
    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Maximum scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "10")]
    slice_us: u64,

    /// Maximum runtime (since last sleep) that can be charged to a task in microseconds.
    #[clap(short = 'l', long, default_value = "20000")]
    slice_lag_us: u64,

    /// CPU busy threshold.
    #[clap(short = 'c', long, default_value = "75")]
    cpu_busy_thresh: u64,

    /// Polling time (ms). Value is clamped to the range [10 .. 1000].
    ///
    /// 0 = disabled.
    // NOTE(review): Scheduler::run() only enforces the 1000 ms upper bound;
    // values in 1..=9 are not raised to 10 — confirm help text vs. code.
    #[clap(short = 'p', long, default_value = "250")]
    polling_ms: u64,

    /// Specifies a list of CPUs to prioritize.
    ///
    /// Accepts a comma-separated list of CPUs or ranges (i.e., 0-3,12-15) or the following special
    /// keywords:
    ///
    /// "turbo" = automatically detect and prioritize the CPUs with the highest max frequency,
    /// "performance" = automatically detect and prioritize the fastest CPUs,
    /// "powersave" = automatically detect and prioritize the slowest CPUs,
    /// "all" = all CPUs assigned to the primary domain.
    ///
    /// By default "all" CPUs are used.
    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    /// Enable NUMA optimizations.
    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
    enable_numa: bool,

    /// Disable CPU frequency control.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    disable_cpufreq: bool,

    /// Enable flat idle CPU scanning.
    ///
    /// This option can help reduce some overhead when trying to allocate idle CPUs and it can be
    /// quite effective with simple CPU topologies.
    #[arg(short = 'i', long, action = clap::ArgAction::SetTrue)]
    flat_idle_scan: bool,

    /// Enable preferred idle CPU scanning.
    ///
    /// With this option enabled, the scheduler will prioritize assigning tasks to higher-ranked
    /// cores before considering lower-ranked ones.
    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    /// Disable SMT.
    ///
    /// This option can only be used together with --flat-idle-scan or --preferred-idle-scan,
    /// otherwise it is ignored.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    disable_smt: bool,

    /// Disable direct dispatch during synchronous wakeups.
    ///
    /// Enabling this option can lead to a more uniform load distribution across available cores,
    /// potentially improving performance in certain scenarios. However, it may come at the cost of
    /// reduced efficiency for pipe-intensive workloads that benefit from tighter producer-consumer
    /// coupling.
    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
    no_wake_sync: bool,

    /// Disable deferred wakeups.
    ///
    /// Enabling this option can reduce throughput and performance for certain workloads, but it
    /// can also reduce power consumption (useful on battery-powered systems).
    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
    no_deferred_wakeup: bool,

    /// Enable address space affinity.
    ///
    /// This option allows keeping tasks that share the same address space (e.g., threads of the
    /// same process) on the same CPU across wakeups.
    ///
    /// This can improve locality and performance in certain cache-sensitive workloads.
    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
    mm_affinity: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Enable verbose output, including libbpf details.
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,
}
168
/// CPU selection policy used to build the primary scheduling domain.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Powermode {
    /// Only big cores flagged as turbo-capable.
    Turbo,
    /// All big cores (turbo and non-turbo).
    Performance,
    /// Only little (power-efficient) cores.
    Powersave,
    /// Every CPU, regardless of core type.
    Any,
}
176
/*
 * TODO: this code is shared between scx_bpfland, scx_flash and scx_cosmos; consider moving it to
 * scx_utils.
 */
181fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
182    let cpus: Vec<usize> = Topology::new()
183        .unwrap()
184        .all_cores
185        .values()
186        .flat_map(|core| &core.cpus)
187        .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
188            // Turbo mode: prioritize CPUs with the highest max frequency
189            (Powermode::Turbo, CoreType::Big { turbo: true }) |
190            // Performance mode: add all the Big CPUs (either Turbo or non-Turbo)
191            (Powermode::Performance, CoreType::Big { .. }) |
192            // Powersave mode: add all the Little CPUs
193            (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
194            (Powermode::Any, ..) => Some(*cpu_id),
195            _ => None,
196        })
197        .collect();
198
199    Ok(cpus)
200}
201
202pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
203    let mut cpus = Vec::new();
204    let mut seen = HashSet::new();
205
206    // Handle special keywords
207    if let Some(mode) = match optarg {
208        "powersave" => Some(Powermode::Powersave),
209        "performance" => Some(Powermode::Performance),
210        "turbo" => Some(Powermode::Turbo),
211        "all" => Some(Powermode::Any),
212        _ => None,
213    } {
214        return get_primary_cpus(mode).map_err(|e| e.to_string());
215    }
216
217    // Validate input characters
218    if optarg
219        .chars()
220        .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
221    {
222        return Err("Invalid character in CPU list".to_string());
223    }
224
225    // Replace all whitespace with tab (or just trim later)
226    let cleaned = optarg.replace(' ', "\t");
227
228    for token in cleaned.split(',') {
229        let token = token.trim_matches(|c: char| c.is_whitespace());
230
231        if token.is_empty() {
232            continue;
233        }
234
235        if let Some((start_str, end_str)) = token.split_once('-') {
236            let start = start_str
237                .trim()
238                .parse::<usize>()
239                .map_err(|_| "Invalid range start")?;
240            let end = end_str
241                .trim()
242                .parse::<usize>()
243                .map_err(|_| "Invalid range end")?;
244
245            if start > end {
246                return Err(format!("Invalid CPU range: {}-{}", start, end));
247            }
248
249            for i in start..=end {
250                if cpus.len() >= *NR_CPU_IDS {
251                    return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
252                }
253                if seen.insert(i) {
254                    cpus.push(i);
255                }
256            }
257        } else {
258            let cpu = token
259                .parse::<usize>()
260                .map_err(|_| format!("Invalid CPU: {}", token))?;
261            if cpus.len() >= *NR_CPU_IDS {
262                return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
263            }
264            if seen.insert(cpu) {
265                cpus.push(cpu);
266            }
267        }
268    }
269
270    Ok(cpus)
271}
272
/// Aggregated CPU time counters sampled from the "cpu " line of /proc/stat.
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    user: u64,  // time spent in user mode
    nice: u64,  // time spent in user mode with low priority (nice)
    total: u64, // sum of the first 8 /proc/stat fields (see read_cpu_times)
}
279
/// User-space scheduler state: the loaded BPF skeleton, the CLI options it
/// was configured from, the struct_ops attach link and the stats server.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,                      // loaded BPF skeleton
    opts: &'a Opts,                         // command-line options
    struct_ops: Option<libbpf_rs::Link>,    // attach link; taken in run() before reporting exit
    stats_server: StatsServer<(), Metrics>, // serves metrics to --stats/--monitor clients
}
286
impl<'a> Scheduler<'a> {
    /// Initialize the scheduler: open the BPF skeleton, push all tunables
    /// from `opts` into the BPF read-only data, load it, optionally restrict
    /// scheduling to a primary CPU domain, attach the struct_ops scheduler
    /// and launch the stats server.
    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        set_rlimit_infinity();

        // Initialize CPU topology.
        let topo = Topology::new().unwrap();

        // Check host topology to determine if we need to enable SMT capabilities.
        let smt_enabled = !opts.disable_smt && topo.smt_enabled;

        info!(
            "{} {} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            if smt_enabled { "SMT on" } else { "SMT off" }
        );

        // Print command line.
        info!(
            "scheduler options: {}",
            std::env::args().collect::<Vec<_>>().join(" ")
        );

        // Initialize BPF connector.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose);
        let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops)?;

        skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;

        // Override default BPF scheduling parameters.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = opts.slice_us * 1000;
        rodata.slice_lag = opts.slice_lag_us * 1000;
        rodata.cpufreq_enabled = !opts.disable_cpufreq;
        rodata.deferred_wakeups = !opts.no_deferred_wakeup;
        rodata.flat_idle_scan = opts.flat_idle_scan;
        rodata.smt_enabled = smt_enabled;
        rodata.numa_enabled = opts.enable_numa;
        rodata.no_wake_sync = opts.no_wake_sync;
        rodata.mm_affinity = opts.mm_affinity;

        // Normalize CPU busy threshold in the range [0 .. 1024].
        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;

        // Generate the list of available CPUs sorted by capacity in descending order.
        if opts.preferred_idle_scan {
            let mut cpus: Vec<_> = topo.all_cpus.values().collect();
            cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
            // NOTE(review): assumes rodata.preferred_cpus has at least
            // topo.all_cpus.len() slots — confirm against the BPF side.
            for (i, cpu) in cpus.iter().enumerate() {
                rodata.preferred_cpus[i] = cpu.id as u64;
            }
            info!(
                "Preferred CPUs: {:?}",
                &rodata.preferred_cpus[0..cpus.len()]
            );
        }
        rodata.preferred_idle_scan = opts.preferred_idle_scan;

        // Define the primary scheduling domain.
        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
            match parse_cpu_list(domain) {
                Ok(cpus) => cpus,
                Err(e) => bail!("Error parsing primary domain: {}", e),
            }
        } else {
            (0..*NR_CPU_IDS).collect()
        };
        // A domain spanning every CPU id is treated as "no restriction".
        if primary_cpus.len() < *NR_CPU_IDS {
            info!("Primary CPUs: {:?}", primary_cpus);
            rodata.primary_all = false;
        } else {
            rodata.primary_all = true;
        }

        // Set scheduler flags.
        skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
            | *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP
            | if opts.enable_numa {
                *compat::SCX_OPS_BUILTIN_IDLE_PER_NODE
            } else {
                0
            };
        info!(
            "scheduler flags: {:#x}",
            skel.struct_ops.cosmos_ops_mut().flags
        );

        // Load the BPF program for validation.
        let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;

        // Enable primary scheduling domain, if defined.
        if primary_cpus.len() < *NR_CPU_IDS {
            for cpu in primary_cpus {
                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
                }
            }
        }

        // Attach the scheduler.
        let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            opts,
            struct_ops,
            stats_server,
        })
    }

    /// Add `cpu` to the primary scheduling domain by invoking the BPF
    /// `enable_primary_cpu` program via test_run; on failure, returns the
    /// program's non-zero return value.
    ///
    /// NOTE(review): the test_run() Result itself is unwrap()ed, so a syscall
    /// failure panics instead of being reported — confirm this is intended.
    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_primary_cpu;
        let mut args = cpu_arg {
            cpu_id: cpu as c_int,
        };
        let input = ProgramInput {
            // Pass `args` to the BPF program as a raw byte slice; the layout
            // must match the BPF-side `cpu_arg` definition.
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }

    /// Snapshot the metrics published to the stats server.
    fn get_metrics(&self) -> Metrics {
        Metrics {
            cpu_thresh: self.skel.maps.rodata_data.as_ref().unwrap().busy_threshold,
            cpu_util: self.skel.maps.bss_data.as_ref().unwrap().cpu_util,
        }
    }

    /// Whether the BPF scheduler reported an exit via the user exit info.
    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }

    /// Compute user CPU utilization between two /proc/stat samples, scaled
    /// to the [0 .. 1024] range. Returns None when no total time elapsed
    /// between the samples.
    fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
        // Evaluate total user CPU time as user + nice.
        let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
        let total_diff = curr.total.saturating_sub(prev.total);

        if total_diff > 0 {
            let user_ratio = user_diff as f64 / total_diff as f64;
            Some((user_ratio * 1024.0).round() as u64)
        } else {
            None
        }
    }

    /// Read the aggregate "cpu " line from /proc/stat.
    ///
    /// Returns None if the file cannot be read or the line is malformed.
    fn read_cpu_times() -> Option<CpuTimes> {
        let file = File::open("/proc/stat").ok()?;
        let reader = BufReader::new(file);

        for line in reader.lines() {
            let line = line.ok()?;
            if line.starts_with("cpu ") {
                let fields: Vec<&str> = line.split_whitespace().collect();
                if fields.len() < 5 {
                    return None;
                }

                let user: u64 = fields[1].parse().ok()?;
                let nice: u64 = fields[2].parse().ok()?;

                // Sum the first 8 fields as total time, including idle, system, etc.
                let total: u64 = fields
                    .iter()
                    .skip(1)
                    .take(8)
                    .filter_map(|v| v.parse::<u64>().ok())
                    .sum();

                return Some(CpuTimes { user, nice, total });
            }
        }

        None
    }

    /// Main loop: periodically sample user CPU utilization, publish it to the
    /// BPF side, serve stats requests, and return the BPF exit report once
    /// shut down or once the BPF scheduler exits.
    ///
    /// NOTE(review): the --polling-ms help claims clamping to [10 .. 1000],
    /// but only the upper bound (1 s) is enforced here — confirm.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        // Periodically evaluate user CPU utilization from user-space and update a global variable
        // in BPF.
        //
        // The BPF scheduler will use this value to determine when the system is idle (using local
        // DSQs and simple round-robin scheduler) or busy (switching to a deadline-based policy).
        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
        let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
        let mut last_update = Instant::now();

        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
            // Update CPU utilization.
            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
                if let Some(curr_cputime) = Self::read_cpu_times() {
                    // map() is used only for its side effect: None means the
                    // sample was unusable and the BPF value stays untouched.
                    Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime)
                        .map(|util| self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util);
                    prev_cputime = curr_cputime;
                }
                last_update = Instant::now();
            }

            // Update statistics and check for exit condition.
            let timeout = if polling_time.is_zero() {
                Duration::from_secs(1)
            } else {
                polling_time
            };
            match req_ch.recv_timeout(timeout) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        // Drop the attach link (detaching the scheduler) before reporting
        // the exit state.
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}
517
518impl Drop for Scheduler<'_> {
519    fn drop(&mut self) {
520        info!("Unregister {} scheduler", SCHEDULER_NAME);
521    }
522}
523
524fn main() -> Result<()> {
525    let opts = Opts::parse();
526
527    if opts.version {
528        println!(
529            "{} {}",
530            SCHEDULER_NAME,
531            build_id::full_version(env!("CARGO_PKG_VERSION"))
532        );
533        return Ok(());
534    }
535
536    if opts.help_stats {
537        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
538        return Ok(());
539    }
540
541    let loglevel = simplelog::LevelFilter::Info;
542
543    let mut lcfg = simplelog::ConfigBuilder::new();
544    lcfg.set_time_offset_to_local()
545        .expect("Failed to set local time offset")
546        .set_time_level(simplelog::LevelFilter::Error)
547        .set_location_level(simplelog::LevelFilter::Off)
548        .set_target_level(simplelog::LevelFilter::Off)
549        .set_thread_level(simplelog::LevelFilter::Off);
550    simplelog::TermLogger::init(
551        loglevel,
552        lcfg.build(),
553        simplelog::TerminalMode::Stderr,
554        simplelog::ColorChoice::Auto,
555    )?;
556
557    let shutdown = Arc::new(AtomicBool::new(false));
558    let shutdown_clone = shutdown.clone();
559    ctrlc::set_handler(move || {
560        shutdown_clone.store(true, Ordering::Relaxed);
561    })
562    .context("Error setting Ctrl-C handler")?;
563
564    if let Some(intv) = opts.monitor.or(opts.stats) {
565        let shutdown_copy = shutdown.clone();
566        let jh = std::thread::spawn(move || {
567            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
568                Ok(_) => {
569                    debug!("stats monitor thread finished successfully")
570                }
571                Err(error_object) => {
572                    warn!(
573                        "stats monitor thread finished because of an error {}",
574                        error_object
575                    )
576                }
577            }
578        });
579        if opts.monitor.is_some() {
580            let _ = jh.join();
581            return Ok(());
582        }
583    }
584
585    let mut open_object = MaybeUninit::uninit();
586    loop {
587        let mut sched = Scheduler::init(&opts, &mut open_object)?;
588        if !sched.run(shutdown.clone())?.should_restart() {
589            break;
590        }
591    }
592
593    Ok(())
594}