1mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::{HashMap, HashSet};
15use std::ffi::{c_int, c_ulong};
16use std::fs;
17use std::fs::File;
18use std::io::{BufRead, BufReader};
19use std::mem::MaybeUninit;
20use std::path::Path;
21use std::sync::atomic::AtomicBool;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use anyhow::bail;
27use anyhow::Context;
28use anyhow::Result;
29use clap::Parser;
30use crossbeam::channel::RecvTimeoutError;
31use libbpf_rs::MapCore;
32use libbpf_rs::MapFlags;
33use libbpf_rs::OpenObject;
34use libbpf_rs::ProgramInput;
35use log::{debug, info, warn};
36use nvml_wrapper::bitmasks::InitFlags;
37use nvml_wrapper::Nvml;
38use scx_stats::prelude::*;
39use scx_utils::build_id;
40use scx_utils::compat;
41use scx_utils::get_primary_cpus;
42use scx_utils::libbpf_clap_opts::LibbpfOpts;
43use scx_utils::scx_ops_attach;
44use scx_utils::scx_ops_load;
45use scx_utils::scx_ops_open;
46use scx_utils::try_set_rlimit_infinity;
47use scx_utils::uei_exited;
48use scx_utils::uei_report;
49use scx_utils::GpuIndex;
50use scx_utils::Powermode;
51use scx_utils::Topology;
52use scx_utils::UserExitInfo;
53use scx_utils::NR_CPU_IDS;
54use stats::Metrics;
55
56const SCHEDULER_NAME: &str = "scx_cosmos";
57
/// Parsed perf event selection, as produced by `parse_perf_event`.
#[derive(Clone, Debug)]
struct PerfEventSpec {
    /// Identifier exported to the BPF side; 0 means that no event is configured.
    event_id: u64,
    /// Value written to `perf_event_attr.type_` when the counter is opened.
    type_: u32,
    /// Value written to `perf_event_attr.config` when the counter is opened.
    config: u64,
    /// Trimmed event string as given by the user; used in log and error messages.
    display_name: String,
}
71
/// Maps a symbolic hardware event name (and its aliases) to its numeric config value.
///
/// Returns `None` when `s` is not one of the known names.
fn parse_hardware_event(s: &str) -> Option<u64> {
    // (name, config) pairs; aliases share the same config value.
    const HARDWARE_EVENTS: &[(&str, u64)] = &[
        ("cpu-cycles", 0),
        ("cycles", 0),
        ("instructions", 1),
        ("cache-references", 2),
        ("cache-misses", 3),
        ("branch-instructions", 4),
        ("branches", 4),
        ("branch-misses", 5),
        ("bus-cycles", 6),
        ("stalled-cycles-frontend", 7),
        ("idle-cycles-frontend", 7),
        ("stalled-cycles-backend", 8),
        ("idle-cycles-backend", 8),
        ("ref-cycles", 9),
    ];

    HARDWARE_EVENTS
        .iter()
        .find(|(name, _)| *name == s)
        .map(|&(_, config)| config)
}
87
/// Maps a symbolic software event name (and its aliases) to its numeric config value.
///
/// Returns `None` when `s` is not one of the known names.
fn parse_software_event(s: &str) -> Option<u64> {
    // (name, config) pairs; aliases share the same config value.
    const SOFTWARE_EVENTS: &[(&str, u64)] = &[
        ("cpu-clock", 0),
        ("task-clock", 1),
        ("page-faults", 2),
        ("faults", 2),
        ("context-switches", 3),
        ("cs", 3),
        ("cpu-migrations", 4),
        ("migrations", 4),
        ("minor-faults", 5),
        ("major-faults", 6),
        ("alignment-faults", 7),
        ("emulation-faults", 8),
        ("dummy", 9),
        ("bpf-output", 10),
    ];

    SOFTWARE_EVENTS
        .iter()
        .find(|(name, _)| *name == s)
        .map(|&(_, config)| config)
}
104
/// Parses a `<cache>-<access>` hardware cache event name such as `LLC-load-misses`.
///
/// The result packs the three components as
/// `(result_id << 16) | (op_id << 8) | cache_id`. Returns `None` when either the
/// cache prefix or the access suffix is not recognized.
fn parse_hw_cache_event(s: &str) -> Option<u64> {
    // Cache prefix -> cache id.
    const CACHE_PREFIXES: &[(&str, u64)] = &[
        ("L1-dcache-", 0),
        ("L1-icache-", 1),
        ("LLC-", 2),
        ("dTLB-", 3),
        ("iTLB-", 4),
        ("branch-", 5),
        ("node-", 6),
    ];
    // Access suffix -> (op id, result id).
    const ACCESS_KINDS: &[(&str, u64, u64)] = &[
        ("loads", 0, 0),
        ("load-misses", 0, 1),
        ("stores", 1, 0),
        ("store-misses", 1, 1),
        ("prefetches", 2, 0),
        ("prefetch-misses", 2, 1),
    ];

    let (cache_id, suffix) = CACHE_PREFIXES
        .iter()
        .find_map(|&(prefix, id)| s.strip_prefix(prefix).map(|rest| (id, rest)))?;

    let (op_id, result_id) = ACCESS_KINDS
        .iter()
        .find(|&&(name, _, _)| name == suffix)
        .map(|&(_, op, result)| (op, result))?;

    Some((result_id << 16) | (op_id << 8) | cache_id)
}
137
138fn parse_perf_event(s: &str) -> Result<PerfEventSpec, String> {
140 use perf_event_open_sys as sys;
141
142 let s = s.trim();
143 if s.is_empty() || s == "0" || s.eq_ignore_ascii_case("0x0") {
144 return Ok(PerfEventSpec {
145 event_id: 0,
146 type_: sys::bindings::PERF_TYPE_RAW,
147 config: 0,
148 display_name: s.to_string(),
149 });
150 }
151
152 if let Some(hex_str) = s.strip_prefix("0x").or_else(|| s.strip_prefix("0X")) {
153 if let Ok(config) = u64::from_str_radix(hex_str, 16) {
154 return Ok(PerfEventSpec {
155 event_id: config,
156 type_: sys::bindings::PERF_TYPE_RAW,
157 config,
158 display_name: s.to_string(),
159 });
160 }
161 }
162
163 if let Some(config) = parse_hardware_event(s) {
164 let event_id = (sys::bindings::PERF_TYPE_HARDWARE as u64) << 32 | config;
165 return Ok(PerfEventSpec {
166 event_id,
167 type_: sys::bindings::PERF_TYPE_HARDWARE,
168 config,
169 display_name: s.to_string(),
170 });
171 }
172
173 if let Some(config) = parse_software_event(s) {
174 let event_id = (sys::bindings::PERF_TYPE_SOFTWARE as u64) << 32 | config;
175 return Ok(PerfEventSpec {
176 event_id,
177 type_: sys::bindings::PERF_TYPE_SOFTWARE,
178 config,
179 display_name: s.to_string(),
180 });
181 }
182
183 if let Some(config) = parse_hw_cache_event(s) {
184 let event_id = (sys::bindings::PERF_TYPE_HW_CACHE as u64) << 32 | config;
185 return Ok(PerfEventSpec {
186 event_id,
187 type_: sys::bindings::PERF_TYPE_HW_CACHE,
188 config,
189 display_name: s.to_string(),
190 });
191 }
192
193 Err(format!(
194 "Invalid perf event '{}': use hex (0xN) or symbolic name (e.g. cache-misses, LLC-load-misses, page-faults)",
195 s
196 ))
197}
198
// Distance between the key ranges of consecutive counters in the PMU map:
// `setup_perf_events` stores the fd for (cpu, counter_idx) at key
// `cpu + counter_idx * PERF_MAP_STRIDE`.
const PERF_MAP_STRIDE: u32 = 4096;
201
202fn setup_perf_events(
205 skel: &mut BpfSkel,
206 cpu: i32,
207 spec: &PerfEventSpec,
208 counter_idx: u32,
209) -> Result<()> {
210 use perf_event_open_sys as sys;
211
212 if spec.event_id == 0 {
213 return Ok(());
214 }
215
216 let map = &skel.maps.scx_pmu_map;
217
218 let mut attrs = sys::bindings::perf_event_attr::default();
219 attrs.type_ = spec.type_;
220 attrs.config = spec.config;
221 attrs.size = std::mem::size_of::<sys::bindings::perf_event_attr>() as u32;
222 attrs.set_disabled(0);
223 attrs.set_inherit(0);
224
225 let fd = unsafe { sys::perf_event_open(&mut attrs, -1, cpu, -1, 0) };
226
227 if fd < 0 {
228 let err = std::io::Error::last_os_error();
229 return Err(anyhow::anyhow!(
230 "Failed to open perf event '{}' on CPU {}: {}",
231 spec.display_name,
232 cpu,
233 err
234 ));
235 }
236
237 let key = cpu as u32 + counter_idx * PERF_MAP_STRIDE;
238
239 map.update(
240 &key.to_ne_bytes(),
241 &fd.to_ne_bytes(),
242 libbpf_rs::MapFlags::ANY,
243 )
244 .with_context(|| "Failed to update perf_events map")?;
245
246 Ok(())
247}
248
// Command-line options of the scheduler.
//
// NOTE: plain `//` comments are used on purpose: clap turns `///` doc comments
// into help text, which would change the program's `--help` output.
#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_cosmos",
    version,
    disable_version_flag = true,
    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
)]
struct Opts {
    // Copied verbatim into the struct_ops `exit_dump_len` field.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    // Time slice in microseconds; exported to BPF as `slice_ns` (value * 1000).
    #[clap(short = 's', long, default_value = "1000")]
    slice_us: u64,

    // Slice lag in microseconds; exported to BPF as `slice_lag` (value * 1000).
    #[clap(short = 'l', long, default_value = "20000")]
    slice_lag_us: u64,

    // CPU busy threshold; exported to BPF as `busy_threshold = value * 1024 / 100`,
    // i.e. presumably a percentage rescaled to the 0..1024 range.
    #[clap(short = 'c', long, default_value = "0")]
    cpu_busy_thresh: u64,

    // Polling period in milliseconds for the periodic work in `run()`;
    // capped at one second there, and 0 disables the periodic work.
    #[clap(short = 'p', long, default_value = "0")]
    polling_ms: u64,

    // Primary scheduling domain, parsed by `parse_cpu_list()`: either a keyword
    // ("powersave", "performance", "turbo", "all") or a CPU list such as "0-3,8".
    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    // Perf event for counter slot 0 (hex or symbolic name); "0x0" disables it.
    #[clap(short = 'e', long, default_value = "0x0", value_parser = parse_perf_event)]
    perf_config: PerfEventSpec,

    // Threshold paired with `perf_config`; 0 selects the dynamically adjusted
    // threshold starting at DYNAMIC_THRESHOLD_INIT_VALUE.
    #[clap(short = 'E', default_value = "0", long)]
    perf_threshold: u64,

    // Second ("sticky") perf event (hex or symbolic name); "0x0" disables it.
    #[clap(short = 'y', long, default_value = "0x0", value_parser = parse_perf_event)]
    perf_sticky: PerfEventSpec,

    // Threshold paired with `perf_sticky`; 0 selects the dynamically adjusted
    // threshold starting at DYNAMIC_THRESHOLD_INIT_VALUE.
    #[clap(short = 'Y', default_value = "0", long)]
    perf_sticky_threshold: u64,

    // Enables NVML-based GPU-aware scheduling; only effective when NUMA
    // optimizations are enabled and NVML initializes successfully.
    #[clap(short = 'g', long, action = clap::ArgAction::SetTrue)]
    gpu: bool,

    // GPU utilization threshold (0..=100). When > 0, only processes whose
    // max(sm_util, mem_util) reaches it are tracked; 0 tracks every running
    // GPU process.
    #[clap(long, default_value = "0", value_parser = clap::value_parser!(u32).range(0..=100))]
    gpu_util_threshold: u32,

    // Disables NUMA optimizations even when more than one node has CPUs.
    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
    disable_numa: bool,

    // Exported to BPF inverted, as `cpufreq_enabled = !disable_cpufreq`.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    disable_cpufreq: bool,

    // Exported to BPF as `flat_idle_scan`.
    #[arg(short = 'i', long, action = clap::ArgAction::SetTrue)]
    flat_idle_scan: bool,

    // Exported to BPF as `preferred_idle_scan`; also logs the CPU list sorted
    // by descending capacity.
    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    // Treats the system as non-SMT even if the topology reports SMT.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    disable_smt: bool,

    // NOTE(review): this flag is parsed but not read anywhere in the visible
    // code - confirm whether it should be forwarded to the BPF side.
    #[clap(short = 'S', long, action = clap::ArgAction::SetTrue)]
    avoid_smt: bool,

    // Exported to BPF as `no_early_clear`.
    #[clap(short = 'N', long, action = clap::ArgAction::SetTrue)]
    no_early_clear: bool,

    // Exported to BPF as `no_wake_sync`.
    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
    no_wake_sync: bool,

    // NOTE(review): this flag is parsed but not read anywhere in the visible
    // code - confirm whether it should be forwarded to the BPF side.
    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
    no_deferred_wakeup: bool,

    // Exported to BPF as `time_preemption`.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    time_preemption: bool,

    // Exported to BPF as `mm_affinity`.
    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
    mm_affinity: bool,

    // Interval in seconds for the stats monitor thread, run alongside the scheduler.
    #[clap(long)]
    stats: Option<f64>,

    // Interval in seconds for monitor-only mode: the stats monitor runs and the
    // program exits without starting the scheduler. Takes precedence over `stats`.
    #[clap(long)]
    monitor: Option<f64>,

    // Enables libbpf debug output and extra informational logging.
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    // Prints the scheduler name and version, then exits.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    // Prints the statistics metadata description, then exits.
    #[clap(long)]
    help_stats: bool,

    // Options forwarded to libbpf when the BPF object is opened.
    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
440
441pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
442 let mut cpus = Vec::new();
443 let mut seen = HashSet::new();
444
445 if let Some(mode) = match optarg {
447 "powersave" => Some(Powermode::Powersave),
448 "performance" => Some(Powermode::Performance),
449 "turbo" => Some(Powermode::Turbo),
450 "all" => Some(Powermode::Any),
451 _ => None,
452 } {
453 return get_primary_cpus(mode).map_err(|e| e.to_string());
454 }
455
456 if optarg
458 .chars()
459 .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
460 {
461 return Err("Invalid character in CPU list".to_string());
462 }
463
464 let cleaned = optarg.replace(' ', "\t");
466
467 for token in cleaned.split(',') {
468 let token = token.trim_matches(|c: char| c.is_whitespace());
469
470 if token.is_empty() {
471 continue;
472 }
473
474 if let Some((start_str, end_str)) = token.split_once('-') {
475 let start = start_str
476 .trim()
477 .parse::<usize>()
478 .map_err(|_| "Invalid range start")?;
479 let end = end_str
480 .trim()
481 .parse::<usize>()
482 .map_err(|_| "Invalid range end")?;
483
484 if start > end {
485 return Err(format!("Invalid CPU range: {}-{}", start, end));
486 }
487
488 for i in start..=end {
489 if cpus.len() >= *NR_CPU_IDS {
490 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
491 }
492 if seen.insert(i) {
493 cpus.push(i);
494 }
495 }
496 } else {
497 let cpu = token
498 .parse::<usize>()
499 .map_err(|_| format!("Invalid CPU: {}", token))?;
500 if cpus.len() >= *NR_CPU_IDS {
501 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
502 }
503 if seen.insert(cpu) {
504 cpus.push(cpu);
505 }
506 }
507 }
508
509 Ok(cpus)
510}
511
// Starting value of a perf threshold when the user passes 0 (dynamic mode).
const DYNAMIC_THRESHOLD_INIT_VALUE: u64 = 1000;

// Lower bound a dynamically adjusted threshold is clamped to.
const DYNAMIC_THRESHOLD_MIN_VALUE: u64 = 10;

// Upper edge of the target band for the smoothed counter rate (counter
// increments per second); above it the threshold is raised.
const DYNAMIC_THRESHOLD_RATE_HIGH: f64 = 4000.0;

// Lower edge of the target band for the smoothed counter rate; below it the
// threshold is lowered.
const DYNAMIC_THRESHOLD_RATE_LOW: f64 = 2000.0;

// Fraction by which a band edge is widened (HIGH * (1 + h), LOW * (1 - h))
// before an adjustment in that direction starts.
const DYNAMIC_THRESHOLD_HYSTERESIS: f64 = 0.1;

// Weight of the newest raw rate in the exponential moving average.
const DYNAMIC_THRESHOLD_EMA_ALPHA: f64 = 0.3;

// Base (smallest) relative step applied to the threshold per update.
const DYNAMIC_THRESHOLD_SCALE_MIN: f64 = 0.0001;

// Cap on the relative step when raising; also the step returned when lowering
// with a rate <= 0.
const DYNAMIC_THRESHOLD_SCALE_MAX: f64 = 1000.0;

// Step growth per unit of relative excess over RATE_HIGH (excess capped at 4).
const DYNAMIC_THRESHOLD_SLOPE_HIGH: f64 = 0.35;

// Step growth per unit of relative deficit below RATE_LOW (deficit in 0..=1).
const DYNAMIC_THRESHOLD_SLOPE_LOW: f64 = 0.58;

// Minimum time between two GPU PID synchronizations in `run()`.
const GPU_SYNC_INTERVAL: Duration = Duration::from_secs(1);
548
/// State of one dynamically adjusted perf threshold.
#[derive(Debug, Clone)]
struct DynamicThresholdState {
    /// Current threshold value.
    threshold: u64,
    /// Exponential moving average of the counter rate (increments per second).
    smoothed_rate: f64,
    /// Counter value observed by the previous `update()` call.
    prev_counter: u64,
    /// False until the first rate sample has seeded `smoothed_rate`.
    initialized: bool,
    /// Direction of the last adjustment: `Some(true)` raising, `Some(false)`
    /// lowering, `None` when no adjustment is in progress.
    adjustment_direction: Option<bool>,
}
569
570impl DynamicThresholdState {
571 fn new(initial_threshold: u64) -> Self {
573 Self {
574 threshold: initial_threshold,
575 smoothed_rate: 0.0,
576 prev_counter: 0,
577 initialized: false,
578 adjustment_direction: None,
579 }
580 }
581
582 fn update(
585 &mut self,
586 counter: u64,
587 elapsed_secs: f64,
588 verbose: bool,
589 name: &str,
590 ) -> Option<u64> {
591 if elapsed_secs <= 0.0 {
592 return None;
593 }
594
595 let delta = counter.saturating_sub(self.prev_counter);
597 self.prev_counter = counter;
598 let raw_rate = delta as f64 / elapsed_secs;
599
600 if self.initialized {
602 self.smoothed_rate = DYNAMIC_THRESHOLD_EMA_ALPHA * raw_rate
603 + (1.0 - DYNAMIC_THRESHOLD_EMA_ALPHA) * self.smoothed_rate;
604 } else {
605 self.smoothed_rate = raw_rate;
607 self.initialized = true;
608 }
609
610 let rate = self.smoothed_rate;
612 let old_threshold = self.threshold;
613
614 let (effective_high, effective_low) = match self.adjustment_direction {
616 Some(true) => {
617 (
619 DYNAMIC_THRESHOLD_RATE_HIGH,
620 DYNAMIC_THRESHOLD_RATE_LOW * (1.0 - DYNAMIC_THRESHOLD_HYSTERESIS),
621 )
622 }
623 Some(false) => {
624 (
626 DYNAMIC_THRESHOLD_RATE_HIGH * (1.0 + DYNAMIC_THRESHOLD_HYSTERESIS),
627 DYNAMIC_THRESHOLD_RATE_LOW,
628 )
629 }
630 None => {
631 (
633 DYNAMIC_THRESHOLD_RATE_HIGH * (1.0 + DYNAMIC_THRESHOLD_HYSTERESIS),
634 DYNAMIC_THRESHOLD_RATE_LOW * (1.0 - DYNAMIC_THRESHOLD_HYSTERESIS),
635 )
636 }
637 };
638
639 let new_direction = if rate > effective_high {
641 Some(true) } else if rate < effective_low && rate >= 0.0 {
643 Some(false) } else {
645 if self.adjustment_direction.is_some() {
647 if rate >= DYNAMIC_THRESHOLD_RATE_LOW && rate <= DYNAMIC_THRESHOLD_RATE_HIGH {
649 None } else {
651 self.adjustment_direction }
653 } else {
654 None }
656 };
657
658 if let Some(raising) = new_direction {
660 let scale = Self::compute_scale(rate, raising);
661 let factor = if raising { 1.0 + scale } else { 1.0 - scale };
662 let new_threshold = ((self.threshold as f64) * factor).round() as u64;
663 self.threshold = new_threshold.clamp(DYNAMIC_THRESHOLD_MIN_VALUE, u64::MAX);
664 }
665
666 self.adjustment_direction = new_direction;
667
668 if self.threshold != old_threshold {
670 if verbose {
671 info!(
672 "{}: {} -> {} (smoothed rate {:.1}/s, raw {:.1}/s, dir {:?})",
673 name,
674 old_threshold,
675 self.threshold,
676 self.smoothed_rate,
677 raw_rate,
678 self.adjustment_direction
679 );
680 }
681 Some(self.threshold)
682 } else {
683 None
684 }
685 }
686
687 fn compute_scale(rate: f64, too_high: bool) -> f64 {
690 if too_high {
691 let excess = ((rate / DYNAMIC_THRESHOLD_RATE_HIGH) - 1.0).max(0.0);
692 let scale =
693 DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_HIGH * excess.min(4.0);
694 scale.min(DYNAMIC_THRESHOLD_SCALE_MAX)
695 } else {
696 if rate <= 0.0 {
697 return DYNAMIC_THRESHOLD_SCALE_MAX;
698 }
699 let deficit = (DYNAMIC_THRESHOLD_RATE_LOW - rate) / DYNAMIC_THRESHOLD_RATE_LOW;
700 let t = deficit.clamp(0.0, 1.0);
701 DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_LOW * t
702 }
703 }
704}
705
/// Cumulative time counters of one CPU, as read from its `/proc/stat` line.
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    /// First numeric column of the line (user time).
    user: u64,
    /// Second numeric column of the line (nice time).
    nice: u64,
    /// Sum of the first eight numeric columns of the line.
    total: u64,
}
712
/// Userspace half of the scheduler: owns the loaded BPF skeleton and the state
/// needed by the periodic work in `run()`.
struct Scheduler<'a> {
    /// Loaded BPF skeleton (maps, programs and struct_ops).
    skel: BpfSkel<'a>,
    /// Parsed command-line options.
    opts: &'a Opts,
    /// Link of the attached struct_ops; taken (dropped) at the end of `run()`.
    struct_ops: Option<libbpf_rs::Link>,
    /// Server answering statistics requests with `Metrics`.
    stats_server: StatsServer<(), Metrics>,
    /// NVML GPU index -> NUMA node id; `None` when GPU-aware scheduling is off.
    gpu_index_to_node: Option<HashMap<u32, u32>>,
    /// PID -> node entries written to `gpu_pid_map` by the previous sync.
    previous_gpu_pids: Option<HashMap<u32, u32>>,
    /// NVML handle; `None` when GPU-aware scheduling is off.
    nvml: Option<Nvml>,
    /// Dynamic threshold state for `perf_config`; `Some` only when that event
    /// is enabled and `perf_threshold` is 0.
    perf_threshold_state: Option<DynamicThresholdState>,
    /// Dynamic threshold state for `perf_sticky`; `Some` only when that event
    /// is enabled and `perf_sticky_threshold` is 0.
    perf_sticky_threshold_state: Option<DynamicThresholdState>,
}
729
730impl<'a> Scheduler<'a> {
731 fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
732 try_set_rlimit_infinity();
733
734 let topo = Topology::new().unwrap();
736
737 let smt_enabled = !opts.disable_smt && topo.smt_enabled;
739
740 let nr_nodes = topo
742 .nodes
743 .values()
744 .filter(|node| !node.all_cpus.is_empty())
745 .count();
746 info!("NUMA nodes: {}", nr_nodes);
747
748 let numa_enabled = !opts.disable_numa && nr_nodes > 1;
750 if !numa_enabled {
751 info!("Disabling NUMA optimizations");
752 }
753
754 info!(
755 "{} {} {}",
756 SCHEDULER_NAME,
757 build_id::full_version(env!("CARGO_PKG_VERSION")),
758 if smt_enabled { "SMT on" } else { "SMT off" }
759 );
760
761 info!(
763 "scheduler options: {}",
764 std::env::args().collect::<Vec<_>>().join(" ")
765 );
766
767 let mut skel_builder = BpfSkelBuilder::default();
769 skel_builder.obj_builder.debug(opts.verbose);
770 let open_opts = opts.libbpf.clone().into_bpf_open_opts();
771 let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops, open_opts)?;
772
773 skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;
774
775 let rodata = skel.maps.rodata_data.as_mut().unwrap();
777 rodata.slice_ns = opts.slice_us * 1000;
778 rodata.slice_lag = opts.slice_lag_us * 1000;
779 rodata.cpufreq_enabled = !opts.disable_cpufreq;
780 rodata.flat_idle_scan = opts.flat_idle_scan;
781 rodata.smt_enabled = smt_enabled;
782 rodata.numa_enabled = numa_enabled;
783 rodata.nr_node_ids = topo.nodes.len() as u32;
784 rodata.no_wake_sync = opts.no_wake_sync;
785 rodata.no_early_clear = opts.no_early_clear;
786 rodata.time_preemption = opts.time_preemption;
787 rodata.mm_affinity = opts.mm_affinity;
788
789 rodata.perf_config = opts.perf_config.event_id;
791 rodata.perf_sticky = opts.perf_sticky.event_id;
792
793 rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
795
796 let mut cpus: Vec<_> = topo.all_cpus.values().collect();
798 cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
799 let max_cap = cpus.first().map(|c| c.cpu_capacity).unwrap_or(1).max(1);
801 for (i, cpu) in cpus.iter().enumerate() {
802 let normalized = (cpu.cpu_capacity * 1024 / max_cap).clamp(1, 1024);
803 rodata.cpu_capacity[cpu.id] = normalized as c_ulong;
804 rodata.preferred_cpus[i] = cpu.id as u64;
805 }
806 rodata.all_cpus_same_capacity = cpus.iter().all(|cpu| cpu.cpu_capacity == max_cap);
807 if opts.preferred_idle_scan {
808 info!(
809 "Preferred CPUs: {:?}",
810 &rodata.preferred_cpus[0..cpus.len()]
811 );
812 }
813 rodata.preferred_idle_scan = opts.preferred_idle_scan;
814
815 let primary_cpus = if let Some(ref domain) = opts.primary_domain {
817 match parse_cpu_list(domain) {
818 Ok(cpus) => cpus,
819 Err(e) => bail!("Error parsing primary domain: {}", e),
820 }
821 } else {
822 (0..*NR_CPU_IDS).collect()
823 };
824 if primary_cpus.len() < *NR_CPU_IDS {
825 info!("Primary CPUs: {:?}", primary_cpus);
826 rodata.primary_all = false;
827 } else {
828 rodata.primary_all = true;
829 }
830
831 let (gpu_index_to_node, previous_gpu_pids, nvml) = if opts.gpu && numa_enabled {
834 match Nvml::init_with_flags(InitFlags::NO_GPUS) {
835 Ok(nvml) => {
836 info!("NVIDIA GPU-aware scheduling enabled (NVML PID sync)");
837 rodata.gpu_enabled = true;
838 let mut idx_to_node = HashMap::new();
839 for (id, gpu) in topo.gpus() {
840 let GpuIndex::Nvidia { nvml_id } = id;
841 idx_to_node.insert(nvml_id, gpu.node_id as u32);
842 }
843 (Some(idx_to_node), Some(HashMap::new()), Some(nvml))
844 }
845 Err(e) => {
846 warn!("NVML init failed, disabling GPU-aware scheduling: {}", e);
847 rodata.gpu_enabled = false;
848 (None, None, None)
849 }
850 }
851 } else {
852 rodata.gpu_enabled = false;
853 (None, None, None)
854 };
855
856 skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
858 | *compat::SCX_OPS_ENQ_LAST
859 | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
860 | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP
861 | if numa_enabled {
862 *compat::SCX_OPS_BUILTIN_IDLE_PER_NODE
863 } else {
864 0
865 };
866
867 info!(
868 "scheduler flags: {:#x}",
869 skel.struct_ops.cosmos_ops_mut().flags
870 );
871
872 let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;
874
875 let bss = skel.maps.bss_data.as_mut().unwrap();
878 if opts.perf_config.event_id > 0 {
879 bss.perf_threshold = if opts.perf_threshold == 0 {
880 DYNAMIC_THRESHOLD_INIT_VALUE
881 } else {
882 opts.perf_threshold
883 };
884 }
885 if opts.perf_sticky.event_id > 0 {
886 bss.perf_sticky_threshold = if opts.perf_sticky_threshold == 0 {
887 DYNAMIC_THRESHOLD_INIT_VALUE
888 } else {
889 opts.perf_sticky_threshold
890 };
891 }
892
893 for node in topo.nodes.values() {
895 for cpu in node.all_cpus.values() {
896 if opts.verbose {
897 info!("CPU{} -> node{}", cpu.id, node.id);
898 }
899 skel.maps.cpu_node_map.update(
900 &(cpu.id as u32).to_ne_bytes(),
901 &(node.id as u32).to_ne_bytes(),
902 MapFlags::ANY,
903 )?;
904 }
905 }
906
907 let nr_cpus = *NR_CPU_IDS;
911 info!("Setting up performance counters for {} CPUs...", nr_cpus);
912 let mut perf_available = true;
913 let sticky_counter_idx = if opts.perf_config.event_id > 0 { 1 } else { 0 };
914 for cpu in 0..nr_cpus {
915 if opts.perf_config.event_id > 0 {
916 if let Err(e) = setup_perf_events(&mut skel, cpu as i32, &opts.perf_config, 0) {
917 if cpu == 0 {
918 let err_str = e.to_string();
919 if err_str.contains("errno 2") || err_str.contains("os error 2") {
920 warn!("Performance counters not available on this CPU architecture");
921 warn!("PMU event '{}' not supported - scheduler will run without perf monitoring", opts.perf_config.display_name);
922 } else {
923 warn!("Failed to setup perf events: {}", e);
924 }
925 perf_available = false;
926 break;
927 }
928 }
929 }
930 if opts.perf_sticky.event_id > 0 {
931 if let Err(e) =
932 setup_perf_events(&mut skel, cpu as i32, &opts.perf_sticky, sticky_counter_idx)
933 {
934 if cpu == 0 {
935 let err_str = e.to_string();
936 if err_str.contains("errno 2") || err_str.contains("os error 2") {
937 warn!("Performance counters not available on this CPU architecture");
938 warn!("PMU event '{}' not supported - scheduler will run without perf monitoring", opts.perf_sticky.display_name);
939 } else {
940 warn!("Failed to setup perf events: {}", e);
941 }
942 perf_available = false;
943 break;
944 }
945 }
946 }
947 }
948 if perf_available {
949 info!("Performance counters configured successfully for all CPUs");
950 }
951
952 if opts.gpu && numa_enabled {
954 for (id, gpu) in topo.gpus() {
955 let GpuIndex::Nvidia { nvml_id } = id;
956 if opts.verbose {
957 info!("GPU{} -> node{}", nvml_id, gpu.node_id);
958 }
959 skel.maps.gpu_node_map.update(
960 &(nvml_id as u32).to_ne_bytes(),
961 &(gpu.node_id as u32).to_ne_bytes(),
962 MapFlags::ANY,
963 )?;
964 }
965 }
966
967 if primary_cpus.len() < *NR_CPU_IDS {
969 for cpu in primary_cpus {
970 if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
971 bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
972 }
973 }
974 }
975
976 if smt_enabled {
978 Self::init_smt_domains(&mut skel, &topo)?;
979 }
980
981 let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
983 let stats_server = StatsServer::new(stats::server_data()).launch()?;
984
985 let perf_threshold_state = if opts.perf_config.event_id > 0 && opts.perf_threshold == 0 {
987 Some(DynamicThresholdState::new(DYNAMIC_THRESHOLD_INIT_VALUE))
988 } else {
989 None
990 };
991 let perf_sticky_threshold_state =
992 if opts.perf_sticky.event_id > 0 && opts.perf_sticky_threshold == 0 {
993 Some(DynamicThresholdState::new(DYNAMIC_THRESHOLD_INIT_VALUE))
994 } else {
995 None
996 };
997
998 Ok(Self {
999 skel,
1000 opts,
1001 struct_ops,
1002 stats_server,
1003 gpu_index_to_node,
1004 previous_gpu_pids,
1005 nvml,
1006 perf_threshold_state,
1007 perf_sticky_threshold_state,
1008 })
1009 }
1010
1011 fn sync_gpu_pids(&mut self) -> Result<()> {
1015 let gpu_index_to_node = match &self.gpu_index_to_node {
1016 Some(m) => m,
1017 None => return Ok(()),
1018 };
1019 let nvml = match &self.nvml {
1020 Some(n) => n,
1021 None => return Ok(()),
1022 };
1023 let threshold = self.opts.gpu_util_threshold;
1024 let previous = self.previous_gpu_pids.as_ref().unwrap();
1025 let mut pid_to_nodes: HashMap<u32, HashSet<u32>> = HashMap::new();
1027
1028 let count = nvml.device_count().context("NVML device count")?;
1029 for i in 0..count {
1030 let node = match gpu_index_to_node.get(&i) {
1031 Some(&n) => n,
1032 None => continue,
1033 };
1034 let device = nvml.device_by_index(i).context("NVML device_by_index")?;
1035
1036 if threshold > 0 {
1037 match device.process_utilization_stats(None::<u64>) {
1039 Ok(samples) => {
1040 for sample in samples {
1041 let util = sample.sm_util.max(sample.mem_util);
1042 if util >= threshold {
1043 pid_to_nodes.entry(sample.pid).or_default().insert(node);
1044 }
1045 }
1046 }
1047 Err(_) => {
1048 Self::add_running_gpu_processes_to_set(&device, node, &mut pid_to_nodes);
1050 }
1051 }
1052 } else {
1053 Self::add_running_gpu_processes_to_set(&device, node, &mut pid_to_nodes);
1054 }
1055 }
1056
1057 let mut current: HashMap<u32, u32> = HashMap::new();
1059 for (tgid, nodes) in pid_to_nodes {
1060 if nodes.len() == 1 {
1061 let node = nodes.into_iter().next().unwrap();
1062 current.insert(tgid, node);
1063 for tid in Self::task_tids(tgid) {
1064 current.insert(tid, node);
1065 }
1066 }
1067 }
1068
1069 let map = &self.skel.maps.gpu_pid_map;
1070 for (pid, node) in ¤t {
1071 map.update(&pid.to_ne_bytes(), &node.to_ne_bytes(), MapFlags::ANY)
1072 .context("gpu_pid_map update")?;
1073 }
1074 for pid in previous.keys() {
1075 if !current.contains_key(pid) {
1076 let _ = map.delete(&pid.to_ne_bytes());
1077 }
1078 }
1079 *self.previous_gpu_pids.as_mut().unwrap() = current;
1080 Ok(())
1081 }
1082
1083 fn add_running_gpu_processes_to_set(
1085 device: &nvml_wrapper::Device<'_>,
1086 node: u32,
1087 pid_to_nodes: &mut HashMap<u32, HashSet<u32>>,
1088 ) {
1089 for proc in device
1090 .running_compute_processes()
1091 .unwrap_or_default()
1092 .into_iter()
1093 .chain(device.running_graphics_processes().unwrap_or_default())
1094 {
1095 pid_to_nodes.entry(proc.pid).or_default().insert(node);
1096 }
1097 }
1098
1099 fn task_tids(pid: u32) -> Vec<u32> {
1101 let task_dir = format!("/proc/{}/task", pid);
1102 let Ok(entries) = fs::read_dir(Path::new(&task_dir)) else {
1103 return Vec::new();
1104 };
1105 entries
1106 .filter_map(|e| e.ok())
1107 .filter_map(|e| e.file_name().to_str().and_then(|s| s.parse::<u32>().ok()))
1108 .collect()
1109 }
1110
1111 fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
1112 let prog = &mut skel.progs.enable_primary_cpu;
1113 let mut args = cpu_arg {
1114 cpu_id: cpu as c_int,
1115 };
1116 let input = ProgramInput {
1117 context_in: Some(unsafe {
1118 std::slice::from_raw_parts_mut(
1119 &mut args as *mut _ as *mut u8,
1120 std::mem::size_of_val(&args),
1121 )
1122 }),
1123 ..Default::default()
1124 };
1125 let out = prog.test_run(input).unwrap();
1126 if out.return_value != 0 {
1127 return Err(out.return_value);
1128 }
1129
1130 Ok(())
1131 }
1132
1133 fn enable_sibling_cpu(
1134 skel: &mut BpfSkel<'_>,
1135 cpu: usize,
1136 sibling_cpu: usize,
1137 ) -> Result<(), u32> {
1138 let prog = &mut skel.progs.enable_sibling_cpu;
1139 let mut args = domain_arg {
1140 cpu_id: cpu as c_int,
1141 sibling_cpu_id: sibling_cpu as c_int,
1142 };
1143 let input = ProgramInput {
1144 context_in: Some(unsafe {
1145 std::slice::from_raw_parts_mut(
1146 &mut args as *mut _ as *mut u8,
1147 std::mem::size_of_val(&args),
1148 )
1149 }),
1150 ..Default::default()
1151 };
1152 let out = prog.test_run(input).unwrap();
1153 if out.return_value != 0 {
1154 return Err(out.return_value);
1155 }
1156
1157 Ok(())
1158 }
1159
1160 fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
1161 let smt_siblings = topo.sibling_cpus();
1162
1163 info!("SMT sibling CPUs: {:?}", smt_siblings);
1164 for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
1165 Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
1166 }
1167
1168 Ok(())
1169 }
1170
1171 fn get_metrics(&self) -> Metrics {
1172 let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
1173 Metrics {
1174 nr_event_dispatches: bss_data.nr_event_dispatches,
1175 nr_ev_sticky_dispatches: bss_data.nr_ev_sticky_dispatches,
1176 nr_gpu_dispatches: bss_data.nr_gpu_dispatches,
1177 }
1178 }
1179
    /// Returns whether the BPF scheduler has exited, as reported by
    /// `uei_exited!` on the skeleton's `uei`.
    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }
1183
1184 fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
1185 let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
1187 let total_diff = curr.total.saturating_sub(prev.total);
1188
1189 if total_diff > 0 {
1190 let user_ratio = user_diff as f64 / total_diff as f64;
1191 Some((user_ratio * 1024.0).round() as u64)
1192 } else {
1193 None
1194 }
1195 }
1196
1197 fn read_per_cpu_cpu_times(nr_cpus: usize) -> Option<Vec<CpuTimes>> {
1200 let file = File::open("/proc/stat").ok()?;
1201 let reader = BufReader::new(file);
1202 let mut result = Vec::with_capacity(nr_cpus);
1203
1204 for line in reader.lines() {
1205 let line = line.ok()?;
1206 let line = line.trim();
1207 if !line.starts_with("cpu") {
1208 continue;
1209 }
1210 let rest = line.strip_prefix("cpu")?;
1211 if rest.starts_with(' ') {
1212 continue;
1214 }
1215 let cpu_id: usize = rest.split_whitespace().next()?.parse().ok()?;
1216 if cpu_id != result.len() {
1217 break;
1218 }
1219 let fields: Vec<&str> = line.split_whitespace().collect();
1220 if fields.len() < 5 {
1221 return None;
1222 }
1223 let user: u64 = fields[1].parse().ok()?;
1224 let nice: u64 = fields[2].parse().ok()?;
1225 let total: u64 = fields
1226 .iter()
1227 .skip(1)
1228 .take(8)
1229 .filter_map(|v| v.parse::<u64>().ok())
1230 .sum();
1231 result.push(CpuTimes { user, nice, total });
1232 if result.len() >= nr_cpus {
1233 break;
1234 }
1235 }
1236
1237 if result.len() == nr_cpus {
1238 Some(result)
1239 } else {
1240 None
1241 }
1242 }
1243
1244 fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
1245 let (res_ch, req_ch) = self.stats_server.channels();
1246
1247 let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
1252 let nr_cpus = *NR_CPU_IDS as usize;
1253 let mut prev_cputime =
1254 Self::read_per_cpu_cpu_times(nr_cpus).expect("Failed to read initial per-CPU stats");
1255 let mut last_update = Instant::now();
1256 let mut last_gpu_sync = Instant::now();
1257
1258 while !shutdown.load(Ordering::Relaxed) && !self.exited() {
1259 if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
1261 if let Some(curr_cputime) = Self::read_per_cpu_cpu_times(nr_cpus) {
1262 let map = &self.skel.maps.cpu_util_map;
1263 for cpu in 0..nr_cpus {
1264 if let Some(util) =
1265 Self::compute_user_cpu_pct(&prev_cputime[cpu], &curr_cputime[cpu])
1266 {
1267 let _ = map.update(
1268 &(cpu as u32).to_ne_bytes(),
1269 &util.to_ne_bytes(),
1270 MapFlags::ANY,
1271 );
1272 }
1273 }
1274 prev_cputime = curr_cputime;
1275 }
1276
1277 let elapsed_secs = last_update.elapsed().as_secs_f64();
1279
1280 if let Some(ref mut state) = self.perf_threshold_state {
1282 let nr_event = self
1283 .skel
1284 .maps
1285 .bss_data
1286 .as_ref()
1287 .unwrap()
1288 .nr_event_dispatches;
1289 if let Some(new_thresh) =
1290 state.update(nr_event, elapsed_secs, self.opts.verbose, "perf_threshold")
1291 {
1292 self.skel.maps.bss_data.as_mut().unwrap().perf_threshold = new_thresh;
1293 }
1294 }
1295
1296 if let Some(ref mut state) = self.perf_sticky_threshold_state {
1298 let nr_sticky = self
1299 .skel
1300 .maps
1301 .bss_data
1302 .as_ref()
1303 .unwrap()
1304 .nr_ev_sticky_dispatches;
1305 if let Some(new_thresh) = state.update(
1306 nr_sticky,
1307 elapsed_secs,
1308 self.opts.verbose,
1309 "perf_sticky_threshold",
1310 ) {
1311 self.skel
1312 .maps
1313 .bss_data
1314 .as_mut()
1315 .unwrap()
1316 .perf_sticky_threshold = new_thresh;
1317 }
1318 }
1319
1320 if self.gpu_index_to_node.is_some() && last_gpu_sync.elapsed() >= GPU_SYNC_INTERVAL
1322 {
1323 if let Err(e) = self.sync_gpu_pids() {
1324 debug!("GPU PID sync: {}", e);
1325 }
1326 last_gpu_sync = Instant::now();
1327 }
1328
1329 last_update = Instant::now();
1330 }
1331
1332 let timeout = if polling_time.is_zero() {
1334 Duration::from_secs(1)
1335 } else {
1336 polling_time
1337 };
1338 match req_ch.recv_timeout(timeout) {
1339 Ok(()) => res_ch.send(self.get_metrics())?,
1340 Err(RecvTimeoutError::Timeout) => {}
1341 Err(e) => Err(e)?,
1342 }
1343 }
1344
1345 let _ = self.struct_ops.take();
1346 uei_report!(&self.skel, uei)
1347 }
1348}
1349
/// Logs the scheduler teardown; the owned fields are released by their own
/// destructors.
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}
1355
1356fn main() -> Result<()> {
1357 let opts = Opts::parse();
1358
1359 if opts.version {
1360 println!(
1361 "{} {}",
1362 SCHEDULER_NAME,
1363 build_id::full_version(env!("CARGO_PKG_VERSION"))
1364 );
1365 return Ok(());
1366 }
1367
1368 if opts.help_stats {
1369 stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
1370 return Ok(());
1371 }
1372
1373 let loglevel = simplelog::LevelFilter::Info;
1374
1375 let mut lcfg = simplelog::ConfigBuilder::new();
1376 lcfg.set_time_offset_to_local()
1377 .expect("Failed to set local time offset")
1378 .set_time_level(simplelog::LevelFilter::Error)
1379 .set_location_level(simplelog::LevelFilter::Off)
1380 .set_target_level(simplelog::LevelFilter::Off)
1381 .set_thread_level(simplelog::LevelFilter::Off);
1382 simplelog::TermLogger::init(
1383 loglevel,
1384 lcfg.build(),
1385 simplelog::TerminalMode::Stderr,
1386 simplelog::ColorChoice::Auto,
1387 )?;
1388
1389 let shutdown = Arc::new(AtomicBool::new(false));
1390 let shutdown_clone = shutdown.clone();
1391 ctrlc::set_handler(move || {
1392 shutdown_clone.store(true, Ordering::Relaxed);
1393 })
1394 .context("Error setting Ctrl-C handler")?;
1395
1396 if let Some(intv) = opts.monitor.or(opts.stats) {
1397 let shutdown_copy = shutdown.clone();
1398 let jh = std::thread::spawn(move || {
1399 match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
1400 Ok(_) => {
1401 debug!("stats monitor thread finished successfully")
1402 }
1403 Err(error_object) => {
1404 warn!(
1405 "stats monitor thread finished because of an error {}",
1406 error_object
1407 )
1408 }
1409 }
1410 });
1411 if opts.monitor.is_some() {
1412 let _ = jh.join();
1413 return Ok(());
1414 }
1415 }
1416
1417 let mut open_object = MaybeUninit::uninit();
1418 loop {
1419 let mut sched = Scheduler::init(&opts, &mut open_object)?;
1420 if !sched.run(shutdown.clone())?.should_restart() {
1421 break;
1422 }
1423 }
1424
1425 Ok(())
1426}