Skip to main content

scx_cosmos/
main.rs

1// SPDX-License-Identifier: GPL-2.0
2//
3// Copyright (c) 2025 Andrea Righi <arighi@nvidia.com>
4
5// This software may be used and distributed according to the terms of the
6// GNU General Public License version 2.
7
8mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::{HashMap, HashSet};
15use std::ffi::{c_int, c_ulong};
16use std::fs;
17use std::fs::File;
18use std::io::{BufRead, BufReader};
19use std::mem::MaybeUninit;
20use std::path::Path;
21use std::sync::atomic::AtomicBool;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use anyhow::bail;
27use anyhow::Context;
28use anyhow::Result;
29use clap::Parser;
30use crossbeam::channel::RecvTimeoutError;
31use libbpf_rs::MapCore;
32use libbpf_rs::MapFlags;
33use libbpf_rs::OpenObject;
34use libbpf_rs::ProgramInput;
35use log::{debug, info, warn};
36use nvml_wrapper::bitmasks::InitFlags;
37use nvml_wrapper::Nvml;
38use scx_stats::prelude::*;
39use scx_utils::build_id;
40use scx_utils::compat;
41use scx_utils::get_primary_cpus;
42use scx_utils::libbpf_clap_opts::LibbpfOpts;
43use scx_utils::scx_ops_attach;
44use scx_utils::scx_ops_load;
45use scx_utils::scx_ops_open;
46use scx_utils::try_set_rlimit_infinity;
47use scx_utils::uei_exited;
48use scx_utils::uei_report;
49use scx_utils::GpuIndex;
50use scx_utils::Powermode;
51use scx_utils::Topology;
52use scx_utils::UserExitInfo;
53use scx_utils::NR_CPU_IDS;
54use stats::Metrics;
55
/// Scheduler name, used as the prefix of the startup banner.
const SCHEDULER_NAME: &str = "scx_cosmos";
57
/// A parsed `-e` / `-y` perf event.
///
/// Built from either a raw hex value (`0xN`) or a symbolic name such as `cache-misses`.
/// `type_` and `config` feed `perf_event_attr` when the counter is opened, while `event_id` is the
/// value handed to the BPF side through rodata.
#[derive(Debug, Clone)]
struct PerfEventSpec {
    /// Opaque identifier written to BPF rodata; it must be the same value when the counter is
    /// installed and when it is read. 0 means the event is disabled.
    event_id: u64,
    /// Value for `perf_event_attr.type` (e.g. `PERF_TYPE_RAW`, `PERF_TYPE_HARDWARE`).
    type_: u32,
    /// Value for `perf_event_attr.config`.
    config: u64,
    /// The user-supplied string, kept verbatim for error messages.
    display_name: String,
}
71
/// Map a perf-tool style generic hardware event name to its `PERF_COUNT_HW_*` config value.
///
/// Returns `None` when `s` is not one of the known (case-sensitive) names or aliases.
fn parse_hardware_event(s: &str) -> Option<u64> {
    // (accepted spellings, PERF_COUNT_HW_* value)
    const HW_EVENTS: &[(&[&str], u64)] = &[
        (&["cpu-cycles", "cycles"], 0),
        (&["instructions"], 1),
        (&["cache-references"], 2),
        (&["cache-misses"], 3),
        (&["branch-instructions", "branches"], 4),
        (&["branch-misses"], 5),
        (&["bus-cycles"], 6),
        (&["stalled-cycles-frontend", "idle-cycles-frontend"], 7),
        (&["stalled-cycles-backend", "idle-cycles-backend"], 8),
        (&["ref-cycles"], 9),
    ];

    HW_EVENTS
        .iter()
        .find(|(aliases, _)| aliases.contains(&s))
        .map(|&(_, config)| config)
}
87
/// Map a perf-tool style software event name to its `PERF_COUNT_SW_*` config value.
///
/// Returns `None` for names (or aliases) that are not recognized.
fn parse_software_event(s: &str) -> Option<u64> {
    let config = match s {
        "cpu-clock" => 0,
        "task-clock" => 1,
        "page-faults" | "faults" => 2,
        "context-switches" | "cs" => 3,
        "cpu-migrations" | "migrations" => 4,
        "minor-faults" => 5,
        "major-faults" => 6,
        "alignment-faults" => 7,
        "emulation-faults" => 8,
        "dummy" => 9,
        "bpf-output" => 10,
        _ => return None,
    };
    Some(config)
}
104
/// Parse a hardware-cache event name such as `LLC-load-misses` or `dTLB-stores` into a
/// `PERF_TYPE_HW_CACHE` config value.
///
/// The name is `<cache>-<op>` where `<cache>` is one of `L1-dcache`, `L1-icache`, `LLC`, `dTLB`,
/// `iTLB`, `branch`, `node`, and `<op>` is `loads`, `stores` or `prefetches`, or their
/// `load-misses` / `store-misses` / `prefetch-misses` variants.
///
/// The result is encoded as `cache_id | (op_id << 8) | (result_id << 16)`. Returns `None` for
/// any other string.
fn parse_hw_cache_event(s: &str) -> Option<u64> {
    // (name prefix including its trailing '-', PERF_COUNT_HW_CACHE_* id). The prefixes are
    // mutually exclusive, so lookup order does not matter. Using `strip_prefix` keeps the
    // remainder in sync with the literal instead of relying on hand-counted lengths.
    const CACHES: [(&str, u64); 7] = [
        ("L1-dcache-", 0),
        ("L1-icache-", 1),
        ("LLC-", 2),
        ("dTLB-", 3),
        ("iTLB-", 4),
        ("branch-", 5),
        ("node-", 6),
    ];

    let (cache_id, suffix) = CACHES
        .iter()
        .find_map(|&(prefix, id)| s.strip_prefix(prefix).map(|rest| (id, rest)))?;

    // (PERF_COUNT_HW_CACHE_OP_*, PERF_COUNT_HW_CACHE_RESULT_*)
    let (op_id, result_id): (u64, u64) = match suffix {
        "loads" => (0, 0),
        "load-misses" => (0, 1),
        "stores" => (1, 0),
        "store-misses" => (1, 1),
        "prefetches" => (2, 0),
        "prefetch-misses" => (2, 1),
        _ => return None,
    };

    Some((result_id << 16) | (op_id << 8) | cache_id)
}
137
138/// Parse -e / -y value: hex (0xN) or symbolic name (e.g. cache-misses, LLC-load-misses).
139fn parse_perf_event(s: &str) -> Result<PerfEventSpec, String> {
140    use perf_event_open_sys as sys;
141
142    let s = s.trim();
143    if s.is_empty() || s == "0" || s.eq_ignore_ascii_case("0x0") {
144        return Ok(PerfEventSpec {
145            event_id: 0,
146            type_: sys::bindings::PERF_TYPE_RAW,
147            config: 0,
148            display_name: s.to_string(),
149        });
150    }
151
152    if let Some(hex_str) = s.strip_prefix("0x").or_else(|| s.strip_prefix("0X")) {
153        if let Ok(config) = u64::from_str_radix(hex_str, 16) {
154            return Ok(PerfEventSpec {
155                event_id: config,
156                type_: sys::bindings::PERF_TYPE_RAW,
157                config,
158                display_name: s.to_string(),
159            });
160        }
161    }
162
163    if let Some(config) = parse_hardware_event(s) {
164        let event_id = (sys::bindings::PERF_TYPE_HARDWARE as u64) << 32 | config;
165        return Ok(PerfEventSpec {
166            event_id,
167            type_: sys::bindings::PERF_TYPE_HARDWARE,
168            config,
169            display_name: s.to_string(),
170        });
171    }
172
173    if let Some(config) = parse_software_event(s) {
174        let event_id = (sys::bindings::PERF_TYPE_SOFTWARE as u64) << 32 | config;
175        return Ok(PerfEventSpec {
176            event_id,
177            type_: sys::bindings::PERF_TYPE_SOFTWARE,
178            config,
179            display_name: s.to_string(),
180        });
181    }
182
183    if let Some(config) = parse_hw_cache_event(s) {
184        let event_id = (sys::bindings::PERF_TYPE_HW_CACHE as u64) << 32 | config;
185        return Ok(PerfEventSpec {
186            event_id,
187            type_: sys::bindings::PERF_TYPE_HW_CACHE,
188            config,
189            display_name: s.to_string(),
190        });
191    }
192
193    Err(format!(
194        "Invalid perf event '{}': use hex (0xN) or symbolic name (e.g. cache-misses, LLC-load-misses, page-faults)",
195        s
196    ))
197}
198
/// Distance between the per-counter key ranges of the perf_events map: the key for a CPU and
/// counter slot is `cpu + counter_idx * PERF_MAP_STRIDE`. Must match `SCX_PMU_STRIDE` in
/// lib/pmu.bpf.c.
const PERF_MAP_STRIDE: u32 = 1 << 12;
201
202/// Setup performance counter events for a specific CPU and counter index.
203/// counter_idx 0 = migration event (-e), 1 = sticky event (-y).
204fn setup_perf_events(
205    skel: &mut BpfSkel,
206    cpu: i32,
207    spec: &PerfEventSpec,
208    counter_idx: u32,
209) -> Result<()> {
210    use perf_event_open_sys as sys;
211
212    if spec.event_id == 0 {
213        return Ok(());
214    }
215
216    let map = &skel.maps.scx_pmu_map;
217
218    let mut attrs = sys::bindings::perf_event_attr::default();
219    attrs.type_ = spec.type_;
220    attrs.config = spec.config;
221    attrs.size = std::mem::size_of::<sys::bindings::perf_event_attr>() as u32;
222    attrs.set_disabled(0);
223    attrs.set_inherit(0);
224
225    let fd = unsafe { sys::perf_event_open(&mut attrs, -1, cpu, -1, 0) };
226
227    if fd < 0 {
228        let err = std::io::Error::last_os_error();
229        return Err(anyhow::anyhow!(
230            "Failed to open perf event '{}' on CPU {}: {}",
231            spec.display_name,
232            cpu,
233            err
234        ));
235    }
236
237    let key = cpu as u32 + counter_idx * PERF_MAP_STRIDE;
238
239    map.update(
240        &key.to_ne_bytes(),
241        &fd.to_ne_bytes(),
242        libbpf_rs::MapFlags::ANY,
243    )
244    .with_context(|| "Failed to update perf_events map")?;
245
246    Ok(())
247}
248
249#[derive(Debug, clap::Parser)]
250#[command(
251    name = "scx_cosmos",
252    version,
253    disable_version_flag = true,
254    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
255)]
256struct Opts {
257    /// Exit debug dump buffer length. 0 indicates default.
258    #[clap(long, default_value = "0")]
259    exit_dump_len: u32,
260
261    /// Maximum scheduling slice duration in microseconds.
262    #[clap(short = 's', long, default_value = "1000")]
263    slice_us: u64,
264
265    /// Maximum runtime (since last sleep) that can be charged to a task in microseconds.
266    #[clap(short = 'l', long, default_value = "20000")]
267    slice_lag_us: u64,
268
269    /// CPU busy threshold.
270    ///
271    /// Specifies the CPU utilization percentage (0-100%) at which the scheduler considers the
272    /// system to be busy.
273    ///
274    /// When the average CPU utilization reaches this threshold, the scheduler switches from using
275    /// multiple per-CPU round-robin dispatch queues (which favor locality and reduced locking
276    /// contention) to a global deadline-based dispatch queue (which improves load balancing).
277    ///
278    /// The global dispatch queue can increase task migrations and improve responsiveness for
279    /// interactive tasks under heavy load. Lower values make the scheduler switch to deadline
280    /// mode sooner, improving overall responsiveness at the cost of reducing single-task
281    /// performance due to the additional migrations. Higher values makes task more "sticky" to
282    /// their CPU, improving workloads that benefit from cache locality.
283    ///
284    /// A higher value is recommended for server-type workloads, while a lower value is recommended
285    /// for interactive-type workloads.
286    #[clap(short = 'c', long, default_value = "0")]
287    cpu_busy_thresh: u64,
288
289    /// Polling time (ms) to refresh the CPU utilization.
290    ///
291    /// This interval determines how often the scheduler refreshes the CPU utilization that is
292    /// compared with the CPU busy threshold (option -c) to decide if the system is busy or not
293    /// and trigger the switch between using multiple per-CPU dispatch queues or a single global
294    /// deadline-based dispatch queue.
295    ///
296    /// Value is clamped to the range [10 .. 1000].
297    ///
298    /// 0 = disabled.
299    #[clap(short = 'p', long, default_value = "0")]
300    polling_ms: u64,
301
302    /// Specifies a list of CPUs to prioritize.
303    ///
304    /// Accepts a comma-separated list of CPUs or ranges (i.e., 0-3,12-15) or the following special
305    /// keywords:
306    ///
307    /// "turbo" = automatically detect and prioritize the CPUs with the highest max frequency,
308    /// "performance" = automatically detect and prioritize the fastest CPUs,
309    /// "powersave" = automatically detect and prioritize the slowest CPUs,
310    /// "all" = all CPUs assigned to the primary domain.
311    ///
312    /// By default "all" CPUs are used.
313    #[clap(short = 'm', long)]
314    primary_domain: Option<String>,
315
316    /// Hardware perf event to monitor (0x0 = disabled). Accepts hex (0xN) or symbolic names
317    /// (e.g. cache-misses, LLC-load-misses, page-faults, branch-misses).
318    #[clap(short = 'e', long, default_value = "0x0", value_parser = parse_perf_event)]
319    perf_config: PerfEventSpec,
320
321    /// Threshold (perf events/msec) to classify a task as event heavy; exceeding it triggers migration.
322    #[clap(short = 'E', default_value = "0", long)]
323    perf_threshold: u64,
324
325    /// Sticky perf event (0x0 = disabled). When a task exceeds -Y for this event, keep it on the same CPU.
326    /// Accepts hex (0xN) or symbolic names (e.g. cache-misses, LLC-load-misses).
327    #[clap(short = 'y', long, default_value = "0x0", value_parser = parse_perf_event)]
328    perf_sticky: PerfEventSpec,
329
330    /// Sticky perf threshold; task is kept on same CPU when its count for -y event exceeds this.
331    #[clap(short = 'Y', default_value = "0", long)]
332    perf_sticky_threshold: u64,
333
334    /// Enable GPU-aware scheduling.
335    #[clap(short = 'g', long, action = clap::ArgAction::SetTrue)]
336    gpu: bool,
337
338    /// Only treat a process as GPU-bound if its GPU utilization is at least this percentage (0–100).
339    ///
340    /// Uses NVML process utilization (SM + memory). 0 = no filter (all processes on the GPU are
341    /// considered GPU-bound). Requires driver support (Maxwell or newer).
342    #[clap(long, default_value = "0", value_parser = clap::value_parser!(u32).range(0..=100))]
343    gpu_util_threshold: u32,
344
345    /// Disable NUMA optimizations.
346    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
347    disable_numa: bool,
348
349    /// Disable CPU frequency control.
350    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
351    disable_cpufreq: bool,
352
353    /// Enable flat idle CPU scanning.
354    ///
355    /// This option can help reducing some overhead when trying to allocate idle CPUs and it can be
356    /// quite effective with simple CPU topologies.
357    #[arg(short = 'i', long, action = clap::ArgAction::SetTrue)]
358    flat_idle_scan: bool,
359
360    /// Enable preferred idle CPU scanning.
361    ///
362    /// With this option enabled, the scheduler will prioritize assigning tasks to higher-ranked
363    /// cores before considering lower-ranked ones.
364    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
365    preferred_idle_scan: bool,
366
367    /// Disable SMT.
368    ///
369    /// This option can only be used together with --flat-idle-scan or --preferred-idle-scan,
370    /// otherwise it is ignored.
371    #[clap(long, action = clap::ArgAction::SetTrue)]
372    disable_smt: bool,
373
374    /// ***DEPRECATED*** SMT contention avoidance.
375    #[clap(short = 'S', long, action = clap::ArgAction::SetTrue)]
376    avoid_smt: bool,
377
378    /// Disable early clearing of idle CPU state.
379    ///
380    /// When enabled, multiple concurrent wakeups can select the same idle CPU
381    /// before it fully wakes up. This can improve performance in highly communicative
382    /// workloads by aggressively stacking tasks on the same cache.
383    #[clap(short = 'N', long, action = clap::ArgAction::SetTrue)]
384    no_early_clear: bool,
385
386    /// Disable direct dispatch during synchronous wakeups.
387    ///
388    /// Enabling this option can lead to a more uniform load distribution across available cores,
389    /// potentially improving performance in certain scenarios. However, it may come at the cost of
390    /// reduced efficiency for pipe-intensive workloads that benefit from tighter producer-consumer
391    /// coupling.
392    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
393    no_wake_sync: bool,
394
395    /// ***DEPRECATED*** Disable deferred wakeups.
396    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
397    no_deferred_wakeup: bool,
398
399    /// Enable high-resolution timer preemption.
400    ///
401    /// By default, the scheduler preempts tasks that exceed their time slice, measuring the time
402    /// slice via the tick handler. Add an option to enforce preemption based on the high-precision
403    /// timer and CPU occupancy. Enable this option to improve latency-sensitive workloads.
404    #[clap(long, action = clap::ArgAction::SetTrue)]
405    time_preemption: bool,
406
407    /// Enable address space affinity.
408    ///
409    /// This option allows to keep tasks that share the same address space (e.g., threads of the
410    /// same process) on the same CPU across wakeups.
411    ///
412    /// This can improve locality and performance in certain cache-sensitive workloads.
413    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
414    mm_affinity: bool,
415
416    /// Enable stats monitoring with the specified interval.
417    #[clap(long)]
418    stats: Option<f64>,
419
420    /// Run in stats monitoring mode with the specified interval. Scheduler
421    /// is not launched.
422    #[clap(long)]
423    monitor: Option<f64>,
424
425    /// Enable verbose output, including libbpf details.
426    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
427    verbose: bool,
428
429    /// Print scheduler version and exit.
430    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
431    version: bool,
432
433    /// Show descriptions for statistics.
434    #[clap(long)]
435    help_stats: bool,
436
437    #[clap(flatten, next_help_heading = "Libbpf Options")]
438    pub libbpf: LibbpfOpts,
439}
440
441pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
442    let mut cpus = Vec::new();
443    let mut seen = HashSet::new();
444
445    // Handle special keywords
446    if let Some(mode) = match optarg {
447        "powersave" => Some(Powermode::Powersave),
448        "performance" => Some(Powermode::Performance),
449        "turbo" => Some(Powermode::Turbo),
450        "all" => Some(Powermode::Any),
451        _ => None,
452    } {
453        return get_primary_cpus(mode).map_err(|e| e.to_string());
454    }
455
456    // Validate input characters
457    if optarg
458        .chars()
459        .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
460    {
461        return Err("Invalid character in CPU list".to_string());
462    }
463
464    // Replace all whitespace with tab (or just trim later)
465    let cleaned = optarg.replace(' ', "\t");
466
467    for token in cleaned.split(',') {
468        let token = token.trim_matches(|c: char| c.is_whitespace());
469
470        if token.is_empty() {
471            continue;
472        }
473
474        if let Some((start_str, end_str)) = token.split_once('-') {
475            let start = start_str
476                .trim()
477                .parse::<usize>()
478                .map_err(|_| "Invalid range start")?;
479            let end = end_str
480                .trim()
481                .parse::<usize>()
482                .map_err(|_| "Invalid range end")?;
483
484            if start > end {
485                return Err(format!("Invalid CPU range: {}-{}", start, end));
486            }
487
488            for i in start..=end {
489                if cpus.len() >= *NR_CPU_IDS {
490                    return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
491                }
492                if seen.insert(i) {
493                    cpus.push(i);
494                }
495            }
496        } else {
497            let cpu = token
498                .parse::<usize>()
499                .map_err(|_| format!("Invalid CPU: {}", token))?;
500            if cpus.len() >= *NR_CPU_IDS {
501                return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
502            }
503            if seen.insert(cpu) {
504                cpus.push(cpu);
505            }
506        }
507    }
508
509    Ok(cpus)
510}
511
/// Starting value of a dynamic threshold (in BPF units).
const DYNAMIC_THRESHOLD_INIT_VALUE: u64 = 1_000;

/// Floor for a dynamic threshold (in BPF units); adjustments never go below it.
const DYNAMIC_THRESHOLD_MIN_VALUE: u64 = 10;

/// Upper edge of the target band: above this many events per second, migrations/sticky
/// dispatches are considered too frequent.
const DYNAMIC_THRESHOLD_RATE_HIGH: f64 = 4_000.0;

/// Lower edge of the target band: below this many events per second, migrations/sticky
/// dispatches are considered too rare.
const DYNAMIC_THRESHOLD_RATE_LOW: f64 = 2_000.0;

/// Hysteresis margin, as a fraction of the band edges. The rate has to overshoot a bound by this
/// much before an adjustment starts, which prevents oscillation when it hovers near the edges.
const DYNAMIC_THRESHOLD_HYSTERESIS: f64 = 0.1;

/// Weight (alpha) of the newest sample in the exponential moving average of the rate.
/// 0.3 trades off responsiveness against stability.
const DYNAMIC_THRESHOLD_EMA_ALPHA: f64 = 0.3;

/// Smallest relative step applied when the rate is just outside the target band
/// (slow convergence near the optimum).
const DYNAMIC_THRESHOLD_SCALE_MIN: f64 = 1e-4;

/// Largest relative step. It is also the step used when the observed rate drops to zero; because
/// lowering multiplies by `1 - scale`, that collapses the threshold to its minimum.
const DYNAMIC_THRESHOLD_SCALE_MAX: f64 = 1e3;

/// Growth of the step with the relative excess `rate / HIGH - 1`. Steps get much larger when the
/// rate is several times over target.
const DYNAMIC_THRESHOLD_SLOPE_HIGH: f64 = 0.35;

/// Growth of the step with the relative deficit below LOW. Steps get larger as the rate
/// approaches zero.
const DYNAMIC_THRESHOLD_SLOPE_LOW: f64 = 0.58;

/// Minimum time between NVML GPU PID syncs. This is independent of the CPU polling period, so
/// that fast polling (e.g. 100 ms) does not issue expensive NVML calls on every tick.
const GPU_SYNC_INTERVAL: Duration = Duration::from_millis(1_000);
548
549/// State for EMA-based dynamic threshold adjustment with hysteresis.
550///
551/// This struct maintains the smoothed rate estimate and tracks whether we're
552/// currently in an adjustment state (raising or lowering threshold) to implement
553/// hysteresis and prevent oscillation.
554#[derive(Debug, Clone)]
555struct DynamicThresholdState {
556    /// Current threshold value.
557    threshold: u64,
558    /// EMA-smoothed rate estimate.
559    smoothed_rate: f64,
560    /// Previous raw counter value for delta calculation.
561    prev_counter: u64,
562    /// Whether the EMA has been initialized with a valid sample.
563    initialized: bool,
564    /// Current adjustment direction: None (stable), Some(true) = raising, Some(false) = lowering.
565    /// Used for hysteresis: once we start adjusting in a direction, we continue until
566    /// the rate crosses back into the stable band with hysteresis margin.
567    adjustment_direction: Option<bool>,
568}
569
570impl DynamicThresholdState {
571    /// Create a new dynamic threshold state with the given initial threshold.
572    fn new(initial_threshold: u64) -> Self {
573        Self {
574            threshold: initial_threshold,
575            smoothed_rate: 0.0,
576            prev_counter: 0,
577            initialized: false,
578            adjustment_direction: None,
579        }
580    }
581
582    /// Update the state with a new counter sample and elapsed time.
583    /// Returns the new threshold if it changed, or None if unchanged.
584    fn update(
585        &mut self,
586        counter: u64,
587        elapsed_secs: f64,
588        verbose: bool,
589        name: &str,
590    ) -> Option<u64> {
591        if elapsed_secs <= 0.0 {
592            return None;
593        }
594
595        // Calculate instantaneous rate.
596        let delta = counter.saturating_sub(self.prev_counter);
597        self.prev_counter = counter;
598        let raw_rate = delta as f64 / elapsed_secs;
599
600        // Update EMA.
601        if self.initialized {
602            self.smoothed_rate = DYNAMIC_THRESHOLD_EMA_ALPHA * raw_rate
603                + (1.0 - DYNAMIC_THRESHOLD_EMA_ALPHA) * self.smoothed_rate;
604        } else {
605            // First sample: initialize EMA directly.
606            self.smoothed_rate = raw_rate;
607            self.initialized = true;
608        }
609
610        // Determine if we should adjust the threshold using hysteresis.
611        let rate = self.smoothed_rate;
612        let old_threshold = self.threshold;
613
614        // Calculate hysteresis-adjusted bounds based on current state.
615        let (effective_high, effective_low) = match self.adjustment_direction {
616            Some(true) => {
617                // Currently raising threshold: need rate to drop below LOW - hysteresis to stop.
618                (
619                    DYNAMIC_THRESHOLD_RATE_HIGH,
620                    DYNAMIC_THRESHOLD_RATE_LOW * (1.0 - DYNAMIC_THRESHOLD_HYSTERESIS),
621                )
622            }
623            Some(false) => {
624                // Currently lowering threshold: need rate to rise above HIGH + hysteresis to stop.
625                (
626                    DYNAMIC_THRESHOLD_RATE_HIGH * (1.0 + DYNAMIC_THRESHOLD_HYSTERESIS),
627                    DYNAMIC_THRESHOLD_RATE_LOW,
628                )
629            }
630            None => {
631                // Stable state: need rate to exceed bounds + hysteresis to start adjusting.
632                (
633                    DYNAMIC_THRESHOLD_RATE_HIGH * (1.0 + DYNAMIC_THRESHOLD_HYSTERESIS),
634                    DYNAMIC_THRESHOLD_RATE_LOW * (1.0 - DYNAMIC_THRESHOLD_HYSTERESIS),
635                )
636            }
637        };
638
639        // Determine new adjustment direction.
640        let new_direction = if rate > effective_high {
641            Some(true) // Rate too high, raise threshold.
642        } else if rate < effective_low && rate >= 0.0 {
643            Some(false) // Rate too low, lower threshold.
644        } else {
645            // Rate in stable band (considering hysteresis).
646            if self.adjustment_direction.is_some() {
647                // We were adjusting; check if we should stop.
648                if rate >= DYNAMIC_THRESHOLD_RATE_LOW && rate <= DYNAMIC_THRESHOLD_RATE_HIGH {
649                    None // Back in target band, stop adjusting.
650                } else {
651                    self.adjustment_direction // Continue current direction.
652                }
653            } else {
654                None // Already stable.
655            }
656        };
657
658        // Apply adjustment if we have a direction.
659        if let Some(raising) = new_direction {
660            let scale = Self::compute_scale(rate, raising);
661            let factor = if raising { 1.0 + scale } else { 1.0 - scale };
662            let new_threshold = ((self.threshold as f64) * factor).round() as u64;
663            self.threshold = new_threshold.clamp(DYNAMIC_THRESHOLD_MIN_VALUE, u64::MAX);
664        }
665
666        self.adjustment_direction = new_direction;
667
668        // Return new threshold only if it changed.
669        if self.threshold != old_threshold {
670            if verbose {
671                info!(
672                    "{}: {} -> {} (smoothed rate {:.1}/s, raw {:.1}/s, dir {:?})",
673                    name,
674                    old_threshold,
675                    self.threshold,
676                    self.smoothed_rate,
677                    raw_rate,
678                    self.adjustment_direction
679                );
680            }
681            Some(self.threshold)
682        } else {
683            None
684        }
685    }
686
687    /// Compute the scale factor for threshold adjustment based on how far the rate
688    /// is from the target band.
689    fn compute_scale(rate: f64, too_high: bool) -> f64 {
690        if too_high {
691            let excess = ((rate / DYNAMIC_THRESHOLD_RATE_HIGH) - 1.0).max(0.0);
692            let scale =
693                DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_HIGH * excess.min(4.0);
694            scale.min(DYNAMIC_THRESHOLD_SCALE_MAX)
695        } else {
696            if rate <= 0.0 {
697                return DYNAMIC_THRESHOLD_SCALE_MAX;
698            }
699            let deficit = (DYNAMIC_THRESHOLD_RATE_LOW - rate) / DYNAMIC_THRESHOLD_RATE_LOW;
700            let t = deficit.clamp(0.0, 1.0);
701            DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_LOW * t
702        }
703    }
704}
705
/// A snapshot of CPU time counters.
///
/// NOTE(review): presumably parsed from /proc/stat (the file imports `File`/`BufReader`), with
/// `total` being the sum of all time fields. Confirm against the (unseen) reader code.
#[derive(Clone, Copy, Debug)]
struct CpuTimes {
    /// User-mode time.
    user: u64,
    /// Niced user-mode time.
    nice: u64,
    /// Total time across all accounted states.
    total: u64,
}
712
/// Userspace half of scx_cosmos.
///
/// Owns the BPF skeleton created in `init` and the state reused across iterations of the run
/// loop: stats server, NVML handle and GPU PID bookkeeping, and dynamic perf thresholds.
// NOTE: struct fields are dropped in declaration order (`skel` before `struct_ops`). Keep the
// order unless the teardown sequence is re-validated.
struct Scheduler<'a> {
    /// BPF skeleton, opened with `scx_ops_open!` in `init`.
    skel: BpfSkel<'a>,
    /// Command-line options the scheduler was created with.
    opts: &'a Opts,
    /// Link of the attached struct_ops; presumably `None` until attached — TODO confirm against
    /// the attach/shutdown code (not visible here).
    struct_ops: Option<libbpf_rs::Link>,
    /// Stats server publishing [`Metrics`]; presumably consumed by --stats / --monitor clients.
    stats_server: StatsServer<(), Metrics>,
    /// GPU device index -> NUMA node (for NVML PID sync). Only set when --gpu and NUMA enabled.
    gpu_index_to_node: Option<HashMap<u32, u32>>,
    /// Previous (pid, node) set so we can remove PIDs that stopped using the GPU.
    previous_gpu_pids: Option<HashMap<u32, u32>>,
    /// Reused NVML handle to avoid re-initializing on every sync (expensive).
    nvml: Option<Nvml>,
    /// Dynamic threshold state for perf event migrations (when --perf-threshold is 0/dynamic).
    perf_threshold_state: Option<DynamicThresholdState>,
    /// Dynamic threshold state for sticky perf events (when --perf-sticky-threshold is 0/dynamic).
    perf_sticky_threshold_state: Option<DynamicThresholdState>,
}
729
730impl<'a> Scheduler<'a> {
731    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
732        try_set_rlimit_infinity();
733
734        // Initialize CPU topology.
735        let topo = Topology::new().unwrap();
736
737        // Check host topology to determine if we need to enable SMT capabilities.
738        let smt_enabled = !opts.disable_smt && topo.smt_enabled;
739
740        // Determine the amount of non-empty NUMA nodes in the system.
741        let nr_nodes = topo
742            .nodes
743            .values()
744            .filter(|node| !node.all_cpus.is_empty())
745            .count();
746        info!("NUMA nodes: {}", nr_nodes);
747
748        // Automatically disable NUMA optimizations when running on non-NUMA systems.
749        let numa_enabled = !opts.disable_numa && nr_nodes > 1;
750        if !numa_enabled {
751            info!("Disabling NUMA optimizations");
752        }
753
754        info!(
755            "{} {} {}",
756            SCHEDULER_NAME,
757            build_id::full_version(env!("CARGO_PKG_VERSION")),
758            if smt_enabled { "SMT on" } else { "SMT off" }
759        );
760
761        // Print command line.
762        info!(
763            "scheduler options: {}",
764            std::env::args().collect::<Vec<_>>().join(" ")
765        );
766
767        // Initialize BPF connector.
768        let mut skel_builder = BpfSkelBuilder::default();
769        skel_builder.obj_builder.debug(opts.verbose);
770        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
771        let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops, open_opts)?;
772
773        skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;
774
775        // Override default BPF scheduling parameters.
776        let rodata = skel.maps.rodata_data.as_mut().unwrap();
777        rodata.slice_ns = opts.slice_us * 1000;
778        rodata.slice_lag = opts.slice_lag_us * 1000;
779        rodata.cpufreq_enabled = !opts.disable_cpufreq;
780        rodata.flat_idle_scan = opts.flat_idle_scan;
781        rodata.smt_enabled = smt_enabled;
782        rodata.numa_enabled = numa_enabled;
783        rodata.nr_node_ids = topo.nodes.len() as u32;
784        rodata.no_wake_sync = opts.no_wake_sync;
785        rodata.no_early_clear = opts.no_early_clear;
786        rodata.time_preemption = opts.time_preemption;
787        rodata.mm_affinity = opts.mm_affinity;
788
789        // Enable perf event scheduling settings.
790        rodata.perf_config = opts.perf_config.event_id;
791        rodata.perf_sticky = opts.perf_sticky.event_id;
792
793        // Normalize CPU busy threshold in the range [0 .. 1024].
794        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
795
796        // Generate the list of available CPUs sorted by capacity in descending order.
797        let mut cpus: Vec<_> = topo.all_cpus.values().collect();
798        cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
799        // Normalize CPU capacities to 1..1024 so the highest capacity is always 1024.
800        let max_cap = cpus.first().map(|c| c.cpu_capacity).unwrap_or(1).max(1);
801        for (i, cpu) in cpus.iter().enumerate() {
802            let normalized = (cpu.cpu_capacity * 1024 / max_cap).clamp(1, 1024);
803            rodata.cpu_capacity[cpu.id] = normalized as c_ulong;
804            rodata.preferred_cpus[i] = cpu.id as u64;
805        }
806        rodata.all_cpus_same_capacity = cpus.iter().all(|cpu| cpu.cpu_capacity == max_cap);
807        if opts.preferred_idle_scan {
808            info!(
809                "Preferred CPUs: {:?}",
810                &rodata.preferred_cpus[0..cpus.len()]
811            );
812        }
813        rodata.preferred_idle_scan = opts.preferred_idle_scan;
814
815        // Define the primary scheduling domain.
816        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
817            match parse_cpu_list(domain) {
818                Ok(cpus) => cpus,
819                Err(e) => bail!("Error parsing primary domain: {}", e),
820            }
821        } else {
822            (0..*NR_CPU_IDS).collect()
823        };
824        if primary_cpus.len() < *NR_CPU_IDS {
825            info!("Primary CPUs: {:?}", primary_cpus);
826            rodata.primary_all = false;
827        } else {
828            rodata.primary_all = true;
829        }
830
831        // Enable GPU support and build GPU index -> node for NVML PID sync. Init NVML once here
832        // so we reuse the handle in the run loop (re-initing every sync is very expensive).
833        let (gpu_index_to_node, previous_gpu_pids, nvml) = if opts.gpu && numa_enabled {
834            match Nvml::init_with_flags(InitFlags::NO_GPUS) {
835                Ok(nvml) => {
836                    info!("NVIDIA GPU-aware scheduling enabled (NVML PID sync)");
837                    rodata.gpu_enabled = true;
838                    let mut idx_to_node = HashMap::new();
839                    for (id, gpu) in topo.gpus() {
840                        let GpuIndex::Nvidia { nvml_id } = id;
841                        idx_to_node.insert(nvml_id, gpu.node_id as u32);
842                    }
843                    (Some(idx_to_node), Some(HashMap::new()), Some(nvml))
844                }
845                Err(e) => {
846                    warn!("NVML init failed, disabling GPU-aware scheduling: {}", e);
847                    rodata.gpu_enabled = false;
848                    (None, None, None)
849                }
850            }
851        } else {
852            rodata.gpu_enabled = false;
853            (None, None, None)
854        };
855
856        // Set scheduler flags.
857        skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
858            | *compat::SCX_OPS_ENQ_LAST
859            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
860            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP
861            | if numa_enabled {
862                *compat::SCX_OPS_BUILTIN_IDLE_PER_NODE
863            } else {
864                0
865            };
866
867        info!(
868            "scheduler flags: {:#x}",
869            skel.struct_ops.cosmos_ops_mut().flags
870        );
871
872        // Load the BPF program for validation.
873        let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;
874
875        // Initial perf thresholds in bss. When threshold is 0 we use dynamic logic; when user
876        // specifies a value > 0 we use it as a static threshold.
877        let bss = skel.maps.bss_data.as_mut().unwrap();
878        if opts.perf_config.event_id > 0 {
879            bss.perf_threshold = if opts.perf_threshold == 0 {
880                DYNAMIC_THRESHOLD_INIT_VALUE
881            } else {
882                opts.perf_threshold
883            };
884        }
885        if opts.perf_sticky.event_id > 0 {
886            bss.perf_sticky_threshold = if opts.perf_sticky_threshold == 0 {
887                DYNAMIC_THRESHOLD_INIT_VALUE
888            } else {
889                opts.perf_sticky_threshold
890            };
891        }
892
893        // Configure CPU->node mapping (must be done after skeleton is loaded).
894        for node in topo.nodes.values() {
895            for cpu in node.all_cpus.values() {
896                if opts.verbose {
897                    info!("CPU{} -> node{}", cpu.id, node.id);
898                }
899                skel.maps.cpu_node_map.update(
900                    &(cpu.id as u32).to_ne_bytes(),
901                    &(node.id as u32).to_ne_bytes(),
902                    MapFlags::ANY,
903                )?;
904            }
905        }
906
907        // Setup performance events for all CPUs.
908        // Counter indices must match PMU library install order: migration first (0), then sticky (1).
909        // When only sticky is used, it gets index 0; when both are used, sticky gets index 1.
910        let nr_cpus = *NR_CPU_IDS;
911        info!("Setting up performance counters for {} CPUs...", nr_cpus);
912        let mut perf_available = true;
913        let sticky_counter_idx = if opts.perf_config.event_id > 0 { 1 } else { 0 };
914        for cpu in 0..nr_cpus {
915            if opts.perf_config.event_id > 0 {
916                if let Err(e) = setup_perf_events(&mut skel, cpu as i32, &opts.perf_config, 0) {
917                    if cpu == 0 {
918                        let err_str = e.to_string();
919                        if err_str.contains("errno 2") || err_str.contains("os error 2") {
920                            warn!("Performance counters not available on this CPU architecture");
921                            warn!("PMU event '{}' not supported - scheduler will run without perf monitoring", opts.perf_config.display_name);
922                        } else {
923                            warn!("Failed to setup perf events: {}", e);
924                        }
925                        perf_available = false;
926                        break;
927                    }
928                }
929            }
930            if opts.perf_sticky.event_id > 0 {
931                if let Err(e) =
932                    setup_perf_events(&mut skel, cpu as i32, &opts.perf_sticky, sticky_counter_idx)
933                {
934                    if cpu == 0 {
935                        let err_str = e.to_string();
936                        if err_str.contains("errno 2") || err_str.contains("os error 2") {
937                            warn!("Performance counters not available on this CPU architecture");
938                            warn!("PMU event '{}' not supported - scheduler will run without perf monitoring", opts.perf_sticky.display_name);
939                        } else {
940                            warn!("Failed to setup perf events: {}", e);
941                        }
942                        perf_available = false;
943                        break;
944                    }
945                }
946            }
947        }
948        if perf_available {
949            info!("Performance counters configured successfully for all CPUs");
950        }
951
952        // Configure GPU->node mapping.
953        if opts.gpu && numa_enabled {
954            for (id, gpu) in topo.gpus() {
955                let GpuIndex::Nvidia { nvml_id } = id;
956                if opts.verbose {
957                    info!("GPU{} -> node{}", nvml_id, gpu.node_id);
958                }
959                skel.maps.gpu_node_map.update(
960                    &(nvml_id as u32).to_ne_bytes(),
961                    &(gpu.node_id as u32).to_ne_bytes(),
962                    MapFlags::ANY,
963                )?;
964            }
965        }
966
967        // Enable primary scheduling domain, if defined.
968        if primary_cpus.len() < *NR_CPU_IDS {
969            for cpu in primary_cpus {
970                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
971                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
972                }
973            }
974        }
975
976        // Initialize SMT domains.
977        if smt_enabled {
978            Self::init_smt_domains(&mut skel, &topo)?;
979        }
980
981        // Attach the scheduler.
982        let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
983        let stats_server = StatsServer::new(stats::server_data()).launch()?;
984
985        // Initialize dynamic threshold states for perf events (only when using dynamic mode).
986        let perf_threshold_state = if opts.perf_config.event_id > 0 && opts.perf_threshold == 0 {
987            Some(DynamicThresholdState::new(DYNAMIC_THRESHOLD_INIT_VALUE))
988        } else {
989            None
990        };
991        let perf_sticky_threshold_state =
992            if opts.perf_sticky.event_id > 0 && opts.perf_sticky_threshold == 0 {
993                Some(DynamicThresholdState::new(DYNAMIC_THRESHOLD_INIT_VALUE))
994            } else {
995                None
996            };
997
998        Ok(Self {
999            skel,
1000            opts,
1001            struct_ops,
1002            stats_server,
1003            gpu_index_to_node,
1004            previous_gpu_pids,
1005            nvml,
1006            perf_threshold_state,
1007            perf_sticky_threshold_state,
1008        })
1009    }
1010
1011    /// Sync PID -> GPU (node) map from NVML. When gpu_util_threshold > 0, only PIDs with
1012    /// GPU utilization (SM or memory) >= threshold are added. Map is keyed by task pid.
1013    /// Only processes using a single GPU are added; multi-GPU processes are excluded.
1014    fn sync_gpu_pids(&mut self) -> Result<()> {
1015        let gpu_index_to_node = match &self.gpu_index_to_node {
1016            Some(m) => m,
1017            None => return Ok(()),
1018        };
1019        let nvml = match &self.nvml {
1020            Some(n) => n,
1021            None => return Ok(()),
1022        };
1023        let threshold = self.opts.gpu_util_threshold;
1024        let previous = self.previous_gpu_pids.as_ref().unwrap();
1025        // First collect pid -> set of nodes (GPUs) per process.
1026        let mut pid_to_nodes: HashMap<u32, HashSet<u32>> = HashMap::new();
1027
1028        let count = nvml.device_count().context("NVML device count")?;
1029        for i in 0..count {
1030            let node = match gpu_index_to_node.get(&i) {
1031                Some(&n) => n,
1032                None => continue,
1033            };
1034            let device = nvml.device_by_index(i).context("NVML device_by_index")?;
1035
1036            if threshold > 0 {
1037                // Use process utilization; only add PIDs above threshold.
1038                match device.process_utilization_stats(None::<u64>) {
1039                    Ok(samples) => {
1040                        for sample in samples {
1041                            let util = sample.sm_util.max(sample.mem_util);
1042                            if util >= threshold {
1043                                pid_to_nodes.entry(sample.pid).or_default().insert(node);
1044                            }
1045                        }
1046                    }
1047                    Err(_) => {
1048                        // NotSupported or other: fall back to all running processes.
1049                        Self::add_running_gpu_processes_to_set(&device, node, &mut pid_to_nodes);
1050                    }
1051                }
1052            } else {
1053                Self::add_running_gpu_processes_to_set(&device, node, &mut pid_to_nodes);
1054            }
1055        }
1056
1057        // Only add PIDs that use exactly one GPU to the map.
1058        let mut current: HashMap<u32, u32> = HashMap::new();
1059        for (tgid, nodes) in pid_to_nodes {
1060            if nodes.len() == 1 {
1061                let node = nodes.into_iter().next().unwrap();
1062                current.insert(tgid, node);
1063                for tid in Self::task_tids(tgid) {
1064                    current.insert(tid, node);
1065                }
1066            }
1067        }
1068
1069        let map = &self.skel.maps.gpu_pid_map;
1070        for (pid, node) in &current {
1071            map.update(&pid.to_ne_bytes(), &node.to_ne_bytes(), MapFlags::ANY)
1072                .context("gpu_pid_map update")?;
1073        }
1074        for pid in previous.keys() {
1075            if !current.contains_key(pid) {
1076                let _ = map.delete(&pid.to_ne_bytes());
1077            }
1078        }
1079        *self.previous_gpu_pids.as_mut().unwrap() = current;
1080        Ok(())
1081    }
1082
1083    /// Record running compute/graphics process PIDs and the GPU node in pid_to_nodes.
1084    fn add_running_gpu_processes_to_set(
1085        device: &nvml_wrapper::Device<'_>,
1086        node: u32,
1087        pid_to_nodes: &mut HashMap<u32, HashSet<u32>>,
1088    ) {
1089        for proc in device
1090            .running_compute_processes()
1091            .unwrap_or_default()
1092            .into_iter()
1093            .chain(device.running_graphics_processes().unwrap_or_default())
1094        {
1095            pid_to_nodes.entry(proc.pid).or_default().insert(node);
1096        }
1097    }
1098
1099    /// Return all thread IDs (tids) of the process with the given pid (tgid).
1100    fn task_tids(pid: u32) -> Vec<u32> {
1101        let task_dir = format!("/proc/{}/task", pid);
1102        let Ok(entries) = fs::read_dir(Path::new(&task_dir)) else {
1103            return Vec::new();
1104        };
1105        entries
1106            .filter_map(|e| e.ok())
1107            .filter_map(|e| e.file_name().to_str().and_then(|s| s.parse::<u32>().ok()))
1108            .collect()
1109    }
1110
1111    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
1112        let prog = &mut skel.progs.enable_primary_cpu;
1113        let mut args = cpu_arg {
1114            cpu_id: cpu as c_int,
1115        };
1116        let input = ProgramInput {
1117            context_in: Some(unsafe {
1118                std::slice::from_raw_parts_mut(
1119                    &mut args as *mut _ as *mut u8,
1120                    std::mem::size_of_val(&args),
1121                )
1122            }),
1123            ..Default::default()
1124        };
1125        let out = prog.test_run(input).unwrap();
1126        if out.return_value != 0 {
1127            return Err(out.return_value);
1128        }
1129
1130        Ok(())
1131    }
1132
1133    fn enable_sibling_cpu(
1134        skel: &mut BpfSkel<'_>,
1135        cpu: usize,
1136        sibling_cpu: usize,
1137    ) -> Result<(), u32> {
1138        let prog = &mut skel.progs.enable_sibling_cpu;
1139        let mut args = domain_arg {
1140            cpu_id: cpu as c_int,
1141            sibling_cpu_id: sibling_cpu as c_int,
1142        };
1143        let input = ProgramInput {
1144            context_in: Some(unsafe {
1145                std::slice::from_raw_parts_mut(
1146                    &mut args as *mut _ as *mut u8,
1147                    std::mem::size_of_val(&args),
1148                )
1149            }),
1150            ..Default::default()
1151        };
1152        let out = prog.test_run(input).unwrap();
1153        if out.return_value != 0 {
1154            return Err(out.return_value);
1155        }
1156
1157        Ok(())
1158    }
1159
1160    fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
1161        let smt_siblings = topo.sibling_cpus();
1162
1163        info!("SMT sibling CPUs: {:?}", smt_siblings);
1164        for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
1165            Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
1166        }
1167
1168        Ok(())
1169    }
1170
1171    fn get_metrics(&self) -> Metrics {
1172        let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
1173        Metrics {
1174            nr_event_dispatches: bss_data.nr_event_dispatches,
1175            nr_ev_sticky_dispatches: bss_data.nr_ev_sticky_dispatches,
1176            nr_gpu_dispatches: bss_data.nr_gpu_dispatches,
1177        }
1178    }
1179
    /// Return whether the BPF scheduler has exited.
    ///
    /// This is determined by `uei_exited!` applied to the skeleton's `uei` exit-info record.
    /// `run` polls it to end its main loop.
    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }
1183
1184    fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
1185        // Evaluate total user CPU time as user + nice.
1186        let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
1187        let total_diff = curr.total.saturating_sub(prev.total);
1188
1189        if total_diff > 0 {
1190            let user_ratio = user_diff as f64 / total_diff as f64;
1191            Some((user_ratio * 1024.0).round() as u64)
1192        } else {
1193            None
1194        }
1195    }
1196
1197    /// Read per-CPU times from /proc/stat (lines "cpu0", "cpu1", ...).
1198    /// Returns one CpuTimes per CPU, in order.
1199    fn read_per_cpu_cpu_times(nr_cpus: usize) -> Option<Vec<CpuTimes>> {
1200        let file = File::open("/proc/stat").ok()?;
1201        let reader = BufReader::new(file);
1202        let mut result = Vec::with_capacity(nr_cpus);
1203
1204        for line in reader.lines() {
1205            let line = line.ok()?;
1206            let line = line.trim();
1207            if !line.starts_with("cpu") {
1208                continue;
1209            }
1210            let rest = line.strip_prefix("cpu")?;
1211            if rest.starts_with(' ') {
1212                // Aggregate line "cpu " - skip.
1213                continue;
1214            }
1215            let cpu_id: usize = rest.split_whitespace().next()?.parse().ok()?;
1216            if cpu_id != result.len() {
1217                break;
1218            }
1219            let fields: Vec<&str> = line.split_whitespace().collect();
1220            if fields.len() < 5 {
1221                return None;
1222            }
1223            let user: u64 = fields[1].parse().ok()?;
1224            let nice: u64 = fields[2].parse().ok()?;
1225            let total: u64 = fields
1226                .iter()
1227                .skip(1)
1228                .take(8)
1229                .filter_map(|v| v.parse::<u64>().ok())
1230                .sum();
1231            result.push(CpuTimes { user, nice, total });
1232            if result.len() >= nr_cpus {
1233                break;
1234            }
1235        }
1236
1237        if result.len() == nr_cpus {
1238            Some(result)
1239        } else {
1240            None
1241        }
1242    }
1243
1244    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
1245        let (res_ch, req_ch) = self.stats_server.channels();
1246
1247        // Periodically evaluate per-CPU user utilization from userspace and update the
1248        // cpu_util_map in BPF. The scheduler uses is_cpu_busy(cpu) with prev_cpu or
1249        // scx_bpf_task_cpu(p) to decide per-CPU whether to use local DSQs (round-robin)
1250        // or deadline-based shared DSQ.
1251        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
1252        let nr_cpus = *NR_CPU_IDS as usize;
1253        let mut prev_cputime =
1254            Self::read_per_cpu_cpu_times(nr_cpus).expect("Failed to read initial per-CPU stats");
1255        let mut last_update = Instant::now();
1256        let mut last_gpu_sync = Instant::now();
1257
1258        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
1259            // Update per-CPU utilization and GPU PID -> node map (NVML).
1260            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
1261                if let Some(curr_cputime) = Self::read_per_cpu_cpu_times(nr_cpus) {
1262                    let map = &self.skel.maps.cpu_util_map;
1263                    for cpu in 0..nr_cpus {
1264                        if let Some(util) =
1265                            Self::compute_user_cpu_pct(&prev_cputime[cpu], &curr_cputime[cpu])
1266                        {
1267                            let _ = map.update(
1268                                &(cpu as u32).to_ne_bytes(),
1269                                &util.to_ne_bytes(),
1270                                MapFlags::ANY,
1271                            );
1272                        }
1273                    }
1274                    prev_cputime = curr_cputime;
1275                }
1276
1277                // Update dynamic perf thresholds using EMA + hysteresis.
1278                let elapsed_secs = last_update.elapsed().as_secs_f64();
1279
1280                // Update migration threshold state if dynamic mode is enabled.
1281                if let Some(ref mut state) = self.perf_threshold_state {
1282                    let nr_event = self
1283                        .skel
1284                        .maps
1285                        .bss_data
1286                        .as_ref()
1287                        .unwrap()
1288                        .nr_event_dispatches;
1289                    if let Some(new_thresh) =
1290                        state.update(nr_event, elapsed_secs, self.opts.verbose, "perf_threshold")
1291                    {
1292                        self.skel.maps.bss_data.as_mut().unwrap().perf_threshold = new_thresh;
1293                    }
1294                }
1295
1296                // Update sticky threshold state if dynamic mode is enabled.
1297                if let Some(ref mut state) = self.perf_sticky_threshold_state {
1298                    let nr_sticky = self
1299                        .skel
1300                        .maps
1301                        .bss_data
1302                        .as_ref()
1303                        .unwrap()
1304                        .nr_ev_sticky_dispatches;
1305                    if let Some(new_thresh) = state.update(
1306                        nr_sticky,
1307                        elapsed_secs,
1308                        self.opts.verbose,
1309                        "perf_sticky_threshold",
1310                    ) {
1311                        self.skel
1312                            .maps
1313                            .bss_data
1314                            .as_mut()
1315                            .unwrap()
1316                            .perf_sticky_threshold = new_thresh;
1317                    }
1318                }
1319
1320                // GPU PID sync is throttled to GPU_SYNC_INTERVAL (NVML is expensive).
1321                if self.gpu_index_to_node.is_some() && last_gpu_sync.elapsed() >= GPU_SYNC_INTERVAL
1322                {
1323                    if let Err(e) = self.sync_gpu_pids() {
1324                        debug!("GPU PID sync: {}", e);
1325                    }
1326                    last_gpu_sync = Instant::now();
1327                }
1328
1329                last_update = Instant::now();
1330            }
1331
1332            // Update statistics and check for exit condition.
1333            let timeout = if polling_time.is_zero() {
1334                Duration::from_secs(1)
1335            } else {
1336                polling_time
1337            };
1338            match req_ch.recv_timeout(timeout) {
1339                Ok(()) => res_ch.send(self.get_metrics())?,
1340                Err(RecvTimeoutError::Timeout) => {}
1341                Err(e) => Err(e)?,
1342            }
1343        }
1344
1345        let _ = self.struct_ops.take();
1346        uei_report!(&self.skel, uei)
1347    }
1348}
1349
1350impl Drop for Scheduler<'_> {
1351    fn drop(&mut self) {
1352        info!("Unregister {SCHEDULER_NAME} scheduler");
1353    }
1354}
1355
1356fn main() -> Result<()> {
1357    let opts = Opts::parse();
1358
1359    if opts.version {
1360        println!(
1361            "{} {}",
1362            SCHEDULER_NAME,
1363            build_id::full_version(env!("CARGO_PKG_VERSION"))
1364        );
1365        return Ok(());
1366    }
1367
1368    if opts.help_stats {
1369        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
1370        return Ok(());
1371    }
1372
1373    let loglevel = simplelog::LevelFilter::Info;
1374
1375    let mut lcfg = simplelog::ConfigBuilder::new();
1376    lcfg.set_time_offset_to_local()
1377        .expect("Failed to set local time offset")
1378        .set_time_level(simplelog::LevelFilter::Error)
1379        .set_location_level(simplelog::LevelFilter::Off)
1380        .set_target_level(simplelog::LevelFilter::Off)
1381        .set_thread_level(simplelog::LevelFilter::Off);
1382    simplelog::TermLogger::init(
1383        loglevel,
1384        lcfg.build(),
1385        simplelog::TerminalMode::Stderr,
1386        simplelog::ColorChoice::Auto,
1387    )?;
1388
1389    let shutdown = Arc::new(AtomicBool::new(false));
1390    let shutdown_clone = shutdown.clone();
1391    ctrlc::set_handler(move || {
1392        shutdown_clone.store(true, Ordering::Relaxed);
1393    })
1394    .context("Error setting Ctrl-C handler")?;
1395
1396    if let Some(intv) = opts.monitor.or(opts.stats) {
1397        let shutdown_copy = shutdown.clone();
1398        let jh = std::thread::spawn(move || {
1399            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
1400                Ok(_) => {
1401                    debug!("stats monitor thread finished successfully")
1402                }
1403                Err(error_object) => {
1404                    warn!(
1405                        "stats monitor thread finished because of an error {}",
1406                        error_object
1407                    )
1408                }
1409            }
1410        });
1411        if opts.monitor.is_some() {
1412            let _ = jh.join();
1413            return Ok(());
1414        }
1415    }
1416
1417    let mut open_object = MaybeUninit::uninit();
1418    loop {
1419        let mut sched = Scheduler::init(&opts, &mut open_object)?;
1420        if !sched.run(shutdown.clone())?.should_restart() {
1421            break;
1422        }
1423    }
1424
1425    Ok(())
1426}