// scx_cosmos — main.rs
1// SPDX-License-Identifier: GPL-2.0
2//
3// Copyright (c) 2025 Andrea Righi <arighi@nvidia.com>
4
5// This software may be used and distributed according to the terms of the
6// GNU General Public License version 2.
7
8mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::{HashMap, HashSet};
15use std::ffi::{c_int, c_ulong};
16use std::fs;
17use std::fs::File;
18use std::io::{BufRead, BufReader};
19use std::mem::MaybeUninit;
20use std::path::Path;
21use std::sync::atomic::AtomicBool;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use anyhow::bail;
27use anyhow::Context;
28use anyhow::Result;
29use clap::Parser;
30use crossbeam::channel::RecvTimeoutError;
31use libbpf_rs::MapCore;
32use libbpf_rs::MapFlags;
33use libbpf_rs::OpenObject;
34use libbpf_rs::ProgramInput;
35use log::{debug, info, warn};
36use nvml_wrapper::bitmasks::InitFlags;
37use nvml_wrapper::Nvml;
38use scx_stats::prelude::*;
39use scx_utils::build_id;
40use scx_utils::compat;
41use scx_utils::libbpf_clap_opts::LibbpfOpts;
42use scx_utils::scx_ops_attach;
43use scx_utils::scx_ops_load;
44use scx_utils::scx_ops_open;
45use scx_utils::try_set_rlimit_infinity;
46use scx_utils::uei_exited;
47use scx_utils::uei_report;
48use scx_utils::CoreType;
49use scx_utils::GpuIndex;
50use scx_utils::Topology;
51use scx_utils::UserExitInfo;
52use scx_utils::NR_CPU_IDS;
53use stats::Metrics;
54
/// Scheduler name used for registration and in log output.
const SCHEDULER_NAME: &str = "scx_cosmos";
56
/// Perf event specification: either hex (0xN) or symbolic name (e.g. cache-misses).
/// event_id is written to BPF rodata; type_ and config are used for perf_event_open.
#[derive(Clone, Debug)]
struct PerfEventSpec {
    /// Opaque id for BPF (must match between install and read).
    /// 0 means "event disabled"; for symbolic events the perf type is packed
    /// into the upper 32 bits so different event families cannot collide.
    event_id: u64,
    /// perf_event_attr.type (PERF_TYPE_RAW, PERF_TYPE_HARDWARE, etc.).
    type_: u32,
    /// perf_event_attr.config.
    config: u64,
    /// Original string for error messages.
    display_name: String,
}
70
/// Map a symbolic hardware perf event name (perf-tool style, including common
/// aliases) to its PERF_COUNT_HW_* numeric id. Returns None for unknown names.
fn parse_hardware_event(s: &str) -> Option<u64> {
    // Aliases for each PERF_COUNT_HW_* id, indexed by the id itself.
    const ALIASES: [&[&str]; 10] = [
        &["cpu-cycles", "cycles"],
        &["instructions"],
        &["cache-references"],
        &["cache-misses"],
        &["branch-instructions", "branches"],
        &["branch-misses"],
        &["bus-cycles"],
        &["stalled-cycles-frontend", "idle-cycles-frontend"],
        &["stalled-cycles-backend", "idle-cycles-backend"],
        &["ref-cycles"],
    ];

    ALIASES
        .iter()
        .position(|names| names.contains(&s))
        .map(|id| id as u64)
}
86
/// Map a symbolic software perf event name (perf-tool style, including common
/// aliases) to its PERF_COUNT_SW_* numeric id. Returns None for unknown names.
fn parse_software_event(s: &str) -> Option<u64> {
    // Aliases for each PERF_COUNT_SW_* id, indexed by the id itself.
    const ALIASES: [&[&str]; 11] = [
        &["cpu-clock"],
        &["task-clock"],
        &["page-faults", "faults"],
        &["context-switches", "cs"],
        &["cpu-migrations", "migrations"],
        &["minor-faults"],
        &["major-faults"],
        &["alignment-faults"],
        &["emulation-faults"],
        &["dummy"],
        &["bpf-output"],
    ];

    ALIASES
        .iter()
        .position(|names| names.contains(&s))
        .map(|id| id as u64)
}
103
/// Map a symbolic hw-cache perf event name (e.g. "LLC-load-misses") to its
/// PERF_TYPE_HW_CACHE config value, encoded as
/// (result_id << 16) | (op_id << 8) | cache_id per perf_event_open(2).
/// Returns None when the prefix or the op/result suffix is unknown.
fn parse_hw_cache_event(s: &str) -> Option<u64> {
    // (name prefix, PERF_COUNT_HW_CACHE_* id). Using strip_prefix() avoids the
    // hand-maintained prefix lengths of the previous version, which could
    // silently drift out of sync with the prefix strings.
    const CACHES: &[(&str, u64)] = &[
        ("L1-dcache-", 0),
        ("L1-icache-", 1),
        ("LLC-", 2),
        ("dTLB-", 3),
        ("iTLB-", 4),
        ("branch-", 5),
        ("node-", 6),
    ];

    let (cache_id, suffix) = CACHES
        .iter()
        .find_map(|&(prefix, id)| s.strip_prefix(prefix).map(|rest| (id, rest)))?;

    // Operation (load/store/prefetch) and result (access/miss) encoding.
    let (op_id, result_id): (u64, u64) = match suffix {
        "loads" => (0, 0),
        "load-misses" => (0, 1),
        "stores" => (1, 0),
        "store-misses" => (1, 1),
        "prefetches" => (2, 0),
        "prefetch-misses" => (2, 1),
        _ => return None,
    };

    Some((result_id << 16) | (op_id << 8) | cache_id)
}
136
137/// Parse -e / -y value: hex (0xN) or symbolic name (e.g. cache-misses, LLC-load-misses).
138fn parse_perf_event(s: &str) -> Result<PerfEventSpec, String> {
139    use perf_event_open_sys as sys;
140
141    let s = s.trim();
142    if s.is_empty() || s == "0" || s.eq_ignore_ascii_case("0x0") {
143        return Ok(PerfEventSpec {
144            event_id: 0,
145            type_: sys::bindings::PERF_TYPE_RAW,
146            config: 0,
147            display_name: s.to_string(),
148        });
149    }
150
151    if let Some(hex_str) = s.strip_prefix("0x").or_else(|| s.strip_prefix("0X")) {
152        if let Ok(config) = u64::from_str_radix(hex_str, 16) {
153            return Ok(PerfEventSpec {
154                event_id: config,
155                type_: sys::bindings::PERF_TYPE_RAW,
156                config,
157                display_name: s.to_string(),
158            });
159        }
160    }
161
162    if let Some(config) = parse_hardware_event(s) {
163        let event_id = (sys::bindings::PERF_TYPE_HARDWARE as u64) << 32 | config;
164        return Ok(PerfEventSpec {
165            event_id,
166            type_: sys::bindings::PERF_TYPE_HARDWARE,
167            config,
168            display_name: s.to_string(),
169        });
170    }
171
172    if let Some(config) = parse_software_event(s) {
173        let event_id = (sys::bindings::PERF_TYPE_SOFTWARE as u64) << 32 | config;
174        return Ok(PerfEventSpec {
175            event_id,
176            type_: sys::bindings::PERF_TYPE_SOFTWARE,
177            config,
178            display_name: s.to_string(),
179        });
180    }
181
182    if let Some(config) = parse_hw_cache_event(s) {
183        let event_id = (sys::bindings::PERF_TYPE_HW_CACHE as u64) << 32 | config;
184        return Ok(PerfEventSpec {
185            event_id,
186            type_: sys::bindings::PERF_TYPE_HW_CACHE,
187            config,
188            display_name: s.to_string(),
189        });
190    }
191
192    Err(format!(
193        "Invalid perf event '{}': use hex (0xN) or symbolic name (e.g. cache-misses, LLC-load-misses, page-faults)",
194        s
195    ))
196}
197
/// Must match lib/pmu.bpf.c SCX_PMU_STRIDE for perf_events map key layout.
/// Map keys are computed as cpu + counter_idx * PERF_MAP_STRIDE, i.e. one
/// stride-sized bank of per-CPU slots per counter index.
const PERF_MAP_STRIDE: u32 = 4096;
200
201/// Setup performance counter events for a specific CPU and counter index.
202/// counter_idx 0 = migration event (-e), 1 = sticky event (-y).
203fn setup_perf_events(
204    skel: &mut BpfSkel,
205    cpu: i32,
206    spec: &PerfEventSpec,
207    counter_idx: u32,
208) -> Result<()> {
209    use perf_event_open_sys as sys;
210
211    if spec.event_id == 0 {
212        return Ok(());
213    }
214
215    let map = &skel.maps.scx_pmu_map;
216
217    let mut attrs = sys::bindings::perf_event_attr::default();
218    attrs.type_ = spec.type_;
219    attrs.config = spec.config;
220    attrs.size = std::mem::size_of::<sys::bindings::perf_event_attr>() as u32;
221    attrs.set_disabled(0);
222    attrs.set_inherit(0);
223
224    let fd = unsafe { sys::perf_event_open(&mut attrs, -1, cpu, -1, 0) };
225
226    if fd < 0 {
227        let err = std::io::Error::last_os_error();
228        return Err(anyhow::anyhow!(
229            "Failed to open perf event '{}' on CPU {}: {}",
230            spec.display_name,
231            cpu,
232            err
233        ));
234    }
235
236    let key = cpu as u32 + counter_idx * PERF_MAP_STRIDE;
237
238    map.update(
239        &key.to_ne_bytes(),
240        &fd.to_ne_bytes(),
241        libbpf_rs::MapFlags::ANY,
242    )
243    .with_context(|| "Failed to update perf_events map")?;
244
245    Ok(())
246}
247
248#[derive(Debug, clap::Parser)]
249#[command(
250    name = "scx_cosmos",
251    version,
252    disable_version_flag = true,
253    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
254)]
255struct Opts {
256    /// Exit debug dump buffer length. 0 indicates default.
257    #[clap(long, default_value = "0")]
258    exit_dump_len: u32,
259
260    /// Maximum scheduling slice duration in microseconds.
261    #[clap(short = 's', long, default_value = "1000")]
262    slice_us: u64,
263
264    /// Maximum runtime (since last sleep) that can be charged to a task in microseconds.
265    #[clap(short = 'l', long, default_value = "20000")]
266    slice_lag_us: u64,
267
268    /// CPU busy threshold.
269    ///
270    /// Specifies the CPU utilization percentage (0-100%) at which the scheduler considers the
271    /// system to be busy.
272    ///
273    /// When the average CPU utilization reaches this threshold, the scheduler switches from using
274    /// multiple per-CPU round-robin dispatch queues (which favor locality and reduced locking
275    /// contention) to a global deadline-based dispatch queue (which improves load balancing).
276    ///
277    /// The global dispatch queue can increase task migrations and improve responsiveness for
278    /// interactive tasks under heavy load. Lower values make the scheduler switch to deadline
279    /// mode sooner, improving overall responsiveness at the cost of reducing single-task
280    /// performance due to the additional migrations. Higher values makes task more "sticky" to
281    /// their CPU, improving workloads that benefit from cache locality.
282    ///
283    /// A higher value is recommended for server-type workloads, while a lower value is recommended
284    /// for interactive-type workloads.
285    #[clap(short = 'c', long, default_value = "0")]
286    cpu_busy_thresh: u64,
287
288    /// Polling time (ms) to refresh the CPU utilization.
289    ///
290    /// This interval determines how often the scheduler refreshes the CPU utilization that is
291    /// compared with the CPU busy threshold (option -c) to decide if the system is busy or not
292    /// and trigger the switch between using multiple per-CPU dispatch queues or a single global
293    /// deadline-based dispatch queue.
294    ///
295    /// Value is clamped to the range [10 .. 1000].
296    ///
297    /// 0 = disabled.
298    #[clap(short = 'p', long, default_value = "0")]
299    polling_ms: u64,
300
301    /// Specifies a list of CPUs to prioritize.
302    ///
303    /// Accepts a comma-separated list of CPUs or ranges (i.e., 0-3,12-15) or the following special
304    /// keywords:
305    ///
306    /// "turbo" = automatically detect and prioritize the CPUs with the highest max frequency,
307    /// "performance" = automatically detect and prioritize the fastest CPUs,
308    /// "powersave" = automatically detect and prioritize the slowest CPUs,
309    /// "all" = all CPUs assigned to the primary domain.
310    ///
311    /// By default "all" CPUs are used.
312    #[clap(short = 'm', long)]
313    primary_domain: Option<String>,
314
315    /// Hardware perf event to monitor (0x0 = disabled). Accepts hex (0xN) or symbolic names
316    /// (e.g. cache-misses, LLC-load-misses, page-faults, branch-misses).
317    #[clap(short = 'e', long, default_value = "0x0", value_parser = parse_perf_event)]
318    perf_config: PerfEventSpec,
319
320    /// Threshold (perf events/msec) to classify a task as event heavy; exceeding it triggers migration.
321    #[clap(short = 'E', default_value = "0", long)]
322    perf_threshold: u64,
323
324    /// Sticky perf event (0x0 = disabled). When a task exceeds -Y for this event, keep it on the same CPU.
325    /// Accepts hex (0xN) or symbolic names (e.g. cache-misses, LLC-load-misses).
326    #[clap(short = 'y', long, default_value = "0x0", value_parser = parse_perf_event)]
327    perf_sticky: PerfEventSpec,
328
329    /// Sticky perf threshold; task is kept on same CPU when its count for -y event exceeds this.
330    #[clap(short = 'Y', default_value = "0", long)]
331    perf_sticky_threshold: u64,
332
333    /// Enable GPU-aware scheduling.
334    #[clap(short = 'g', long, action = clap::ArgAction::SetTrue)]
335    gpu: bool,
336
337    /// Only treat a process as GPU-bound if its GPU utilization is at least this percentage (0–100).
338    ///
339    /// Uses NVML process utilization (SM + memory). 0 = no filter (all processes on the GPU are
340    /// considered GPU-bound). Requires driver support (Maxwell or newer).
341    #[clap(long, default_value = "0", value_parser = clap::value_parser!(u32).range(0..=100))]
342    gpu_util_threshold: u32,
343
344    /// Disable NUMA optimizations.
345    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
346    disable_numa: bool,
347
348    /// Disable CPU frequency control.
349    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
350    disable_cpufreq: bool,
351
352    /// Enable flat idle CPU scanning.
353    ///
354    /// This option can help reducing some overhead when trying to allocate idle CPUs and it can be
355    /// quite effective with simple CPU topologies.
356    #[arg(short = 'i', long, action = clap::ArgAction::SetTrue)]
357    flat_idle_scan: bool,
358
359    /// Enable preferred idle CPU scanning.
360    ///
361    /// With this option enabled, the scheduler will prioritize assigning tasks to higher-ranked
362    /// cores before considering lower-ranked ones.
363    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
364    preferred_idle_scan: bool,
365
366    /// Disable SMT.
367    ///
368    /// This option can only be used together with --flat-idle-scan or --preferred-idle-scan,
369    /// otherwise it is ignored.
370    #[clap(long, action = clap::ArgAction::SetTrue)]
371    disable_smt: bool,
372
373    /// SMT contention avoidance.
374    ///
375    /// When enabled, the scheduler aggressively avoids placing tasks on sibling SMT threads.
376    /// This may increase task migrations and lower overall throughput, but can lead to more
377    /// consistent performance by reducing contention on shared SMT cores.
378    #[clap(short = 'S', long, action = clap::ArgAction::SetTrue)]
379    avoid_smt: bool,
380
381    /// Disable early clearing of idle CPU state.
382    ///
383    /// When enabled, multiple concurrent wakeups can select the same idle CPU
384    /// before it fully wakes up. This can improve performance in highly communicative
385    /// workloads by aggressively stacking tasks on the same cache.
386    #[clap(short = 'N', long, action = clap::ArgAction::SetTrue)]
387    no_early_clear: bool,
388
389    /// Disable direct dispatch during synchronous wakeups.
390    ///
391    /// Enabling this option can lead to a more uniform load distribution across available cores,
392    /// potentially improving performance in certain scenarios. However, it may come at the cost of
393    /// reduced efficiency for pipe-intensive workloads that benefit from tighter producer-consumer
394    /// coupling.
395    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
396    no_wake_sync: bool,
397
398    /// ***DEPRECATED*** Disable deferred wakeups.
399    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
400    no_deferred_wakeup: bool,
401
402    /// Disable tick-based preemption enforcement.
403    ///
404    /// By default, the scheduler preempts tasks that exceed their time slice when the system is
405    /// busy or SMT contention is detected. Use this flag to disable this behavior.
406    #[clap(long, action = clap::ArgAction::SetTrue)]
407    no_tick_preempt: bool,
408
409    /// Enable address space affinity.
410    ///
411    /// This option allows to keep tasks that share the same address space (e.g., threads of the
412    /// same process) on the same CPU across wakeups.
413    ///
414    /// This can improve locality and performance in certain cache-sensitive workloads.
415    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
416    mm_affinity: bool,
417
418    /// Enable stats monitoring with the specified interval.
419    #[clap(long)]
420    stats: Option<f64>,
421
422    /// Run in stats monitoring mode with the specified interval. Scheduler
423    /// is not launched.
424    #[clap(long)]
425    monitor: Option<f64>,
426
427    /// Enable verbose output, including libbpf details.
428    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
429    verbose: bool,
430
431    /// Print scheduler version and exit.
432    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
433    version: bool,
434
435    /// Show descriptions for statistics.
436    #[clap(long)]
437    help_stats: bool,
438
439    #[clap(flatten, next_help_heading = "Libbpf Options")]
440    pub libbpf: LibbpfOpts,
441}
442
/// CPU selection policy used when resolving the primary scheduling domain
/// (see get_primary_cpus()).
#[derive(PartialEq)]
enum Powermode {
    /// Only big cores flagged as turbo-capable.
    Turbo,
    /// All big cores (turbo and non-turbo).
    Performance,
    /// Only little (power-efficient) cores.
    Powersave,
    /// Any CPU, regardless of core type.
    Any,
}
450
451/*
452 * TODO: this code is shared between scx_bpfland, scx_flash and scx_cosmos; consder to move it to
453 * scx_utils.
454 */
455fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
456    let cpus: Vec<usize> = Topology::new()
457        .unwrap()
458        .all_cores
459        .values()
460        .flat_map(|core| &core.cpus)
461        .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
462            // Turbo mode: prioritize CPUs with the highest max frequency
463            (Powermode::Turbo, CoreType::Big { turbo: true }) |
464            // Performance mode: add all the Big CPUs (either Turbo or non-Turbo)
465            (Powermode::Performance, CoreType::Big { .. }) |
466            // Powersave mode: add all the Little CPUs
467            (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
468            (Powermode::Any, ..) => Some(*cpu_id),
469            _ => None,
470        })
471        .collect();
472
473    Ok(cpus)
474}
475
476pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
477    let mut cpus = Vec::new();
478    let mut seen = HashSet::new();
479
480    // Handle special keywords
481    if let Some(mode) = match optarg {
482        "powersave" => Some(Powermode::Powersave),
483        "performance" => Some(Powermode::Performance),
484        "turbo" => Some(Powermode::Turbo),
485        "all" => Some(Powermode::Any),
486        _ => None,
487    } {
488        return get_primary_cpus(mode).map_err(|e| e.to_string());
489    }
490
491    // Validate input characters
492    if optarg
493        .chars()
494        .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
495    {
496        return Err("Invalid character in CPU list".to_string());
497    }
498
499    // Replace all whitespace with tab (or just trim later)
500    let cleaned = optarg.replace(' ', "\t");
501
502    for token in cleaned.split(',') {
503        let token = token.trim_matches(|c: char| c.is_whitespace());
504
505        if token.is_empty() {
506            continue;
507        }
508
509        if let Some((start_str, end_str)) = token.split_once('-') {
510            let start = start_str
511                .trim()
512                .parse::<usize>()
513                .map_err(|_| "Invalid range start")?;
514            let end = end_str
515                .trim()
516                .parse::<usize>()
517                .map_err(|_| "Invalid range end")?;
518
519            if start > end {
520                return Err(format!("Invalid CPU range: {}-{}", start, end));
521            }
522
523            for i in start..=end {
524                if cpus.len() >= *NR_CPU_IDS {
525                    return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
526                }
527                if seen.insert(i) {
528                    cpus.push(i);
529                }
530            }
531        } else {
532            let cpu = token
533                .parse::<usize>()
534                .map_err(|_| format!("Invalid CPU: {}", token))?;
535            if cpus.len() >= *NR_CPU_IDS {
536                return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
537            }
538            if seen.insert(cpu) {
539                cpus.push(cpu);
540            }
541        }
542    }
543
544    Ok(cpus)
545}
546
/// Initial value for the dynamic threshold (in BPF units).
const DYNAMIC_THRESHOLD_INIT_VALUE: u64 = 1000;

/// Minimum value for the dynamic threshold (in BPF units); the controller
/// never lowers the threshold below this floor.
const DYNAMIC_THRESHOLD_MIN_VALUE: u64 = 10;

/// Target event rate (per second) above which we consider migrations/sticky dispatches too high.
const DYNAMIC_THRESHOLD_RATE_HIGH: f64 = 4000.0;

/// Target event rate (per second) below which we consider migrations/sticky dispatches too low.
const DYNAMIC_THRESHOLD_RATE_LOW: f64 = 2000.0;

/// Hysteresis band: rate must move by this fraction beyond the target bounds before we act.
/// This prevents oscillation when the rate hovers near the threshold boundaries.
const DYNAMIC_THRESHOLD_HYSTERESIS: f64 = 0.1;

/// EMA smoothing factor (alpha). Higher values give more weight to recent samples.
/// 0.3 provides good balance between responsiveness and stability.
const DYNAMIC_THRESHOLD_EMA_ALPHA: f64 = 0.3;

/// Minimum scale factor when just outside the target band (slow convergence near optimal).
const DYNAMIC_THRESHOLD_SCALE_MIN: f64 = 0.0001;

/// Maximum scale factor when far from target (fast convergence when initial threshold is way off).
/// Also returned directly when the observed rate is zero, to collapse the
/// threshold to its minimum as quickly as possible.
const DYNAMIC_THRESHOLD_SCALE_MAX: f64 = 1000.0;

/// Slope for "too high" case: scale grows with (rate/HIGH - 1) so we step much harder when rate is
/// many times over target.
const DYNAMIC_THRESHOLD_SLOPE_HIGH: f64 = 0.35;

/// Slope for "too low" case: scale grows with deficit so we step harder when rate is near zero.
const DYNAMIC_THRESHOLD_SLOPE_LOW: f64 = 0.58;

/// Minimum interval between NVML GPU PID syncs. Kept separate from CPU polling so that fast
/// polling (e.g. 100 ms) does not trigger expensive NVML calls every tick.
const GPU_SYNC_INTERVAL: Duration = Duration::from_secs(1);
583
/// State for EMA-based dynamic threshold adjustment with hysteresis.
///
/// This struct maintains the smoothed rate estimate and tracks whether we're
/// currently in an adjustment state (raising or lowering threshold) to implement
/// hysteresis and prevent oscillation.
#[derive(Debug, Clone)]
struct DynamicThresholdState {
    /// Current threshold value (in BPF units; see DYNAMIC_THRESHOLD_* constants).
    threshold: u64,
    /// EMA-smoothed rate estimate (events/second).
    smoothed_rate: f64,
    /// Previous raw counter value for delta calculation.
    prev_counter: u64,
    /// Whether the EMA has been initialized with a valid sample.
    initialized: bool,
    /// Current adjustment direction: None (stable), Some(true) = raising, Some(false) = lowering.
    /// Used for hysteresis: once we start adjusting in a direction, we continue until
    /// the rate crosses back into the stable band with hysteresis margin.
    adjustment_direction: Option<bool>,
}
604
605impl DynamicThresholdState {
606    /// Create a new dynamic threshold state with the given initial threshold.
607    fn new(initial_threshold: u64) -> Self {
608        Self {
609            threshold: initial_threshold,
610            smoothed_rate: 0.0,
611            prev_counter: 0,
612            initialized: false,
613            adjustment_direction: None,
614        }
615    }
616
617    /// Update the state with a new counter sample and elapsed time.
618    /// Returns the new threshold if it changed, or None if unchanged.
619    fn update(
620        &mut self,
621        counter: u64,
622        elapsed_secs: f64,
623        verbose: bool,
624        name: &str,
625    ) -> Option<u64> {
626        if elapsed_secs <= 0.0 {
627            return None;
628        }
629
630        // Calculate instantaneous rate.
631        let delta = counter.saturating_sub(self.prev_counter);
632        self.prev_counter = counter;
633        let raw_rate = delta as f64 / elapsed_secs;
634
635        // Update EMA.
636        if self.initialized {
637            self.smoothed_rate = DYNAMIC_THRESHOLD_EMA_ALPHA * raw_rate
638                + (1.0 - DYNAMIC_THRESHOLD_EMA_ALPHA) * self.smoothed_rate;
639        } else {
640            // First sample: initialize EMA directly.
641            self.smoothed_rate = raw_rate;
642            self.initialized = true;
643        }
644
645        // Determine if we should adjust the threshold using hysteresis.
646        let rate = self.smoothed_rate;
647        let old_threshold = self.threshold;
648
649        // Calculate hysteresis-adjusted bounds based on current state.
650        let (effective_high, effective_low) = match self.adjustment_direction {
651            Some(true) => {
652                // Currently raising threshold: need rate to drop below LOW - hysteresis to stop.
653                (
654                    DYNAMIC_THRESHOLD_RATE_HIGH,
655                    DYNAMIC_THRESHOLD_RATE_LOW * (1.0 - DYNAMIC_THRESHOLD_HYSTERESIS),
656                )
657            }
658            Some(false) => {
659                // Currently lowering threshold: need rate to rise above HIGH + hysteresis to stop.
660                (
661                    DYNAMIC_THRESHOLD_RATE_HIGH * (1.0 + DYNAMIC_THRESHOLD_HYSTERESIS),
662                    DYNAMIC_THRESHOLD_RATE_LOW,
663                )
664            }
665            None => {
666                // Stable state: need rate to exceed bounds + hysteresis to start adjusting.
667                (
668                    DYNAMIC_THRESHOLD_RATE_HIGH * (1.0 + DYNAMIC_THRESHOLD_HYSTERESIS),
669                    DYNAMIC_THRESHOLD_RATE_LOW * (1.0 - DYNAMIC_THRESHOLD_HYSTERESIS),
670                )
671            }
672        };
673
674        // Determine new adjustment direction.
675        let new_direction = if rate > effective_high {
676            Some(true) // Rate too high, raise threshold.
677        } else if rate < effective_low && rate >= 0.0 {
678            Some(false) // Rate too low, lower threshold.
679        } else {
680            // Rate in stable band (considering hysteresis).
681            if self.adjustment_direction.is_some() {
682                // We were adjusting; check if we should stop.
683                if rate >= DYNAMIC_THRESHOLD_RATE_LOW && rate <= DYNAMIC_THRESHOLD_RATE_HIGH {
684                    None // Back in target band, stop adjusting.
685                } else {
686                    self.adjustment_direction // Continue current direction.
687                }
688            } else {
689                None // Already stable.
690            }
691        };
692
693        // Apply adjustment if we have a direction.
694        if let Some(raising) = new_direction {
695            let scale = Self::compute_scale(rate, raising);
696            let factor = if raising { 1.0 + scale } else { 1.0 - scale };
697            let new_threshold = ((self.threshold as f64) * factor).round() as u64;
698            self.threshold = new_threshold.clamp(DYNAMIC_THRESHOLD_MIN_VALUE, u64::MAX);
699        }
700
701        self.adjustment_direction = new_direction;
702
703        // Return new threshold only if it changed.
704        if self.threshold != old_threshold {
705            if verbose {
706                info!(
707                    "{}: {} -> {} (smoothed rate {:.1}/s, raw {:.1}/s, dir {:?})",
708                    name,
709                    old_threshold,
710                    self.threshold,
711                    self.smoothed_rate,
712                    raw_rate,
713                    self.adjustment_direction
714                );
715            }
716            Some(self.threshold)
717        } else {
718            None
719        }
720    }
721
722    /// Compute the scale factor for threshold adjustment based on how far the rate
723    /// is from the target band.
724    fn compute_scale(rate: f64, too_high: bool) -> f64 {
725        if too_high {
726            let excess = ((rate / DYNAMIC_THRESHOLD_RATE_HIGH) - 1.0).max(0.0);
727            let scale =
728                DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_HIGH * excess.min(4.0);
729            scale.min(DYNAMIC_THRESHOLD_SCALE_MAX)
730        } else {
731            if rate <= 0.0 {
732                return DYNAMIC_THRESHOLD_SCALE_MAX;
733            }
734            let deficit = (DYNAMIC_THRESHOLD_RATE_LOW - rate) / DYNAMIC_THRESHOLD_RATE_LOW;
735            let t = deficit.clamp(0.0, 1.0);
736            DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_LOW * t
737        }
738    }
739}
740
/// Snapshot of per-CPU time accounting counters.
/// NOTE(review): field names mirror /proc/stat's user/nice columns plus an
/// aggregate total — confirm against the code that populates this struct
/// (not visible in this chunk).
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    user: u64,
    nice: u64,
    total: u64,
}
747
/// Userspace side of the scheduler: owns the loaded BPF skeleton, the
/// struct_ops attachment, the stats server, and optional GPU/perf-event state.
struct Scheduler<'a> {
    /// Loaded BPF skeleton (maps, programs, rodata).
    skel: BpfSkel<'a>,
    /// Parsed command-line options, borrowed for the scheduler's lifetime.
    opts: &'a Opts,
    /// Link handle for the attached struct_ops scheduler (None until attached).
    struct_ops: Option<libbpf_rs::Link>,
    /// Server answering stats requests from the monitoring client.
    stats_server: StatsServer<(), Metrics>,
    /// GPU device index -> NUMA node (for NVML PID sync). Only set when --gpu and NUMA enabled.
    gpu_index_to_node: Option<HashMap<u32, u32>>,
    /// Previous (pid, node) set so we can remove PIDs that stopped using the GPU.
    previous_gpu_pids: Option<HashMap<u32, u32>>,
    /// Reused NVML handle to avoid re-initializing on every sync (expensive).
    nvml: Option<Nvml>,
    /// Dynamic threshold state for perf event migrations (when --perf-threshold is 0/dynamic).
    perf_threshold_state: Option<DynamicThresholdState>,
    /// Dynamic threshold state for sticky perf events (when --perf-sticky-threshold is 0/dynamic).
    perf_sticky_threshold_state: Option<DynamicThresholdState>,
}
764
765impl<'a> Scheduler<'a> {
766    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
767        try_set_rlimit_infinity();
768
769        // Initialize CPU topology.
770        let topo = Topology::new().unwrap();
771
772        // Check host topology to determine if we need to enable SMT capabilities.
773        let smt_enabled = !opts.disable_smt && topo.smt_enabled;
774
775        // Determine the amount of non-empty NUMA nodes in the system.
776        let nr_nodes = topo
777            .nodes
778            .values()
779            .filter(|node| !node.all_cpus.is_empty())
780            .count();
781        info!("NUMA nodes: {}", nr_nodes);
782
783        // Automatically disable NUMA optimizations when running on non-NUMA systems.
784        let numa_enabled = !opts.disable_numa && nr_nodes > 1;
785        if !numa_enabled {
786            info!("Disabling NUMA optimizations");
787        }
788
789        info!(
790            "{} {} {}",
791            SCHEDULER_NAME,
792            build_id::full_version(env!("CARGO_PKG_VERSION")),
793            if smt_enabled { "SMT on" } else { "SMT off" }
794        );
795
796        // Print command line.
797        info!(
798            "scheduler options: {}",
799            std::env::args().collect::<Vec<_>>().join(" ")
800        );
801
802        // Initialize BPF connector.
803        let mut skel_builder = BpfSkelBuilder::default();
804        skel_builder.obj_builder.debug(opts.verbose);
805        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
806        let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops, open_opts)?;
807
808        skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;
809
810        // Override default BPF scheduling parameters.
811        let rodata = skel.maps.rodata_data.as_mut().unwrap();
812        rodata.slice_ns = opts.slice_us * 1000;
813        rodata.slice_lag = opts.slice_lag_us * 1000;
814        rodata.cpufreq_enabled = !opts.disable_cpufreq;
815        rodata.flat_idle_scan = opts.flat_idle_scan;
816        rodata.smt_enabled = smt_enabled;
817        rodata.numa_enabled = numa_enabled;
818        rodata.nr_node_ids = topo.nodes.len() as u32;
819        rodata.no_wake_sync = opts.no_wake_sync;
820        rodata.avoid_smt = opts.avoid_smt;
821        rodata.no_early_clear = opts.no_early_clear;
822        rodata.tick_preempt = !opts.no_tick_preempt;
823        rodata.mm_affinity = opts.mm_affinity;
824
825        // Enable perf event scheduling settings.
826        rodata.perf_config = opts.perf_config.event_id;
827        rodata.perf_sticky = opts.perf_sticky.event_id;
828
829        // Normalize CPU busy threshold in the range [0 .. 1024].
830        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
831
832        // Generate the list of available CPUs sorted by capacity in descending order.
833        let mut cpus: Vec<_> = topo.all_cpus.values().collect();
834        cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
835        // Normalize CPU capacities to 1..1024 so the highest capacity is always 1024.
836        let max_cap = cpus.first().map(|c| c.cpu_capacity).unwrap_or(1).max(1);
837        for (i, cpu) in cpus.iter().enumerate() {
838            let normalized = (cpu.cpu_capacity * 1024 / max_cap).clamp(1, 1024);
839            rodata.cpu_capacity[cpu.id] = normalized as c_ulong;
840            rodata.preferred_cpus[i] = cpu.id as u64;
841        }
842        rodata.all_cpus_same_capacity = cpus.iter().all(|cpu| cpu.cpu_capacity == max_cap);
843        if opts.preferred_idle_scan {
844            info!(
845                "Preferred CPUs: {:?}",
846                &rodata.preferred_cpus[0..cpus.len()]
847            );
848        }
849        rodata.preferred_idle_scan = opts.preferred_idle_scan;
850
851        // Define the primary scheduling domain.
852        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
853            match parse_cpu_list(domain) {
854                Ok(cpus) => cpus,
855                Err(e) => bail!("Error parsing primary domain: {}", e),
856            }
857        } else {
858            (0..*NR_CPU_IDS).collect()
859        };
860        if primary_cpus.len() < *NR_CPU_IDS {
861            info!("Primary CPUs: {:?}", primary_cpus);
862            rodata.primary_all = false;
863        } else {
864            rodata.primary_all = true;
865        }
866
867        // Enable GPU support and build GPU index -> node for NVML PID sync. Init NVML once here
868        // so we reuse the handle in the run loop (re-initing every sync is very expensive).
869        let (gpu_index_to_node, previous_gpu_pids, nvml) = if opts.gpu && numa_enabled {
870            match Nvml::init_with_flags(InitFlags::NO_GPUS) {
871                Ok(nvml) => {
872                    info!("NVIDIA GPU-aware scheduling enabled (NVML PID sync)");
873                    rodata.gpu_enabled = true;
874                    let mut idx_to_node = HashMap::new();
875                    for (id, gpu) in topo.gpus() {
876                        let GpuIndex::Nvidia { nvml_id } = id;
877                        idx_to_node.insert(nvml_id, gpu.node_id as u32);
878                    }
879                    (Some(idx_to_node), Some(HashMap::new()), Some(nvml))
880                }
881                Err(e) => {
882                    warn!("NVML init failed, disabling GPU-aware scheduling: {}", e);
883                    rodata.gpu_enabled = false;
884                    (None, None, None)
885                }
886            }
887        } else {
888            rodata.gpu_enabled = false;
889            (None, None, None)
890        };
891
892        // Set scheduler flags.
893        skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
894            | *compat::SCX_OPS_ENQ_LAST
895            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
896            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
897
898        info!(
899            "scheduler flags: {:#x}",
900            skel.struct_ops.cosmos_ops_mut().flags
901        );
902
903        // Load the BPF program for validation.
904        let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;
905
906        // Initial perf thresholds in bss. When threshold is 0 we use dynamic logic; when user
907        // specifies a value > 0 we use it as a static threshold.
908        let bss = skel.maps.bss_data.as_mut().unwrap();
909        if opts.perf_config.event_id > 0 {
910            bss.perf_threshold = if opts.perf_threshold == 0 {
911                DYNAMIC_THRESHOLD_INIT_VALUE
912            } else {
913                opts.perf_threshold
914            };
915        }
916        if opts.perf_sticky.event_id > 0 {
917            bss.perf_sticky_threshold = if opts.perf_sticky_threshold == 0 {
918                DYNAMIC_THRESHOLD_INIT_VALUE
919            } else {
920                opts.perf_sticky_threshold
921            };
922        }
923
924        // Configure CPU->node mapping (must be done after skeleton is loaded).
925        for node in topo.nodes.values() {
926            for cpu in node.all_cpus.values() {
927                if opts.verbose {
928                    info!("CPU{} -> node{}", cpu.id, node.id);
929                }
930                skel.maps.cpu_node_map.update(
931                    &(cpu.id as u32).to_ne_bytes(),
932                    &(node.id as u32).to_ne_bytes(),
933                    MapFlags::ANY,
934                )?;
935            }
936        }
937
938        // Setup performance events for all CPUs.
939        // Counter indices must match PMU library install order: migration first (0), then sticky (1).
940        // When only sticky is used, it gets index 0; when both are used, sticky gets index 1.
941        let nr_cpus = *NR_CPU_IDS;
942        info!("Setting up performance counters for {} CPUs...", nr_cpus);
943        let mut perf_available = true;
944        let sticky_counter_idx = if opts.perf_config.event_id > 0 { 1 } else { 0 };
945        for cpu in 0..nr_cpus {
946            if opts.perf_config.event_id > 0 {
947                if let Err(e) = setup_perf_events(&mut skel, cpu as i32, &opts.perf_config, 0) {
948                    if cpu == 0 {
949                        let err_str = e.to_string();
950                        if err_str.contains("errno 2") || err_str.contains("os error 2") {
951                            warn!("Performance counters not available on this CPU architecture");
952                            warn!("PMU event '{}' not supported - scheduler will run without perf monitoring", opts.perf_config.display_name);
953                        } else {
954                            warn!("Failed to setup perf events: {}", e);
955                        }
956                        perf_available = false;
957                        break;
958                    }
959                }
960            }
961            if opts.perf_sticky.event_id > 0 {
962                if let Err(e) =
963                    setup_perf_events(&mut skel, cpu as i32, &opts.perf_sticky, sticky_counter_idx)
964                {
965                    if cpu == 0 {
966                        let err_str = e.to_string();
967                        if err_str.contains("errno 2") || err_str.contains("os error 2") {
968                            warn!("Performance counters not available on this CPU architecture");
969                            warn!("PMU event '{}' not supported - scheduler will run without perf monitoring", opts.perf_sticky.display_name);
970                        } else {
971                            warn!("Failed to setup perf events: {}", e);
972                        }
973                        perf_available = false;
974                        break;
975                    }
976                }
977            }
978        }
979        if perf_available {
980            info!("Performance counters configured successfully for all CPUs");
981        }
982
983        // Configure GPU->node mapping.
984        if opts.gpu && numa_enabled {
985            for (id, gpu) in topo.gpus() {
986                let GpuIndex::Nvidia { nvml_id } = id;
987                if opts.verbose {
988                    info!("GPU{} -> node{}", nvml_id, gpu.node_id);
989                }
990                skel.maps.gpu_node_map.update(
991                    &(nvml_id as u32).to_ne_bytes(),
992                    &(gpu.node_id as u32).to_ne_bytes(),
993                    MapFlags::ANY,
994                )?;
995            }
996        }
997
998        // Enable primary scheduling domain, if defined.
999        if primary_cpus.len() < *NR_CPU_IDS {
1000            for cpu in primary_cpus {
1001                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
1002                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
1003                }
1004            }
1005        }
1006
1007        // Initialize SMT domains.
1008        if smt_enabled {
1009            Self::init_smt_domains(&mut skel, &topo)?;
1010        }
1011
1012        // Attach the scheduler.
1013        let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
1014        let stats_server = StatsServer::new(stats::server_data()).launch()?;
1015
1016        // Initialize dynamic threshold states for perf events (only when using dynamic mode).
1017        let perf_threshold_state = if opts.perf_config.event_id > 0 && opts.perf_threshold == 0 {
1018            Some(DynamicThresholdState::new(DYNAMIC_THRESHOLD_INIT_VALUE))
1019        } else {
1020            None
1021        };
1022        let perf_sticky_threshold_state =
1023            if opts.perf_sticky.event_id > 0 && opts.perf_sticky_threshold == 0 {
1024                Some(DynamicThresholdState::new(DYNAMIC_THRESHOLD_INIT_VALUE))
1025            } else {
1026                None
1027            };
1028
1029        Ok(Self {
1030            skel,
1031            opts,
1032            struct_ops,
1033            stats_server,
1034            gpu_index_to_node,
1035            previous_gpu_pids,
1036            nvml,
1037            perf_threshold_state,
1038            perf_sticky_threshold_state,
1039        })
1040    }
1041
    /// Sync PID -> GPU (node) map from NVML. When gpu_util_threshold > 0, only PIDs with
    /// GPU utilization (SM or memory) >= threshold are added. Map is keyed by task pid.
    /// Only processes using a single GPU are added; multi-GPU processes are excluded.
    fn sync_gpu_pids(&mut self) -> Result<()> {
        // GPU support disabled (or NVML init failed): nothing to do.
        let gpu_index_to_node = match &self.gpu_index_to_node {
            Some(m) => m,
            None => return Ok(()),
        };
        let nvml = match &self.nvml {
            Some(n) => n,
            None => return Ok(()),
        };
        let threshold = self.opts.gpu_util_threshold;
        // NOTE(review): previous_gpu_pids appears to always be Some whenever
        // gpu_index_to_node is Some (both are produced together) — the unwrap
        // relies on that invariant.
        let previous = self.previous_gpu_pids.as_ref().unwrap();
        // First collect pid -> set of nodes (GPUs) per process.
        let mut pid_to_nodes: HashMap<u32, HashSet<u32>> = HashMap::new();

        let count = nvml.device_count().context("NVML device count")?;
        for i in 0..count {
            // Skip GPUs that are not present in the index -> node mapping.
            let node = match gpu_index_to_node.get(&i) {
                Some(&n) => n,
                None => continue,
            };
            let device = nvml.device_by_index(i).context("NVML device_by_index")?;

            if threshold > 0 {
                // Use process utilization; only add PIDs above threshold.
                match device.process_utilization_stats(None::<u64>) {
                    Ok(samples) => {
                        for sample in samples {
                            // Consider a process active if either its SM or
                            // memory utilization crosses the threshold.
                            let util = sample.sm_util.max(sample.mem_util);
                            if util >= threshold {
                                pid_to_nodes.entry(sample.pid).or_default().insert(node);
                            }
                        }
                    }
                    Err(_) => {
                        // NotSupported or other: fall back to all running processes.
                        Self::add_running_gpu_processes_to_set(&device, node, &mut pid_to_nodes);
                    }
                }
            } else {
                Self::add_running_gpu_processes_to_set(&device, node, &mut pid_to_nodes);
            }
        }

        // Only add PIDs that use exactly one GPU to the map.
        let mut current: HashMap<u32, u32> = HashMap::new();
        for (tgid, nodes) in pid_to_nodes {
            if nodes.len() == 1 {
                let node = nodes.into_iter().next().unwrap();
                current.insert(tgid, node);
                // Also map every thread (tid) of the process, since the map
                // is keyed by task pid.
                for tid in Self::task_tids(tgid) {
                    current.insert(tid, node);
                }
            }
        }

        // Push new/updated entries, then prune PIDs that disappeared since
        // the previous sync (deletions are best-effort).
        let map = &self.skel.maps.gpu_pid_map;
        for (pid, node) in &current {
            map.update(&pid.to_ne_bytes(), &node.to_ne_bytes(), MapFlags::ANY)
                .context("gpu_pid_map update")?;
        }
        for pid in previous.keys() {
            if !current.contains_key(pid) {
                let _ = map.delete(&pid.to_ne_bytes());
            }
        }
        *self.previous_gpu_pids.as_mut().unwrap() = current;
        Ok(())
    }
1113
1114    /// Record running compute/graphics process PIDs and the GPU node in pid_to_nodes.
1115    fn add_running_gpu_processes_to_set(
1116        device: &nvml_wrapper::Device<'_>,
1117        node: u32,
1118        pid_to_nodes: &mut HashMap<u32, HashSet<u32>>,
1119    ) {
1120        for proc in device
1121            .running_compute_processes()
1122            .unwrap_or_default()
1123            .into_iter()
1124            .chain(device.running_graphics_processes().unwrap_or_default())
1125        {
1126            pid_to_nodes.entry(proc.pid).or_default().insert(node);
1127        }
1128    }
1129
1130    /// Return all thread IDs (tids) of the process with the given pid (tgid).
1131    fn task_tids(pid: u32) -> Vec<u32> {
1132        let task_dir = format!("/proc/{}/task", pid);
1133        let Ok(entries) = fs::read_dir(Path::new(&task_dir)) else {
1134            return Vec::new();
1135        };
1136        entries
1137            .filter_map(|e| e.ok())
1138            .filter_map(|e| e.file_name().to_str().and_then(|s| s.parse::<u32>().ok()))
1139            .collect()
1140    }
1141
1142    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
1143        let prog = &mut skel.progs.enable_primary_cpu;
1144        let mut args = cpu_arg {
1145            cpu_id: cpu as c_int,
1146        };
1147        let input = ProgramInput {
1148            context_in: Some(unsafe {
1149                std::slice::from_raw_parts_mut(
1150                    &mut args as *mut _ as *mut u8,
1151                    std::mem::size_of_val(&args),
1152                )
1153            }),
1154            ..Default::default()
1155        };
1156        let out = prog.test_run(input).unwrap();
1157        if out.return_value != 0 {
1158            return Err(out.return_value);
1159        }
1160
1161        Ok(())
1162    }
1163
1164    fn enable_sibling_cpu(
1165        skel: &mut BpfSkel<'_>,
1166        cpu: usize,
1167        sibling_cpu: usize,
1168    ) -> Result<(), u32> {
1169        let prog = &mut skel.progs.enable_sibling_cpu;
1170        let mut args = domain_arg {
1171            cpu_id: cpu as c_int,
1172            sibling_cpu_id: sibling_cpu as c_int,
1173        };
1174        let input = ProgramInput {
1175            context_in: Some(unsafe {
1176                std::slice::from_raw_parts_mut(
1177                    &mut args as *mut _ as *mut u8,
1178                    std::mem::size_of_val(&args),
1179                )
1180            }),
1181            ..Default::default()
1182        };
1183        let out = prog.test_run(input).unwrap();
1184        if out.return_value != 0 {
1185            return Err(out.return_value);
1186        }
1187
1188        Ok(())
1189    }
1190
1191    fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
1192        let smt_siblings = topo.sibling_cpus();
1193
1194        info!("SMT sibling CPUs: {:?}", smt_siblings);
1195        for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
1196            Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
1197        }
1198
1199        Ok(())
1200    }
1201
    /// Snapshot dispatch counters from the BPF .bss section into a Metrics
    /// struct for the stats server.
    fn get_metrics(&self) -> Metrics {
        let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
        Metrics {
            nr_event_dispatches: bss_data.nr_event_dispatches,
            nr_ev_sticky_dispatches: bss_data.nr_ev_sticky_dispatches,
            nr_gpu_dispatches: bss_data.nr_gpu_dispatches,
        }
    }
1210
    /// Return true when the BPF scheduler has exited (user exit info is set).
    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }
1214
1215    fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
1216        // Evaluate total user CPU time as user + nice.
1217        let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
1218        let total_diff = curr.total.saturating_sub(prev.total);
1219
1220        if total_diff > 0 {
1221            let user_ratio = user_diff as f64 / total_diff as f64;
1222            Some((user_ratio * 1024.0).round() as u64)
1223        } else {
1224            None
1225        }
1226    }
1227
    /// Read per-CPU times from /proc/stat (lines "cpu0", "cpu1", ...).
    /// Returns one CpuTimes per CPU, in order.
    ///
    /// Returns None on any I/O or parse error, or when fewer than `nr_cpus`
    /// consecutive per-CPU lines (starting at cpu0) could be read.
    fn read_per_cpu_cpu_times(nr_cpus: usize) -> Option<Vec<CpuTimes>> {
        let file = File::open("/proc/stat").ok()?;
        let reader = BufReader::new(file);
        let mut result = Vec::with_capacity(nr_cpus);

        for line in reader.lines() {
            let line = line.ok()?;
            let line = line.trim();
            if !line.starts_with("cpu") {
                continue;
            }
            // Cannot fail: the starts_with("cpu") check above guarantees the prefix.
            let rest = line.strip_prefix("cpu")?;
            if rest.starts_with(' ') {
                // Aggregate line "cpu " - skip.
                continue;
            }
            // Per-CPU lines are expected in order and without gaps; stop at
            // the first out-of-sequence CPU id.
            let cpu_id: usize = rest.split_whitespace().next()?.parse().ok()?;
            if cpu_id != result.len() {
                break;
            }
            let fields: Vec<&str> = line.split_whitespace().collect();
            if fields.len() < 5 {
                return None;
            }
            // fields[0] is the "cpuN" label; user and nice time follow.
            let user: u64 = fields[1].parse().ok()?;
            let nice: u64 = fields[2].parse().ok()?;
            // Total = sum of the first 8 time fields (per proc(5):
            // user, nice, system, idle, iowait, irq, softirq, steal).
            let total: u64 = fields
                .iter()
                .skip(1)
                .take(8)
                .filter_map(|v| v.parse::<u64>().ok())
                .sum();
            result.push(CpuTimes { user, nice, total });
            if result.len() >= nr_cpus {
                break;
            }
        }

        if result.len() == nr_cpus {
            Some(result)
        } else {
            None
        }
    }
1274
    /// Main scheduler loop: periodically refresh per-CPU utilization, dynamic
    /// perf thresholds and the GPU PID map, while serving stats requests,
    /// until shutdown is requested or the BPF scheduler exits.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        // Periodically evaluate per-CPU user utilization from userspace and update the
        // cpu_util_map in BPF. The scheduler uses is_cpu_busy(cpu) with prev_cpu or
        // scx_bpf_task_cpu(p) to decide per-CPU whether to use local DSQs (round-robin)
        // or deadline-based shared DSQ.
        //
        // The polling interval is capped at 1 second.
        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
        let nr_cpus = *NR_CPU_IDS as usize;
        let mut prev_cputime =
            Self::read_per_cpu_cpu_times(nr_cpus).expect("Failed to read initial per-CPU stats");
        let mut last_update = Instant::now();
        let mut last_gpu_sync = Instant::now();

        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
            // Update per-CPU utilization and GPU PID -> node map (NVML).
            // A zero polling_time disables the periodic updates entirely.
            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
                if let Some(curr_cputime) = Self::read_per_cpu_cpu_times(nr_cpus) {
                    let map = &self.skel.maps.cpu_util_map;
                    for cpu in 0..nr_cpus {
                        if let Some(util) =
                            Self::compute_user_cpu_pct(&prev_cputime[cpu], &curr_cputime[cpu])
                        {
                            // Best-effort update: failures are ignored.
                            let _ = map.update(
                                &(cpu as u32).to_ne_bytes(),
                                &util.to_ne_bytes(),
                                MapFlags::ANY,
                            );
                        }
                    }
                    prev_cputime = curr_cputime;
                }

                // Update dynamic perf thresholds using EMA + hysteresis.
                let elapsed_secs = last_update.elapsed().as_secs_f64();

                // Update migration threshold state if dynamic mode is enabled.
                if let Some(ref mut state) = self.perf_threshold_state {
                    let nr_event = self
                        .skel
                        .maps
                        .bss_data
                        .as_ref()
                        .unwrap()
                        .nr_event_dispatches;
                    // Write the new threshold back to BPF only when the state
                    // machine decides an adjustment is needed.
                    if let Some(new_thresh) =
                        state.update(nr_event, elapsed_secs, self.opts.verbose, "perf_threshold")
                    {
                        self.skel.maps.bss_data.as_mut().unwrap().perf_threshold = new_thresh;
                    }
                }

                // Update sticky threshold state if dynamic mode is enabled.
                if let Some(ref mut state) = self.perf_sticky_threshold_state {
                    let nr_sticky = self
                        .skel
                        .maps
                        .bss_data
                        .as_ref()
                        .unwrap()
                        .nr_ev_sticky_dispatches;
                    if let Some(new_thresh) = state.update(
                        nr_sticky,
                        elapsed_secs,
                        self.opts.verbose,
                        "perf_sticky_threshold",
                    ) {
                        self.skel
                            .maps
                            .bss_data
                            .as_mut()
                            .unwrap()
                            .perf_sticky_threshold = new_thresh;
                    }
                }

                // GPU PID sync is throttled to GPU_SYNC_INTERVAL (NVML is expensive).
                if self.gpu_index_to_node.is_some() && last_gpu_sync.elapsed() >= GPU_SYNC_INTERVAL
                {
                    // Sync failures are logged but never abort the run loop.
                    if let Err(e) = self.sync_gpu_pids() {
                        debug!("GPU PID sync: {}", e);
                    }
                    last_gpu_sync = Instant::now();
                }

                last_update = Instant::now();
            }

            // Update statistics and check for exit condition.
            // The stats request wait doubles as the polling sleep.
            let timeout = if polling_time.is_zero() {
                Duration::from_secs(1)
            } else {
                polling_time
            };
            match req_ch.recv_timeout(timeout) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        // Drop the struct_ops link to detach the scheduler, then report
        // the BPF-side exit information.
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
1379}
1380
impl Drop for Scheduler<'_> {
    /// Log scheduler teardown; the remaining fields (including any still-held
    /// struct_ops link) are dropped afterwards as usual.
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}
1386
/// Entry point: parse options, set up logging and signal handling, then run
/// the scheduler, re-initializing it while a restart is requested.
fn main() -> Result<()> {
    let opts = Opts::parse();

    // Print version and exit.
    if opts.version {
        println!(
            "{} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    // Describe the exported statistics and exit.
    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    // Configure terminal logging on stderr: timestamps only on errors, no
    // location/target/thread metadata.
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    // Request a clean shutdown on Ctrl-C through a shared atomic flag.
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // Spawn the stats monitor thread when --monitor or --stats is given.
    // In --monitor mode we only observe, so wait for the thread and exit
    // without loading the scheduler.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    // Load and run the scheduler; re-initialize it whenever the exit info
    // indicates a restart is needed.
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}