scx_cosmos/main.rs

// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2025 Andrea Righi <arighi@nvidia.com>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;

mod stats;
use std::collections::HashSet;
use std::ffi::c_int;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::{debug, info, warn};
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::try_set_rlimit_infinity;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::CoreType;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;
use stats::Metrics;

const SCHEDULER_NAME: &str = "scx_cosmos";

#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_cosmos",
    version,
    disable_version_flag = true,
    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
)]
struct Opts {
    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Maximum scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "10")]
    slice_us: u64,

    /// Maximum runtime (since last sleep) that can be charged to a task in microseconds.
    #[clap(short = 'l', long, default_value = "20000")]
    slice_lag_us: u64,

    /// CPU busy threshold.
    ///
    /// Specifies the CPU utilization percentage (0-100%) at which the scheduler considers the
    /// system to be busy.
    ///
    /// When the average CPU utilization reaches this threshold, the scheduler switches from using
    /// multiple per-CPU round-robin dispatch queues (which favor locality and reduce locking
    /// contention) to a global deadline-based dispatch queue (which improves load balancing).
    ///
    /// The global dispatch queue can increase task migrations and improve responsiveness for
    /// interactive tasks under heavy load. Lower values make the scheduler switch to deadline
    /// mode sooner, improving overall responsiveness at the cost of reduced single-task
    /// performance due to the additional migrations. Higher values make tasks more "sticky" to
    /// their CPUs, benefiting workloads that rely on cache locality.
    ///
    /// A higher value is recommended for server-type workloads, while a lower value is recommended
    /// for interactive-type workloads.
    #[clap(short = 'c', long, default_value = "75")]
    cpu_busy_thresh: u64,

    /// Polling time (ms) to refresh the CPU utilization.
    ///
    /// This interval determines how often the scheduler refreshes the CPU utilization that is
    /// compared against the CPU busy threshold (option -c) to decide whether the system is busy
    /// and to trigger the switch between multiple per-CPU dispatch queues and a single global
    /// deadline-based dispatch queue.
    ///
    /// Value is clamped to the range [10 .. 1000].
    ///
    /// 0 = disabled.
    #[clap(short = 'p', long, default_value = "250")]
    polling_ms: u64,

    /// Specifies a list of CPUs to prioritize.
    ///
    /// Accepts a comma-separated list of CPUs or ranges (e.g., 0-3,12-15) or one of the
    /// following special keywords:
    ///
    /// "turbo" = automatically detect and prioritize the CPUs with the highest max frequency,
    /// "performance" = automatically detect and prioritize the fastest CPUs,
    /// "powersave" = automatically detect and prioritize the slowest CPUs,
    /// "all" = all CPUs assigned to the primary domain.
    ///
    /// By default "all" CPUs are used.
    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    /// Enable NUMA optimizations.
    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
    enable_numa: bool,

    /// Disable CPU frequency control.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    disable_cpufreq: bool,

    /// Enable flat idle CPU scanning.
    ///
    /// This option can help reduce some overhead when searching for idle CPUs, and it can be
    /// quite effective on simple CPU topologies.
    #[clap(short = 'i', long, action = clap::ArgAction::SetTrue)]
    flat_idle_scan: bool,

    /// Enable preferred idle CPU scanning.
    ///
    /// With this option enabled, the scheduler will prioritize assigning tasks to higher-ranked
    /// cores before considering lower-ranked ones.
    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    /// Disable SMT.
    ///
    /// This option can only be used together with --flat-idle-scan or --preferred-idle-scan,
    /// otherwise it is ignored.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    disable_smt: bool,

    /// SMT contention avoidance.
    ///
    /// When enabled, the scheduler aggressively avoids placing tasks on sibling SMT threads.
    /// This may increase task migrations and lower overall throughput, but can lead to more
    /// consistent performance by reducing contention on shared SMT cores.
    #[clap(short = 'S', long, action = clap::ArgAction::SetTrue)]
    avoid_smt: bool,

    /// Disable direct dispatch during synchronous wakeups.
    ///
    /// Enabling this option can lead to a more uniform load distribution across available cores,
    /// potentially improving performance in certain scenarios. However, it may come at the cost of
    /// reduced efficiency for pipe-intensive workloads that benefit from tighter producer-consumer
    /// coupling.
    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
    no_wake_sync: bool,

    /// Disable deferred wakeups.
    ///
    /// Enabling this option can reduce throughput and performance for certain workloads, but it
    /// can also reduce power consumption (useful on battery-powered systems).
    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
    no_deferred_wakeup: bool,

    /// Enable address space affinity.
    ///
    /// This option keeps tasks that share the same address space (e.g., threads of the same
    /// process) on the same CPU across wakeups.
    ///
    /// This can improve locality and performance in certain cache-sensitive workloads.
    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
    mm_affinity: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Enable verbose output, including libbpf details.
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
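
// Example invocation (hypothetical values): switch to the global deadline queue at 80%
// utilization, refresh the utilization every 500 ms, and prioritize the turbo cores:
//
//   $ scx_cosmos -c 80 -p 500 -m turbo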

#[derive(PartialEq)]
enum Powermode {
    Turbo,
    Performance,
    Powersave,
    Any,
}

/*
 * TODO: this code is shared between scx_bpfland, scx_flash and scx_cosmos; consider moving
 * it to scx_utils.
 */
fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
    let cpus: Vec<usize> = Topology::new()
        .unwrap()
        .all_cores
        .values()
        .flat_map(|core| &core.cpus)
        .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
            // Turbo mode: prioritize CPUs with the highest max frequency
            (Powermode::Turbo, CoreType::Big { turbo: true }) |
            // Performance mode: add all the Big CPUs (either Turbo or non-Turbo)
            (Powermode::Performance, CoreType::Big { .. }) |
            // Powersave mode: add all the Little CPUs
            (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
            (Powermode::Any, ..) => Some(*cpu_id),
            _ => None,
        })
        .collect();

    Ok(cpus)
}
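
// For instance, on a hypothetical hybrid system with big cores 0-7 and little cores 8-15,
// get_primary_cpus(Powermode::Powersave) would return the little cores [8, 9, ..., 15].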

pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
    let mut cpus = Vec::new();
    let mut seen = HashSet::new();

    // Handle special keywords
    if let Some(mode) = match optarg {
        "powersave" => Some(Powermode::Powersave),
        "performance" => Some(Powermode::Performance),
        "turbo" => Some(Powermode::Turbo),
        "all" => Some(Powermode::Any),
        _ => None,
    } {
        return get_primary_cpus(mode).map_err(|e| e.to_string());
    }

    // Validate input characters
    if optarg
        .chars()
        .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
    {
        return Err("Invalid character in CPU list".to_string());
    }

    // Normalize spaces to tabs; individual tokens are trimmed below.
    let cleaned = optarg.replace(' ', "\t");

    for token in cleaned.split(',') {
        let token = token.trim();

        if token.is_empty() {
            continue;
        }

        if let Some((start_str, end_str)) = token.split_once('-') {
            let start = start_str
                .trim()
                .parse::<usize>()
                .map_err(|_| "Invalid range start")?;
            let end = end_str
                .trim()
                .parse::<usize>()
                .map_err(|_| "Invalid range end")?;

            if start > end {
                return Err(format!("Invalid CPU range: {}-{}", start, end));
            }

            for i in start..=end {
                if cpus.len() >= *NR_CPU_IDS {
                    return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
                }
                if seen.insert(i) {
                    cpus.push(i);
                }
            }
        } else {
            let cpu = token
                .parse::<usize>()
                .map_err(|_| format!("Invalid CPU: {}", token))?;
            if cpus.len() >= *NR_CPU_IDS {
                return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
            }
            if seen.insert(cpu) {
                cpus.push(cpu);
            }
        }
    }

    Ok(cpus)
}

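// A minimal test sketch (not part of the original source) illustrating the accepted syntax:
// ranges expand inclusively and duplicates are dropped while preserving first-seen order.
#[cfg(test)]
mod parse_cpu_list_tests {
    use super::parse_cpu_list;

    #[test]
    fn parses_ranges_and_skips_duplicates() {
        // "0-2" expands to 0, 1, 2; the duplicate "2" is ignored; "5" is appended.
        assert_eq!(parse_cpu_list("0-2,2,5").unwrap(), vec![0, 1, 2, 5]);
        // A reversed range is rejected.
        assert!(parse_cpu_list("3-1").is_err());
    }
}

// Snapshot of the cumulative CPU time counters (in clock ticks) parsed from /proc/stat.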
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    user: u64,
    nice: u64,
    total: u64,
}

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    opts: &'a Opts,
    struct_ops: Option<libbpf_rs::Link>,
    stats_server: StatsServer<(), Metrics>,
}

impl<'a> Scheduler<'a> {
    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        try_set_rlimit_infinity();

        // Initialize CPU topology.
        let topo = Topology::new().unwrap();

        // Check host topology to determine if we need to enable SMT capabilities.
        let smt_enabled = !opts.disable_smt && topo.smt_enabled;

        info!(
            "{} {} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            if smt_enabled { "SMT on" } else { "SMT off" }
        );

        // Print command line.
        info!(
            "scheduler options: {}",
            std::env::args().collect::<Vec<_>>().join(" ")
        );

        // Initialize BPF connector.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose);
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops, open_opts)?;

        skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;

        // Override default BPF scheduling parameters.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = opts.slice_us * 1000;
        rodata.slice_lag = opts.slice_lag_us * 1000;
        rodata.cpufreq_enabled = !opts.disable_cpufreq;
        rodata.deferred_wakeups = !opts.no_deferred_wakeup;
        rodata.flat_idle_scan = opts.flat_idle_scan;
        rodata.smt_enabled = smt_enabled;
        rodata.numa_enabled = opts.enable_numa;
        rodata.no_wake_sync = opts.no_wake_sync;
        rodata.avoid_smt = opts.avoid_smt;
        rodata.mm_affinity = opts.mm_affinity;

        // Normalize CPU busy threshold in the range [0 .. 1024].
        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
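        // For example, the default -c 75 maps to 75 * 1024 / 100 = 768 fixed-point units.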

        // Generate the list of available CPUs sorted by capacity in descending order.
        if opts.preferred_idle_scan {
            let mut cpus: Vec<_> = topo.all_cpus.values().collect();
            cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
            for (i, cpu) in cpus.iter().enumerate() {
                rodata.preferred_cpus[i] = cpu.id as u64;
            }
            info!(
                "Preferred CPUs: {:?}",
                &rodata.preferred_cpus[0..cpus.len()]
            );
        }
        rodata.preferred_idle_scan = opts.preferred_idle_scan;

        // Define the primary scheduling domain.
        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
            match parse_cpu_list(domain) {
                Ok(cpus) => cpus,
                Err(e) => bail!("Error parsing primary domain: {}", e),
            }
        } else {
            (0..*NR_CPU_IDS).collect()
        };
        if primary_cpus.len() < *NR_CPU_IDS {
            info!("Primary CPUs: {:?}", primary_cpus);
            rodata.primary_all = false;
        } else {
            rodata.primary_all = true;
        }

        // Set scheduler flags.
        skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
            | *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP
            | if opts.enable_numa {
                *compat::SCX_OPS_BUILTIN_IDLE_PER_NODE
            } else {
                0
            };
        info!(
            "scheduler flags: {:#x}",
            skel.struct_ops.cosmos_ops_mut().flags
        );

        // Load the BPF program for validation.
        let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;

        // Enable primary scheduling domain, if defined.
        if primary_cpus.len() < *NR_CPU_IDS {
            for cpu in primary_cpus {
                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
                }
            }
        }

        // Attach the scheduler.
        let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            opts,
            struct_ops,
            stats_server,
        })
    }

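    // Run the enable_primary_cpu BPF syscall program via BPF_PROG_TEST_RUN, passing the
    // id of the CPU to add to the primary domain as the program's input context.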
    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_primary_cpu;
        let mut args = cpu_arg {
            cpu_id: cpu as c_int,
        };
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }

    fn get_metrics(&self) -> Metrics {
        Metrics {
            cpu_thresh: self.skel.maps.rodata_data.as_ref().unwrap().busy_threshold,
            cpu_util: self.skel.maps.bss_data.as_ref().unwrap().cpu_util,
        }
    }

    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }

    fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
        // Evaluate total user CPU time as user + nice.
        let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
        let total_diff = curr.total.saturating_sub(prev.total);
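        // Worked example (hypothetical numbers): if user+nice advanced by 300 ticks and
        // total by 600, the ratio is 0.5 and the reported utilization is 512 (out of 1024).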
        if total_diff > 0 {
            let user_ratio = user_diff as f64 / total_diff as f64;
            Some((user_ratio * 1024.0).round() as u64)
        } else {
            None
        }
    }

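    // The first line of /proc/stat contains cumulative times, in clock ticks:
    //   cpu  user nice system idle iowait irq softirq steal guest guest_nice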
    fn read_cpu_times() -> Option<CpuTimes> {
        let file = File::open("/proc/stat").ok()?;
        let reader = BufReader::new(file);

        for line in reader.lines() {
            let line = line.ok()?;
            if line.starts_with("cpu ") {
                let fields: Vec<&str> = line.split_whitespace().collect();
                if fields.len() < 5 {
                    return None;
                }

                let user: u64 = fields[1].parse().ok()?;
                let nice: u64 = fields[2].parse().ok()?;

                // Sum the first 8 fields as total time, including idle, system, etc.
                let total: u64 = fields
                    .iter()
                    .skip(1)
                    .take(8)
                    .filter_map(|v| v.parse::<u64>().ok())
                    .sum();

                return Some(CpuTimes { user, nice, total });
            }
        }

        None
    }

    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        // Periodically evaluate the user CPU utilization from user-space and update a global
        // variable in BPF.
        //
        // The BPF scheduler uses this value to determine whether the system is idle (using local
        // DSQs and a simple round-robin policy) or busy (switching to a deadline-based policy).
        //
        // Clamp the polling interval to the documented range [10 .. 1000] ms (0 = disabled).
        let polling_time = if self.opts.polling_ms == 0 {
            Duration::ZERO
        } else {
            Duration::from_millis(self.opts.polling_ms.clamp(10, 1000))
        };
        let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
        let mut last_update = Instant::now();

        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
            // Update CPU utilization.
            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
                if let Some(curr_cputime) = Self::read_cpu_times() {
                    if let Some(util) = Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime) {
                        self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util;
                    }
                    prev_cputime = curr_cputime;
                }
                last_update = Instant::now();
            }

            // Update statistics and check for exit condition.
            let timeout = if polling_time.is_zero() {
                Duration::from_secs(1)
            } else {
                polling_time
            };
            match req_ch.recv_timeout(timeout) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "{} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

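    // Keep reloading the scheduler until it exits with a reason that does not request a
    // restart (see UserExitInfo::should_restart()).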
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}
631}