// scx_cosmos/main.rs
1// SPDX-License-Identifier: GPL-2.0
2//
3// Copyright (c) 2025 Andrea Righi <arighi@nvidia.com>
4
5// This software may be used and distributed according to the terms of the
6// GNU General Public License version 2.
7
8mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::HashSet;
15use std::ffi::{c_int, c_ulong};
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::mem::MaybeUninit;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use anyhow::bail;
25use anyhow::Context;
26use anyhow::Result;
27use clap::Parser;
28use crossbeam::channel::RecvTimeoutError;
29use libbpf_rs::MapCore;
30use libbpf_rs::MapFlags;
31use libbpf_rs::OpenObject;
32use libbpf_rs::ProgramInput;
33use log::{debug, info, warn};
34use scx_stats::prelude::*;
35use scx_utils::build_id;
36use scx_utils::compat;
37use scx_utils::libbpf_clap_opts::LibbpfOpts;
38use scx_utils::scx_ops_attach;
39use scx_utils::scx_ops_load;
40use scx_utils::scx_ops_open;
41use scx_utils::try_set_rlimit_infinity;
42use scx_utils::uei_exited;
43use scx_utils::uei_report;
44use scx_utils::CoreType;
45use scx_utils::Topology;
46use scx_utils::UserExitInfo;
47use scx_utils::NR_CPU_IDS;
48use stats::Metrics;
49
/// Scheduler name, printed in the startup banner.
const SCHEDULER_NAME: &str = "scx_cosmos";
51
/// Parse hexadecimal value from command line (requires "0x" prefix, e.g., "0x2").
///
/// Returns the parsed value, or a human-readable error message when the
/// prefix is missing or the digits are not valid hexadecimal.
fn parse_hex(s: &str) -> Result<u64, String> {
    let digits = s
        .strip_prefix("0x")
        .or_else(|| s.strip_prefix("0X"))
        .ok_or_else(|| "Hexadecimal value must start with '0x' prefix (e.g., 0x2)".to_string())?;
    u64::from_str_radix(digits, 16).map_err(|e| format!("Invalid hexadecimal value: {}", e))
}
60
61/// Must match lib/pmu.bpf.c SCX_PMU_STRIDE for perf_events map key layout.
62const PERF_MAP_STRIDE: u32 = 4096;
63
64/// Setup performance counter events for a specific CPU and counter index.
65/// counter_idx 0 = migration event (-e), 1 = sticky event (-y).
66fn setup_perf_events(
67    skel: &mut BpfSkel,
68    cpu: i32,
69    perf_config: u64,
70    counter_idx: u32,
71) -> Result<()> {
72    use perf_event_open_sys as sys;
73
74    let map = &skel.maps.scx_pmu_map;
75
76    let mut attrs = sys::bindings::perf_event_attr::default();
77    attrs.type_ = sys::bindings::PERF_TYPE_RAW;
78    attrs.config = perf_config;
79    attrs.size = std::mem::size_of::<sys::bindings::perf_event_attr>() as u32;
80    attrs.set_disabled(0);
81    attrs.set_inherit(0);
82
83    let fd = unsafe { sys::perf_event_open(&mut attrs, -1, cpu, -1, 0) };
84
85    if fd < 0 {
86        let err = std::io::Error::last_os_error();
87        return Err(anyhow::anyhow!(
88            "Failed to open perf event 0x{:x} on CPU {}: {}",
89            perf_config,
90            cpu,
91            err
92        ));
93    }
94
95    let key = cpu as u32 + counter_idx * PERF_MAP_STRIDE;
96
97    map.update(
98        &key.to_ne_bytes(),
99        &fd.to_ne_bytes(),
100        libbpf_rs::MapFlags::ANY,
101    )
102    .with_context(|| "Failed to update perf_events map")?;
103
104    Ok(())
105}
106
107#[derive(Debug, clap::Parser)]
108#[command(
109    name = "scx_cosmos",
110    version,
111    disable_version_flag = true,
112    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
113)]
114struct Opts {
115    /// Exit debug dump buffer length. 0 indicates default.
116    #[clap(long, default_value = "0")]
117    exit_dump_len: u32,
118
119    /// Maximum scheduling slice duration in microseconds.
120    #[clap(short = 's', long, default_value = "10")]
121    slice_us: u64,
122
123    /// Maximum runtime (since last sleep) that can be charged to a task in microseconds.
124    #[clap(short = 'l', long, default_value = "20000")]
125    slice_lag_us: u64,
126
127    /// CPU busy threshold.
128    ///
129    /// Specifies the CPU utilization percentage (0-100%) at which the scheduler considers the
130    /// system to be busy.
131    ///
132    /// When the average CPU utilization reaches this threshold, the scheduler switches from using
133    /// multiple per-CPU round-robin dispatch queues (which favor locality and reduced locking
134    /// contention) to a global deadline-based dispatch queue (which improves load balancing).
135    ///
136    /// The global dispatch queue can increase task migrations and improve responsiveness for
137    /// interactive tasks under heavy load. Lower values make the scheduler switch to deadline
138    /// mode sooner, improving overall responsiveness at the cost of reducing single-task
139    /// performance due to the additional migrations. Higher values makes task more "sticky" to
140    /// their CPU, improving workloads that benefit from cache locality.
141    ///
142    /// A higher value is recommended for server-type workloads, while a lower value is recommended
143    /// for interactive-type workloads.
144    #[clap(short = 'c', long, default_value = "75")]
145    cpu_busy_thresh: u64,
146
147    /// Polling time (ms) to refresh the CPU utilization.
148    ///
149    /// This interval determines how often the scheduler refreshes the CPU utilization that is
150    /// compared with the CPU busy threshold (option -c) to decide if the system is busy or not
151    /// and trigger the switch between using multiple per-CPU dispatch queues or a single global
152    /// deadline-based dispatch queue.
153    ///
154    /// Value is clamped to the range [10 .. 1000].
155    ///
156    /// 0 = disabled.
157    #[clap(short = 'p', long, default_value = "250")]
158    polling_ms: u64,
159
160    /// Specifies a list of CPUs to prioritize.
161    ///
162    /// Accepts a comma-separated list of CPUs or ranges (i.e., 0-3,12-15) or the following special
163    /// keywords:
164    ///
165    /// "turbo" = automatically detect and prioritize the CPUs with the highest max frequency,
166    /// "performance" = automatically detect and prioritize the fastest CPUs,
167    /// "powersave" = automatically detect and prioritize the slowest CPUs,
168    /// "all" = all CPUs assigned to the primary domain.
169    ///
170    /// By default "all" CPUs are used.
171    #[clap(short = 'm', long)]
172    primary_domain: Option<String>,
173
174    /// Hardware perf event to monitor (0x0 = disabled).
175    #[clap(short = 'e', long, default_value = "0x0", value_parser = parse_hex)]
176    perf_config: u64,
177
178    /// Threshold (perf events/msec) to classify a task as event heavy; exceeding it triggers migration.
179    #[clap(short = 'E', default_value = "0", long)]
180    perf_threshold: u64,
181
182    /// Sticky perf event (0x0 = disabled). When a task exceeds -Y for this event, keep it on the same CPU.
183    #[clap(short = 'y', long, default_value = "0x0", value_parser = parse_hex)]
184    perf_sticky: u64,
185
186    /// Sticky perf threshold; task is kept on same CPU when its count for -y event exceeds this.
187    #[clap(short = 'Y', default_value = "0", long)]
188    perf_sticky_threshold: u64,
189
190    /// Disable NUMA optimizations.
191    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
192    disable_numa: bool,
193
194    /// Disable CPU frequency control.
195    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
196    disable_cpufreq: bool,
197
198    /// Enable flat idle CPU scanning.
199    ///
200    /// This option can help reducing some overhead when trying to allocate idle CPUs and it can be
201    /// quite effective with simple CPU topologies.
202    #[arg(short = 'i', long, action = clap::ArgAction::SetTrue)]
203    flat_idle_scan: bool,
204
205    /// Enable preferred idle CPU scanning.
206    ///
207    /// With this option enabled, the scheduler will prioritize assigning tasks to higher-ranked
208    /// cores before considering lower-ranked ones.
209    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
210    preferred_idle_scan: bool,
211
212    /// Disable SMT.
213    ///
214    /// This option can only be used together with --flat-idle-scan or --preferred-idle-scan,
215    /// otherwise it is ignored.
216    #[clap(long, action = clap::ArgAction::SetTrue)]
217    disable_smt: bool,
218
219    /// SMT contention avoidance.
220    ///
221    /// When enabled, the scheduler aggressively avoids placing tasks on sibling SMT threads.
222    /// This may increase task migrations and lower overall throughput, but can lead to more
223    /// consistent performance by reducing contention on shared SMT cores.
224    #[clap(short = 'S', long, action = clap::ArgAction::SetTrue)]
225    avoid_smt: bool,
226
227    /// Disable direct dispatch during synchronous wakeups.
228    ///
229    /// Enabling this option can lead to a more uniform load distribution across available cores,
230    /// potentially improving performance in certain scenarios. However, it may come at the cost of
231    /// reduced efficiency for pipe-intensive workloads that benefit from tighter producer-consumer
232    /// coupling.
233    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
234    no_wake_sync: bool,
235
236    /// Disable deferred wakeups.
237    ///
238    /// Enabling this option can reduce throughput and performance for certain workloads, but it
239    /// can also reduce power consumption (useful on battery-powered systems).
240    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
241    no_deferred_wakeup: bool,
242
243    /// Disable tick-based preemption enforcement.
244    ///
245    /// By default, the scheduler preempts tasks that exceed their time slice when the system is
246    /// busy or SMT contention is detected. Use this flag to disable this behavior.
247    #[clap(long, action = clap::ArgAction::SetTrue)]
248    no_tick_preempt: bool,
249
250    /// Enable address space affinity.
251    ///
252    /// This option allows to keep tasks that share the same address space (e.g., threads of the
253    /// same process) on the same CPU across wakeups.
254    ///
255    /// This can improve locality and performance in certain cache-sensitive workloads.
256    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
257    mm_affinity: bool,
258
259    /// Enable stats monitoring with the specified interval.
260    #[clap(long)]
261    stats: Option<f64>,
262
263    /// Run in stats monitoring mode with the specified interval. Scheduler
264    /// is not launched.
265    #[clap(long)]
266    monitor: Option<f64>,
267
268    /// Enable verbose output, including libbpf details.
269    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
270    verbose: bool,
271
272    /// Print scheduler version and exit.
273    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
274    version: bool,
275
276    /// Show descriptions for statistics.
277    #[clap(long)]
278    help_stats: bool,
279
280    #[clap(flatten, next_help_heading = "Libbpf Options")]
281    pub libbpf: LibbpfOpts,
282}
283
/// Core-selection policy used to resolve the special `--primary-domain`
/// keywords into a concrete CPU list (see get_primary_cpus()).
#[derive(PartialEq)]
enum Powermode {
    /// Only big cores with turbo capability (highest max frequency).
    Turbo,
    /// All big cores, turbo or not.
    Performance,
    /// Only little (slower) cores.
    Powersave,
    /// Any CPU, regardless of core type.
    Any,
}
291
292/*
293 * TODO: this code is shared between scx_bpfland, scx_flash and scx_cosmos; consder to move it to
294 * scx_utils.
295 */
296fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
297    let cpus: Vec<usize> = Topology::new()
298        .unwrap()
299        .all_cores
300        .values()
301        .flat_map(|core| &core.cpus)
302        .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
303            // Turbo mode: prioritize CPUs with the highest max frequency
304            (Powermode::Turbo, CoreType::Big { turbo: true }) |
305            // Performance mode: add all the Big CPUs (either Turbo or non-Turbo)
306            (Powermode::Performance, CoreType::Big { .. }) |
307            // Powersave mode: add all the Little CPUs
308            (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
309            (Powermode::Any, ..) => Some(*cpu_id),
310            _ => None,
311        })
312        .collect();
313
314    Ok(cpus)
315}
316
317pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
318    let mut cpus = Vec::new();
319    let mut seen = HashSet::new();
320
321    // Handle special keywords
322    if let Some(mode) = match optarg {
323        "powersave" => Some(Powermode::Powersave),
324        "performance" => Some(Powermode::Performance),
325        "turbo" => Some(Powermode::Turbo),
326        "all" => Some(Powermode::Any),
327        _ => None,
328    } {
329        return get_primary_cpus(mode).map_err(|e| e.to_string());
330    }
331
332    // Validate input characters
333    if optarg
334        .chars()
335        .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
336    {
337        return Err("Invalid character in CPU list".to_string());
338    }
339
340    // Replace all whitespace with tab (or just trim later)
341    let cleaned = optarg.replace(' ', "\t");
342
343    for token in cleaned.split(',') {
344        let token = token.trim_matches(|c: char| c.is_whitespace());
345
346        if token.is_empty() {
347            continue;
348        }
349
350        if let Some((start_str, end_str)) = token.split_once('-') {
351            let start = start_str
352                .trim()
353                .parse::<usize>()
354                .map_err(|_| "Invalid range start")?;
355            let end = end_str
356                .trim()
357                .parse::<usize>()
358                .map_err(|_| "Invalid range end")?;
359
360            if start > end {
361                return Err(format!("Invalid CPU range: {}-{}", start, end));
362            }
363
364            for i in start..=end {
365                if cpus.len() >= *NR_CPU_IDS {
366                    return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
367                }
368                if seen.insert(i) {
369                    cpus.push(i);
370                }
371            }
372        } else {
373            let cpu = token
374                .parse::<usize>()
375                .map_err(|_| format!("Invalid CPU: {}", token))?;
376            if cpus.len() >= *NR_CPU_IDS {
377                return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
378            }
379            if seen.insert(cpu) {
380                cpus.push(cpu);
381            }
382        }
383    }
384
385    Ok(cpus)
386}
387
/// Initial value for the dynamic threshold (in BPF units).
const DYNAMIC_THRESHOLD_INIT_VALUE: u64 = 1000;

/// Target event rate (per second) above which we consider migrations/sticky dispatches too high.
const DYNAMIC_THRESHOLD_RATE_HIGH: f64 = 4000.0;

/// Target event rate (per second) below which we consider migrations/sticky dispatches too low.
const DYNAMIC_THRESHOLD_RATE_LOW: f64 = 2000.0;

/// Minimum scale factor when just outside the target band (slow convergence near optimal).
const DYNAMIC_THRESHOLD_SCALE_MIN: f64 = 0.0001;

/// Maximum scale factor when far from target (fast convergence when initial threshold is way off).
const DYNAMIC_THRESHOLD_SCALE_MAX: f64 = 1000.0;

/// Slope for "too high" case: scale grows with (rate/HIGH - 1) so we step much harder when rate is
/// many times over target.
const DYNAMIC_THRESHOLD_SLOPE_HIGH: f64 = 0.35;

/// Slope for "too low" case: scale grows with deficit so we step harder when rate is near zero.
const DYNAMIC_THRESHOLD_SLOPE_LOW: f64 = 0.58;

/// Compute the adjustment scale factor for the dynamic perf threshold.
///
/// When `too_high`, the scale grows with how far the rate exceeds
/// DYNAMIC_THRESHOLD_RATE_HIGH (excess capped at 4x). Otherwise it grows
/// with the deficit below DYNAMIC_THRESHOLD_RATE_LOW, returning
/// DYNAMIC_THRESHOLD_SCALE_MAX outright for a zero/negative rate.
fn dynamic_threshold_scale(rate_per_sec: f64, too_high: bool) -> f64 {
    if too_high {
        // Step harder the further the rate is above the high-water mark.
        let excess = ((rate_per_sec / DYNAMIC_THRESHOLD_RATE_HIGH) - 1.0).max(0.0);
        let scale = DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_HIGH * excess.min(4.0);
        scale.min(DYNAMIC_THRESHOLD_SCALE_MAX)
    } else if rate_per_sec <= 0.0 {
        // No events at all: converge as fast as possible.
        DYNAMIC_THRESHOLD_SCALE_MAX
    } else {
        // Step harder the closer the rate is to zero.
        let deficit = (DYNAMIC_THRESHOLD_RATE_LOW - rate_per_sec) / DYNAMIC_THRESHOLD_RATE_LOW;
        let t = deficit.clamp(0.0, 1.0);
        DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_LOW * t
    }
}
424
425fn adjust_dynamic_threshold(current: u64, rate_per_sec: f64, base_threshold: u64) -> u64 {
426    let (scale_pct, raise_threshold) = if rate_per_sec > DYNAMIC_THRESHOLD_RATE_HIGH {
427        (dynamic_threshold_scale(rate_per_sec, true), true)
428    } else if rate_per_sec < DYNAMIC_THRESHOLD_RATE_LOW && rate_per_sec >= 0.0 {
429        (dynamic_threshold_scale(rate_per_sec, false), false)
430    } else {
431        return current;
432    };
433
434    let factor = if raise_threshold {
435        1.0 + scale_pct
436    } else {
437        1.0 - scale_pct
438    };
439    let new = ((current as f64) * factor).round() as u64;
440
441    let min_val = if base_threshold == 0 {
442        1
443    } else {
444        base_threshold / 100
445    };
446    let max_val = if base_threshold == 0 {
447        u64::MAX
448    } else {
449        base_threshold.saturating_mul(10000)
450    };
451
452    new.clamp(min_val.max(1), max_val)
453}
454
/// Cumulative CPU time counters parsed from the aggregate "cpu " line of
/// /proc/stat (see read_cpu_times()).
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    user: u64,  // field 1: time spent in user mode
    nice: u64,  // field 2: time spent in user mode at low priority (nice)
    total: u64, // sum of the first 8 /proc/stat fields
}
461
/// User-space side of the scheduler: owns the loaded BPF skeleton, the
/// parsed command-line options, the struct_ops attach link and the stats
/// server that answers metric requests.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,                      // loaded BPF skeleton
    opts: &'a Opts,                         // parsed command-line options
    struct_ops: Option<libbpf_rs::Link>,    // struct_ops attach link
    stats_server: StatsServer<(), Metrics>, // serves Metrics to stats clients
}
468
469impl<'a> Scheduler<'a> {
    /// Open, configure, load and attach the BPF scheduler.
    ///
    /// Performs, in order: topology detection, skeleton open with the
    /// command-line overrides written into rodata, skeleton load, perf
    /// counter setup, primary-domain and SMT-domain initialization, and
    /// finally struct_ops attach plus stats server launch.
    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        try_set_rlimit_infinity();

        // Initialize CPU topology.
        let topo = Topology::new().unwrap();

        // Check host topology to determine if we need to enable SMT capabilities.
        let smt_enabled = !opts.disable_smt && topo.smt_enabled;

        // Determine the amount of non-empty NUMA nodes in the system.
        let nr_nodes = topo
            .nodes
            .values()
            .filter(|node| !node.all_cpus.is_empty())
            .count();
        info!("NUMA nodes: {}", nr_nodes);

        // Automatically disable NUMA optimizations when running on non-NUMA systems.
        let numa_enabled = !opts.disable_numa && nr_nodes > 1;
        if !numa_enabled {
            info!("Disabling NUMA optimizations");
        }

        info!(
            "{} {} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            if smt_enabled { "SMT on" } else { "SMT off" }
        );

        // Print command line.
        info!(
            "scheduler options: {}",
            std::env::args().collect::<Vec<_>>().join(" ")
        );

        // Initialize BPF connector.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose);
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops, open_opts)?;

        skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;

        // Override default BPF scheduling parameters.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = opts.slice_us * 1000;
        rodata.slice_lag = opts.slice_lag_us * 1000;
        rodata.cpufreq_enabled = !opts.disable_cpufreq;
        rodata.deferred_wakeups = !opts.no_deferred_wakeup;
        rodata.flat_idle_scan = opts.flat_idle_scan;
        rodata.smt_enabled = smt_enabled;
        rodata.numa_enabled = numa_enabled;
        // NOTE(review): nr_node_ids counts all nodes, while numa_enabled is
        // derived from non-empty nodes only — confirm this is intended.
        rodata.nr_node_ids = topo.nodes.len() as u32;
        rodata.no_wake_sync = opts.no_wake_sync;
        rodata.avoid_smt = opts.avoid_smt;
        rodata.tick_preempt = !opts.no_tick_preempt;
        rodata.mm_affinity = opts.mm_affinity;

        // Enable perf event scheduling settings.
        rodata.perf_config = opts.perf_config;
        rodata.perf_sticky = opts.perf_sticky;

        // Normalize CPU busy threshold in the range [0 .. 1024].
        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;

        // Generate the list of available CPUs sorted by capacity in descending order.
        let mut cpus: Vec<_> = topo.all_cpus.values().collect();
        cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
        for (i, cpu) in cpus.iter().enumerate() {
            rodata.cpu_capacity[cpu.id] = cpu.cpu_capacity as c_ulong;
            rodata.preferred_cpus[i] = cpu.id as u64;
        }
        if opts.preferred_idle_scan {
            info!(
                "Preferred CPUs: {:?}",
                &rodata.preferred_cpus[0..cpus.len()]
            );
        }
        rodata.preferred_idle_scan = opts.preferred_idle_scan;

        // Define the primary scheduling domain.
        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
            match parse_cpu_list(domain) {
                Ok(cpus) => cpus,
                Err(e) => bail!("Error parsing primary domain: {}", e),
            }
        } else {
            // No domain specified: all CPU IDs belong to the primary domain.
            (0..*NR_CPU_IDS).collect()
        };
        if primary_cpus.len() < *NR_CPU_IDS {
            info!("Primary CPUs: {:?}", primary_cpus);
            rodata.primary_all = false;
        } else {
            rodata.primary_all = true;
        }

        // Set scheduler flags.
        skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
            | *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;

        info!(
            "scheduler flags: {:#x}",
            skel.struct_ops.cosmos_ops_mut().flags
        );

        // Load the BPF program for validation.
        let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;

        // Initial perf thresholds in bss. When threshold is 0 we use dynamic logic; when user
        // specifies a value > 0 we use it as a static threshold.
        let bss = skel.maps.bss_data.as_mut().unwrap();
        if opts.perf_config > 0 {
            bss.perf_threshold = if opts.perf_threshold == 0 {
                DYNAMIC_THRESHOLD_INIT_VALUE
            } else {
                opts.perf_threshold
            };
        }
        if opts.perf_sticky > 0 {
            bss.perf_sticky_threshold = if opts.perf_sticky_threshold == 0 {
                DYNAMIC_THRESHOLD_INIT_VALUE
            } else {
                opts.perf_sticky_threshold
            };
        }

        // Configure CPU->node mapping (must be done after skeleton is loaded).
        for node in topo.nodes.values() {
            for cpu in node.all_cpus.values() {
                if opts.verbose {
                    info!("CPU{} -> node{}", cpu.id, node.id);
                }
                skel.maps.cpu_node_map.update(
                    &(cpu.id as u32).to_ne_bytes(),
                    &(node.id as u32).to_ne_bytes(),
                    MapFlags::ANY,
                )?;
            }
        }

        // Setup performance events for all CPUs.
        // Counter indices must match PMU library install order: migration first (0), then sticky (1).
        // When only sticky is used, it gets index 0; when both are used, sticky gets index 1.
        let nr_cpus = *NR_CPU_IDS;
        info!("Setting up performance counters for {} CPUs...", nr_cpus);
        let mut perf_available = true;
        let sticky_counter_idx = if opts.perf_config > 0 { 1 } else { 0 };
        // NOTE(review): setup failures are only reported (and abort the loop)
        // when they happen on CPU 0; failures on later CPUs are silently
        // ignored and "configured successfully" is still logged — confirm
        // this is intended.
        for cpu in 0..nr_cpus {
            if opts.perf_config > 0 {
                if let Err(e) = setup_perf_events(&mut skel, cpu as i32, opts.perf_config, 0) {
                    if cpu == 0 {
                        let err_str = e.to_string();
                        // ENOENT (errno 2) means the PMU event does not exist
                        // on this hardware.
                        if err_str.contains("errno 2") || err_str.contains("os error 2") {
                            warn!("Performance counters not available on this CPU architecture");
                            warn!("PMU event 0x{:x} not supported - scheduler will run without perf monitoring", opts.perf_config);
                        } else {
                            warn!("Failed to setup perf events: {}", e);
                        }
                        perf_available = false;
                        break;
                    }
                }
            }
            if opts.perf_sticky > 0 {
                if let Err(e) =
                    setup_perf_events(&mut skel, cpu as i32, opts.perf_sticky, sticky_counter_idx)
                {
                    if cpu == 0 {
                        let err_str = e.to_string();
                        if err_str.contains("errno 2") || err_str.contains("os error 2") {
                            warn!("Performance counters not available on this CPU architecture");
                            warn!("PMU event 0x{:x} not supported - scheduler will run without perf monitoring", opts.perf_sticky);
                        } else {
                            warn!("Failed to setup perf events: {}", e);
                        }
                        perf_available = false;
                        break;
                    }
                }
            }
        }
        if perf_available {
            info!("Performance counters configured successfully for all CPUs");
        }

        // Enable primary scheduling domain, if defined.
        if primary_cpus.len() < *NR_CPU_IDS {
            for cpu in primary_cpus {
                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
                }
            }
        }

        // Initialize SMT domains.
        if smt_enabled {
            Self::init_smt_domains(&mut skel, &topo)?;
        }

        // Attach the scheduler.
        let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            opts,
            struct_ops,
            stats_server,
        })
    }
683
    /// Add `cpu` to the primary scheduling domain by invoking the
    /// `enable_primary_cpu` BPF program via test_run.
    ///
    /// Returns the BPF program's non-zero return value on failure.
    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_primary_cpu;
        let mut args = cpu_arg {
            cpu_id: cpu as c_int,
        };
        let input = ProgramInput {
            // SAFETY: `args` is a local that outlives the test_run() call
            // below; the BPF program receives a byte view of it.
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        // NOTE(review): test_run() failure panics here — consider propagating.
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }
705
    /// Register `sibling_cpu` as the SMT sibling of `cpu` by invoking the
    /// `enable_sibling_cpu` BPF program via test_run.
    ///
    /// Returns the BPF program's non-zero return value on failure.
    fn enable_sibling_cpu(
        skel: &mut BpfSkel<'_>,
        cpu: usize,
        sibling_cpu: usize,
    ) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_sibling_cpu;
        let mut args = domain_arg {
            cpu_id: cpu as c_int,
            sibling_cpu_id: sibling_cpu as c_int,
        };
        let input = ProgramInput {
            // SAFETY: `args` is a local that outlives the test_run() call
            // below; the BPF program receives a byte view of it.
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        // NOTE(review): test_run() failure panics here — consider propagating.
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }
732
733    fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
734        let smt_siblings = topo.sibling_cpus();
735
736        info!("SMT sibling CPUs: {:?}", smt_siblings);
737        for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
738            Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
739        }
740
741        Ok(())
742    }
743
744    fn get_metrics(&self) -> Metrics {
745        let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
746        Metrics {
747            cpu_thresh: self.skel.maps.rodata_data.as_ref().unwrap().busy_threshold,
748            cpu_util: self.skel.maps.bss_data.as_ref().unwrap().cpu_util,
749            nr_event_dispatches: bss_data.nr_event_dispatches,
750            nr_ev_sticky_dispatches: bss_data.nr_ev_sticky_dispatches,
751        }
752    }
753
    /// Whether the BPF scheduler has exited, as reported through the user
    /// exit info (uei) area.
    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }
757
758    fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
759        // Evaluate total user CPU time as user + nice.
760        let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
761        let total_diff = curr.total.saturating_sub(prev.total);
762
763        if total_diff > 0 {
764            let user_ratio = user_diff as f64 / total_diff as f64;
765            Some((user_ratio * 1024.0).round() as u64)
766        } else {
767            None
768        }
769    }
770
    /// Parse the aggregate "cpu " line of /proc/stat into a CpuTimes
    /// snapshot. Returns None on any open/read/parse failure, or when the
    /// line is missing or has fewer than 5 fields.
    fn read_cpu_times() -> Option<CpuTimes> {
        let file = File::open("/proc/stat").ok()?;
        let reader = BufReader::new(file);

        for line in reader.lines() {
            let line = line.ok()?;
            // The aggregate line starts with "cpu " (per-CPU lines are "cpuN").
            if line.starts_with("cpu ") {
                let fields: Vec<&str> = line.split_whitespace().collect();
                if fields.len() < 5 {
                    return None;
                }

                let user: u64 = fields[1].parse().ok()?;
                let nice: u64 = fields[2].parse().ok()?;

                // Sum the first 8 fields as total time, including idle, system, etc.
                let total: u64 = fields
                    .iter()
                    .skip(1)
                    .take(8)
                    .filter_map(|v| v.parse::<u64>().ok())
                    .sum();

                return Some(CpuTimes { user, nice, total });
            }
        }

        None
    }
800
801    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
802        let (res_ch, req_ch) = self.stats_server.channels();
803
804        // Periodically evaluate user CPU utilization from user-space and update a global variable
805        // in BPF.
806        //
807        // The BPF scheduler will use this value to determine when the system is idle (using local
808        // DSQs and simple round-robin scheduler) or busy (switching to a deadline-based policy).
809        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
810        let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
811        let mut last_update = Instant::now();
812
813        // Dynamic perf thresholds: scale based on migration and sticky dispatch rates.
814        let mut prev_nr_event_dispatches: u64 = 0;
815        let mut prev_nr_ev_sticky_dispatches: u64 = 0;
816
817        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
818            // Update CPU utilization.
819            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
820                if let Some(curr_cputime) = Self::read_cpu_times() {
821                    Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime)
822                        .map(|util| self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util);
823                    prev_cputime = curr_cputime;
824                }
825
826                // Update dynamic perf thresholds based on event rates.
827                let nr_event = self
828                    .skel
829                    .maps
830                    .bss_data
831                    .as_ref()
832                    .unwrap()
833                    .nr_event_dispatches;
834                let nr_sticky = self
835                    .skel
836                    .maps
837                    .bss_data
838                    .as_ref()
839                    .unwrap()
840                    .nr_ev_sticky_dispatches;
841                let elapsed_secs = last_update.elapsed().as_secs_f64();
842                if elapsed_secs > 0.0 {
843                    let migration_rate =
844                        (nr_event.saturating_sub(prev_nr_event_dispatches) as f64) / elapsed_secs;
845                    let sticky_rate = (nr_sticky.saturating_sub(prev_nr_ev_sticky_dispatches)
846                        as f64)
847                        / elapsed_secs;
848
849                    let bss = self.skel.maps.bss_data.as_mut().unwrap();
850                    // Dynamic threshold only when user did not specify a value (threshold == 0).
851                    if self.opts.perf_config > 0 && self.opts.perf_threshold == 0 {
852                        let base = 0u64; // dynamic mode: use 0 so clamp is [1, u64::MAX]
853                        let current = bss.perf_threshold;
854                        let new_thresh = adjust_dynamic_threshold(current, migration_rate, base);
855                        if new_thresh != current {
856                            bss.perf_threshold = new_thresh;
857                            if self.opts.verbose {
858                                info!(
859                                    "perf_threshold: {} (migration rate {:.1}/s)",
860                                    new_thresh, migration_rate
861                                );
862                            }
863                        }
864                    }
865                    if self.opts.perf_sticky > 0 && self.opts.perf_sticky_threshold == 0 {
866                        let base = 0u64;
867                        let current = bss.perf_sticky_threshold;
868                        let new_thresh = adjust_dynamic_threshold(current, sticky_rate, base);
869                        if new_thresh != current {
870                            bss.perf_sticky_threshold = new_thresh;
871                            if self.opts.verbose {
872                                info!(
873                                    "perf_sticky_threshold: {} (sticky rate {:.1}/s)",
874                                    new_thresh, sticky_rate
875                                );
876                            }
877                        }
878                    }
879
880                    prev_nr_event_dispatches = nr_event;
881                    prev_nr_ev_sticky_dispatches = nr_sticky;
882                }
883
884                last_update = Instant::now();
885            }
886
887            // Update statistics and check for exit condition.
888            let timeout = if polling_time.is_zero() {
889                Duration::from_secs(1)
890            } else {
891                polling_time
892            };
893            match req_ch.recv_timeout(timeout) {
894                Ok(()) => res_ch.send(self.get_metrics())?,
895                Err(RecvTimeoutError::Timeout) => {}
896                Err(e) => Err(e)?,
897            }
898        }
899
900        let _ = self.struct_ops.take();
901        uei_report!(&self.skel, uei)
902    }
903}
904
// Teardown notification for the scheduler instance.
//
// NOTE(review): `run()` already takes `self.struct_ops` before returning, so
// by the time `drop` runs there is presumably nothing left to detach here —
// this hook only logs. Confirm against `Scheduler::init` if teardown ordering
// ever changes.
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        // Informational only; no resources are released in this hook.
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}
910
911fn main() -> Result<()> {
912    let opts = Opts::parse();
913
914    if opts.version {
915        println!(
916            "{} {}",
917            SCHEDULER_NAME,
918            build_id::full_version(env!("CARGO_PKG_VERSION"))
919        );
920        return Ok(());
921    }
922
923    if opts.help_stats {
924        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
925        return Ok(());
926    }
927
928    let loglevel = simplelog::LevelFilter::Info;
929
930    let mut lcfg = simplelog::ConfigBuilder::new();
931    lcfg.set_time_offset_to_local()
932        .expect("Failed to set local time offset")
933        .set_time_level(simplelog::LevelFilter::Error)
934        .set_location_level(simplelog::LevelFilter::Off)
935        .set_target_level(simplelog::LevelFilter::Off)
936        .set_thread_level(simplelog::LevelFilter::Off);
937    simplelog::TermLogger::init(
938        loglevel,
939        lcfg.build(),
940        simplelog::TerminalMode::Stderr,
941        simplelog::ColorChoice::Auto,
942    )?;
943
944    let shutdown = Arc::new(AtomicBool::new(false));
945    let shutdown_clone = shutdown.clone();
946    ctrlc::set_handler(move || {
947        shutdown_clone.store(true, Ordering::Relaxed);
948    })
949    .context("Error setting Ctrl-C handler")?;
950
951    if let Some(intv) = opts.monitor.or(opts.stats) {
952        let shutdown_copy = shutdown.clone();
953        let jh = std::thread::spawn(move || {
954            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
955                Ok(_) => {
956                    debug!("stats monitor thread finished successfully")
957                }
958                Err(error_object) => {
959                    warn!(
960                        "stats monitor thread finished because of an error {}",
961                        error_object
962                    )
963                }
964            }
965        });
966        if opts.monitor.is_some() {
967            let _ = jh.join();
968            return Ok(());
969        }
970    }
971
972    let mut open_object = MaybeUninit::uninit();
973    loop {
974        let mut sched = Scheduler::init(&opts, &mut open_object)?;
975        if !sched.run(shutdown.clone())?.should_restart() {
976            break;
977        }
978    }
979
980    Ok(())
981}