1mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::{HashMap, HashSet};
15use std::ffi::{c_int, c_ulong};
16use std::fs;
17use std::fs::File;
18use std::io::{BufRead, BufReader};
19use std::mem::MaybeUninit;
20use std::path::Path;
21use std::sync::atomic::AtomicBool;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::time::{Duration, Instant};
25
26use anyhow::bail;
27use anyhow::Context;
28use anyhow::Result;
29use clap::Parser;
30use crossbeam::channel::RecvTimeoutError;
31use libbpf_rs::MapCore;
32use libbpf_rs::MapFlags;
33use libbpf_rs::OpenObject;
34use libbpf_rs::ProgramInput;
35use log::{debug, info, warn};
36use nvml_wrapper::bitmasks::InitFlags;
37use nvml_wrapper::Nvml;
38use scx_stats::prelude::*;
39use scx_utils::build_id;
40use scx_utils::compat;
41use scx_utils::libbpf_clap_opts::LibbpfOpts;
42use scx_utils::scx_ops_attach;
43use scx_utils::scx_ops_load;
44use scx_utils::scx_ops_open;
45use scx_utils::try_set_rlimit_infinity;
46use scx_utils::uei_exited;
47use scx_utils::uei_report;
48use scx_utils::CoreType;
49use scx_utils::GpuIndex;
50use scx_utils::Topology;
51use scx_utils::UserExitInfo;
52use scx_utils::NR_CPU_IDS;
53use stats::Metrics;
54
/// Name under which this scheduler registers and logs.
const SCHEDULER_NAME: &str = "scx_cosmos";
56
/// A perf event specification parsed from the command line.
#[derive(Clone, Debug)]
struct PerfEventSpec {
    /// Packed identifier handed to the BPF side: `(perf type << 32) | config`
    /// for symbolic events, the raw config for hex events, 0 when disabled.
    event_id: u64,
    /// `perf_event_attr.type_` (PERF_TYPE_RAW / HARDWARE / SOFTWARE / HW_CACHE).
    type_: u32,
    /// `perf_event_attr.config` value.
    config: u64,
    /// The user-supplied string, kept for error and log messages.
    display_name: String,
}
70
/// Map a symbolic hardware event name to its `PERF_COUNT_HW_*` id.
///
/// Returns `None` for unrecognized names.
fn parse_hardware_event(s: &str) -> Option<u64> {
    let id = match s {
        "cpu-cycles" | "cycles" => 0,
        "instructions" => 1,
        "cache-references" => 2,
        "cache-misses" => 3,
        "branch-instructions" | "branches" => 4,
        "branch-misses" => 5,
        "bus-cycles" => 6,
        "stalled-cycles-frontend" | "idle-cycles-frontend" => 7,
        "stalled-cycles-backend" | "idle-cycles-backend" => 8,
        "ref-cycles" => 9,
        _ => return None,
    };
    Some(id)
}
86
/// Map a symbolic software event name to its `PERF_COUNT_SW_*` id.
///
/// Returns `None` for unrecognized names.
fn parse_software_event(s: &str) -> Option<u64> {
    let id = match s {
        "cpu-clock" => 0,
        "task-clock" => 1,
        "page-faults" | "faults" => 2,
        "context-switches" | "cs" => 3,
        "cpu-migrations" | "migrations" => 4,
        "minor-faults" => 5,
        "major-faults" => 6,
        "alignment-faults" => 7,
        "emulation-faults" => 8,
        "dummy" => 9,
        "bpf-output" => 10,
        _ => return None,
    };
    Some(id)
}
103
/// Parse a symbolic hardware-cache event name (e.g. "LLC-load-misses") into
/// the PERF_TYPE_HW_CACHE config encoding used by perf_event_open(2):
///
///   config = cache_id | (op_id << 8) | (result_id << 16)
///
/// Returns `None` when the name is not a recognized cache event.
fn parse_hw_cache_event(s: &str) -> Option<u64> {
    // Map the cache-name prefix to its perf_hw_cache_id and strip it off.
    // strip_prefix avoids the error-prone hand-maintained prefix lengths.
    let (cache_id, suffix) = if let Some(rest) = s.strip_prefix("L1-dcache-") {
        (0u64, rest)
    } else if let Some(rest) = s.strip_prefix("L1-icache-") {
        (1, rest)
    } else if let Some(rest) = s.strip_prefix("LLC-") {
        (2, rest)
    } else if let Some(rest) = s.strip_prefix("dTLB-") {
        (3, rest)
    } else if let Some(rest) = s.strip_prefix("iTLB-") {
        (4, rest)
    } else if let Some(rest) = s.strip_prefix("branch-") {
        (5, rest)
    } else if let Some(rest) = s.strip_prefix("node-") {
        (6, rest)
    } else {
        return None;
    };

    // Map the operation suffix to (perf_hw_cache_op_id, perf_hw_cache_op_result_id).
    let (op_id, result_id) = match suffix {
        "loads" => (0u64, 0u64),
        "load-misses" => (0, 1),
        "stores" => (1, 0),
        "store-misses" => (1, 1),
        "prefetches" => (2, 0),
        "prefetch-misses" => (2, 1),
        _ => return None,
    };

    Some((result_id << 16) | (op_id << 8) | cache_id)
}
136
137fn parse_perf_event(s: &str) -> Result<PerfEventSpec, String> {
139 use perf_event_open_sys as sys;
140
141 let s = s.trim();
142 if s.is_empty() || s == "0" || s.eq_ignore_ascii_case("0x0") {
143 return Ok(PerfEventSpec {
144 event_id: 0,
145 type_: sys::bindings::PERF_TYPE_RAW,
146 config: 0,
147 display_name: s.to_string(),
148 });
149 }
150
151 if let Some(hex_str) = s.strip_prefix("0x").or_else(|| s.strip_prefix("0X")) {
152 if let Ok(config) = u64::from_str_radix(hex_str, 16) {
153 return Ok(PerfEventSpec {
154 event_id: config,
155 type_: sys::bindings::PERF_TYPE_RAW,
156 config,
157 display_name: s.to_string(),
158 });
159 }
160 }
161
162 if let Some(config) = parse_hardware_event(s) {
163 let event_id = (sys::bindings::PERF_TYPE_HARDWARE as u64) << 32 | config;
164 return Ok(PerfEventSpec {
165 event_id,
166 type_: sys::bindings::PERF_TYPE_HARDWARE,
167 config,
168 display_name: s.to_string(),
169 });
170 }
171
172 if let Some(config) = parse_software_event(s) {
173 let event_id = (sys::bindings::PERF_TYPE_SOFTWARE as u64) << 32 | config;
174 return Ok(PerfEventSpec {
175 event_id,
176 type_: sys::bindings::PERF_TYPE_SOFTWARE,
177 config,
178 display_name: s.to_string(),
179 });
180 }
181
182 if let Some(config) = parse_hw_cache_event(s) {
183 let event_id = (sys::bindings::PERF_TYPE_HW_CACHE as u64) << 32 | config;
184 return Ok(PerfEventSpec {
185 event_id,
186 type_: sys::bindings::PERF_TYPE_HW_CACHE,
187 config,
188 display_name: s.to_string(),
189 });
190 }
191
192 Err(format!(
193 "Invalid perf event '{}': use hex (0xN) or symbolic name (e.g. cache-misses, LLC-load-misses, page-faults)",
194 s
195 ))
196}
197
/// Stride separating per-counter slots in the scx_pmu_map key space:
/// key = cpu + counter_idx * PERF_MAP_STRIDE (see setup_perf_events()).
const PERF_MAP_STRIDE: u32 = 4096;
200
201fn setup_perf_events(
204 skel: &mut BpfSkel,
205 cpu: i32,
206 spec: &PerfEventSpec,
207 counter_idx: u32,
208) -> Result<()> {
209 use perf_event_open_sys as sys;
210
211 if spec.event_id == 0 {
212 return Ok(());
213 }
214
215 let map = &skel.maps.scx_pmu_map;
216
217 let mut attrs = sys::bindings::perf_event_attr::default();
218 attrs.type_ = spec.type_;
219 attrs.config = spec.config;
220 attrs.size = std::mem::size_of::<sys::bindings::perf_event_attr>() as u32;
221 attrs.set_disabled(0);
222 attrs.set_inherit(0);
223
224 let fd = unsafe { sys::perf_event_open(&mut attrs, -1, cpu, -1, 0) };
225
226 if fd < 0 {
227 let err = std::io::Error::last_os_error();
228 return Err(anyhow::anyhow!(
229 "Failed to open perf event '{}' on CPU {}: {}",
230 spec.display_name,
231 cpu,
232 err
233 ));
234 }
235
236 let key = cpu as u32 + counter_idx * PERF_MAP_STRIDE;
237
238 map.update(
239 &key.to_ne_bytes(),
240 &fd.to_ne_bytes(),
241 libbpf_rs::MapFlags::ANY,
242 )
243 .with_context(|| "Failed to update perf_events map")?;
244
245 Ok(())
246}
247
// Command-line options for scx_cosmos.
//
// NOTE(review): comments here are deliberately plain `//` — clap's derive
// macro turns `///` doc comments into --help text, which would change the
// CLI output at runtime.
#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_cosmos",
    version,
    disable_version_flag = true,
    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
)]
struct Opts {
    // Size of the BPF exit debug dump buffer (0 = kernel default);
    // copied into struct_ops.exit_dump_len in Scheduler::init().
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    // Base scheduling slice, in microseconds (written to BPF as slice_ns).
    #[clap(short = 's', long, default_value = "1000")]
    slice_us: u64,

    // Maximum slice lag, in microseconds (written to BPF as slice_lag).
    #[clap(short = 'l', long, default_value = "20000")]
    slice_lag_us: u64,

    // CPU busy threshold in percent; scaled to 0..1024 fixed point
    // (x * 1024 / 100) before being handed to BPF.
    #[clap(short = 'c', long, default_value = "0")]
    cpu_busy_thresh: u64,

    // Period of the user-space polling loop in milliseconds; 0 disables
    // polling, and values above 1000 ms are capped to 1000 ms in run().
    #[clap(short = 'p', long, default_value = "0")]
    polling_ms: u64,

    // Primary CPU domain: a CPU list (e.g. "0-3,8") or one of the symbolic
    // names "powersave" / "performance" / "turbo" / "all".
    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    // Perf event driving the perf-threshold dispatch logic; hex raw config
    // or symbolic name (semantics live on the BPF side).
    #[clap(short = 'e', long, default_value = "0x0", value_parser = parse_perf_event)]
    perf_config: PerfEventSpec,

    // Threshold for the perf_config event; 0 enables dynamic auto-tuning.
    #[clap(short = 'E', default_value = "0", long)]
    perf_threshold: u64,

    // Perf event driving the sticky-dispatch logic; same format as -e.
    #[clap(short = 'y', long, default_value = "0x0", value_parser = parse_perf_event)]
    perf_sticky: PerfEventSpec,

    // Threshold for the perf_sticky event; 0 enables dynamic auto-tuning.
    #[clap(short = 'Y', default_value = "0", long)]
    perf_sticky_threshold: u64,

    // Enable NVIDIA GPU-aware scheduling (requires NUMA and working NVML).
    #[clap(short = 'g', long, action = clap::ArgAction::SetTrue)]
    gpu: bool,

    // Minimum GPU utilization (0..=100 %) for a process to be considered a
    // GPU user; 0 counts every process with the GPU open.
    #[clap(long, default_value = "0", value_parser = clap::value_parser!(u32).range(0..=100))]
    gpu_util_threshold: u32,

    // Disable NUMA optimizations even on multi-node systems.
    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
    disable_numa: bool,

    // Disable cpufreq performance-hint integration.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    disable_cpufreq: bool,

    // Use a flat (non-topology-aware) idle CPU scan in BPF.
    #[arg(short = 'i', long, action = clap::ArgAction::SetTrue)]
    flat_idle_scan: bool,

    // Scan idle CPUs in capacity-preferred order (highest capacity first).
    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    // Treat the system as non-SMT even when SMT is available.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    disable_smt: bool,

    // Avoid placing tasks on busy SMT sibling pairs (BPF-side policy).
    #[clap(short = 'S', long, action = clap::ArgAction::SetTrue)]
    avoid_smt: bool,

    // Do not clear the idle state early (BPF-side policy flag).
    #[clap(short = 'N', long, action = clap::ArgAction::SetTrue)]
    no_early_clear: bool,

    // Disable synchronous-wakeup optimizations.
    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
    no_wake_sync: bool,

    // Disable deferred wakeups; presumably consumed by the BPF side — not
    // referenced in the visible user-space code (TODO confirm).
    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
    no_deferred_wakeup: bool,

    // Disable tick-based preemption (BPF tick_preempt = !no_tick_preempt).
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_tick_preempt: bool,

    // Enable mm-based (address-space) affinity in BPF.
    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
    mm_affinity: bool,

    // Stats reporting interval in seconds (runs alongside the scheduler).
    #[clap(long)]
    stats: Option<f64>,

    // Monitor-only mode: print stats at this interval without scheduling.
    #[clap(long)]
    monitor: Option<f64>,

    // Verbose output (enables libbpf debug output and per-CPU log lines).
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    // Print version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    // Describe the available stats metrics and exit.
    #[clap(long)]
    help_stats: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
442
/// CPU selection policy used when resolving symbolic primary-domain names
/// ("turbo" / "performance" / "powersave" / "all") in parse_cpu_list().
#[derive(PartialEq)]
enum Powermode {
    /// Only turbo-capable big cores.
    Turbo,
    /// All big (performance) cores.
    Performance,
    /// Only little (efficiency) cores.
    Powersave,
    /// Every CPU, regardless of core type.
    Any,
}
450
451fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
456 let cpus: Vec<usize> = Topology::new()
457 .unwrap()
458 .all_cores
459 .values()
460 .flat_map(|core| &core.cpus)
461 .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
462 (Powermode::Turbo, CoreType::Big { turbo: true }) |
464 (Powermode::Performance, CoreType::Big { .. }) |
466 (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
468 (Powermode::Any, ..) => Some(*cpu_id),
469 _ => None,
470 })
471 .collect();
472
473 Ok(cpus)
474}
475
476pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
477 let mut cpus = Vec::new();
478 let mut seen = HashSet::new();
479
480 if let Some(mode) = match optarg {
482 "powersave" => Some(Powermode::Powersave),
483 "performance" => Some(Powermode::Performance),
484 "turbo" => Some(Powermode::Turbo),
485 "all" => Some(Powermode::Any),
486 _ => None,
487 } {
488 return get_primary_cpus(mode).map_err(|e| e.to_string());
489 }
490
491 if optarg
493 .chars()
494 .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
495 {
496 return Err("Invalid character in CPU list".to_string());
497 }
498
499 let cleaned = optarg.replace(' ', "\t");
501
502 for token in cleaned.split(',') {
503 let token = token.trim_matches(|c: char| c.is_whitespace());
504
505 if token.is_empty() {
506 continue;
507 }
508
509 if let Some((start_str, end_str)) = token.split_once('-') {
510 let start = start_str
511 .trim()
512 .parse::<usize>()
513 .map_err(|_| "Invalid range start")?;
514 let end = end_str
515 .trim()
516 .parse::<usize>()
517 .map_err(|_| "Invalid range end")?;
518
519 if start > end {
520 return Err(format!("Invalid CPU range: {}-{}", start, end));
521 }
522
523 for i in start..=end {
524 if cpus.len() >= *NR_CPU_IDS {
525 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
526 }
527 if seen.insert(i) {
528 cpus.push(i);
529 }
530 }
531 } else {
532 let cpu = token
533 .parse::<usize>()
534 .map_err(|_| format!("Invalid CPU: {}", token))?;
535 if cpus.len() >= *NR_CPU_IDS {
536 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
537 }
538 if seen.insert(cpu) {
539 cpus.push(cpu);
540 }
541 }
542 }
543
544 Ok(cpus)
545}
546
/// Starting value for dynamically managed perf thresholds (CLI value 0).
const DYNAMIC_THRESHOLD_INIT_VALUE: u64 = 1000;

/// Lower bound the dynamic threshold is never reduced below.
const DYNAMIC_THRESHOLD_MIN_VALUE: u64 = 10;

/// Dispatch rate (events/sec) above which the threshold is raised.
const DYNAMIC_THRESHOLD_RATE_HIGH: f64 = 4000.0;

/// Dispatch rate (events/sec) below which the threshold is lowered.
const DYNAMIC_THRESHOLD_RATE_LOW: f64 = 2000.0;

/// Relative hysteresis applied to the rate band to avoid direction flapping.
const DYNAMIC_THRESHOLD_HYSTERESIS: f64 = 0.1;

/// Smoothing factor for the exponential moving average of the event rate.
const DYNAMIC_THRESHOLD_EMA_ALPHA: f64 = 0.3;

/// Minimum per-step threshold adjustment scale.
const DYNAMIC_THRESHOLD_SCALE_MIN: f64 = 0.0001;

/// Maximum per-step threshold adjustment scale.
const DYNAMIC_THRESHOLD_SCALE_MAX: f64 = 1000.0;

/// Adjustment slope when the rate is above the high watermark.
const DYNAMIC_THRESHOLD_SLOPE_HIGH: f64 = 0.35;

/// Adjustment slope when the rate is below the low watermark.
const DYNAMIC_THRESHOLD_SLOPE_LOW: f64 = 0.58;

/// How often the GPU process PIDs are re-synced from NVML.
const GPU_SYNC_INTERVAL: Duration = Duration::from_secs(1);
583
/// Feedback-controller state that auto-tunes a perf-event threshold so the
/// observed dispatch rate stays inside the
/// [DYNAMIC_THRESHOLD_RATE_LOW, DYNAMIC_THRESHOLD_RATE_HIGH] band.
#[derive(Debug, Clone)]
struct DynamicThresholdState {
    /// Current threshold value (pushed to the BPF side when it changes).
    threshold: u64,
    /// EMA-smoothed event rate, in events/sec.
    smoothed_rate: f64,
    /// Previous cumulative counter sample, used to compute per-window deltas.
    prev_counter: u64,
    /// Whether smoothed_rate has been seeded with a first sample.
    initialized: bool,
    /// Some(true) = raising, Some(false) = lowering, None = holding steady.
    adjustment_direction: Option<bool>,
}
604
605impl DynamicThresholdState {
606 fn new(initial_threshold: u64) -> Self {
608 Self {
609 threshold: initial_threshold,
610 smoothed_rate: 0.0,
611 prev_counter: 0,
612 initialized: false,
613 adjustment_direction: None,
614 }
615 }
616
617 fn update(
620 &mut self,
621 counter: u64,
622 elapsed_secs: f64,
623 verbose: bool,
624 name: &str,
625 ) -> Option<u64> {
626 if elapsed_secs <= 0.0 {
627 return None;
628 }
629
630 let delta = counter.saturating_sub(self.prev_counter);
632 self.prev_counter = counter;
633 let raw_rate = delta as f64 / elapsed_secs;
634
635 if self.initialized {
637 self.smoothed_rate = DYNAMIC_THRESHOLD_EMA_ALPHA * raw_rate
638 + (1.0 - DYNAMIC_THRESHOLD_EMA_ALPHA) * self.smoothed_rate;
639 } else {
640 self.smoothed_rate = raw_rate;
642 self.initialized = true;
643 }
644
645 let rate = self.smoothed_rate;
647 let old_threshold = self.threshold;
648
649 let (effective_high, effective_low) = match self.adjustment_direction {
651 Some(true) => {
652 (
654 DYNAMIC_THRESHOLD_RATE_HIGH,
655 DYNAMIC_THRESHOLD_RATE_LOW * (1.0 - DYNAMIC_THRESHOLD_HYSTERESIS),
656 )
657 }
658 Some(false) => {
659 (
661 DYNAMIC_THRESHOLD_RATE_HIGH * (1.0 + DYNAMIC_THRESHOLD_HYSTERESIS),
662 DYNAMIC_THRESHOLD_RATE_LOW,
663 )
664 }
665 None => {
666 (
668 DYNAMIC_THRESHOLD_RATE_HIGH * (1.0 + DYNAMIC_THRESHOLD_HYSTERESIS),
669 DYNAMIC_THRESHOLD_RATE_LOW * (1.0 - DYNAMIC_THRESHOLD_HYSTERESIS),
670 )
671 }
672 };
673
674 let new_direction = if rate > effective_high {
676 Some(true) } else if rate < effective_low && rate >= 0.0 {
678 Some(false) } else {
680 if self.adjustment_direction.is_some() {
682 if rate >= DYNAMIC_THRESHOLD_RATE_LOW && rate <= DYNAMIC_THRESHOLD_RATE_HIGH {
684 None } else {
686 self.adjustment_direction }
688 } else {
689 None }
691 };
692
693 if let Some(raising) = new_direction {
695 let scale = Self::compute_scale(rate, raising);
696 let factor = if raising { 1.0 + scale } else { 1.0 - scale };
697 let new_threshold = ((self.threshold as f64) * factor).round() as u64;
698 self.threshold = new_threshold.clamp(DYNAMIC_THRESHOLD_MIN_VALUE, u64::MAX);
699 }
700
701 self.adjustment_direction = new_direction;
702
703 if self.threshold != old_threshold {
705 if verbose {
706 info!(
707 "{}: {} -> {} (smoothed rate {:.1}/s, raw {:.1}/s, dir {:?})",
708 name,
709 old_threshold,
710 self.threshold,
711 self.smoothed_rate,
712 raw_rate,
713 self.adjustment_direction
714 );
715 }
716 Some(self.threshold)
717 } else {
718 None
719 }
720 }
721
722 fn compute_scale(rate: f64, too_high: bool) -> f64 {
725 if too_high {
726 let excess = ((rate / DYNAMIC_THRESHOLD_RATE_HIGH) - 1.0).max(0.0);
727 let scale =
728 DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_HIGH * excess.min(4.0);
729 scale.min(DYNAMIC_THRESHOLD_SCALE_MAX)
730 } else {
731 if rate <= 0.0 {
732 return DYNAMIC_THRESHOLD_SCALE_MAX;
733 }
734 let deficit = (DYNAMIC_THRESHOLD_RATE_LOW - rate) / DYNAMIC_THRESHOLD_RATE_LOW;
735 let t = deficit.clamp(0.0, 1.0);
736 DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_LOW * t
737 }
738 }
739}
740
/// Per-CPU time counters sampled from /proc/stat (kernel clock ticks).
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    user: u64,  // time spent in user mode
    nice: u64,  // time spent in user mode at low priority
    total: u64, // sum of the first 8 time fields of the cpuN line
}
747
/// User-space side of the scheduler: owns the loaded BPF skeleton, the
/// stats server, and the optional GPU/NVML and dynamic-threshold state.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    opts: &'a Opts,
    // Attach link; taking it (in run()) detaches the scheduler.
    struct_ops: Option<libbpf_rs::Link>,
    stats_server: StatsServer<(), Metrics>,
    // NVML device index -> NUMA node; present only when GPU support is on.
    gpu_index_to_node: Option<HashMap<u32, u32>>,
    // PID -> node entries pushed to BPF on the previous sync, used to prune
    // stale map entries.
    previous_gpu_pids: Option<HashMap<u32, u32>>,
    nvml: Option<Nvml>,
    // Present only when the corresponding threshold is auto-tuned (CLI 0).
    perf_threshold_state: Option<DynamicThresholdState>,
    perf_sticky_threshold_state: Option<DynamicThresholdState>,
}
764
765impl<'a> Scheduler<'a> {
766 fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
767 try_set_rlimit_infinity();
768
769 let topo = Topology::new().unwrap();
771
772 let smt_enabled = !opts.disable_smt && topo.smt_enabled;
774
775 let nr_nodes = topo
777 .nodes
778 .values()
779 .filter(|node| !node.all_cpus.is_empty())
780 .count();
781 info!("NUMA nodes: {}", nr_nodes);
782
783 let numa_enabled = !opts.disable_numa && nr_nodes > 1;
785 if !numa_enabled {
786 info!("Disabling NUMA optimizations");
787 }
788
789 info!(
790 "{} {} {}",
791 SCHEDULER_NAME,
792 build_id::full_version(env!("CARGO_PKG_VERSION")),
793 if smt_enabled { "SMT on" } else { "SMT off" }
794 );
795
796 info!(
798 "scheduler options: {}",
799 std::env::args().collect::<Vec<_>>().join(" ")
800 );
801
802 let mut skel_builder = BpfSkelBuilder::default();
804 skel_builder.obj_builder.debug(opts.verbose);
805 let open_opts = opts.libbpf.clone().into_bpf_open_opts();
806 let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops, open_opts)?;
807
808 skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;
809
810 let rodata = skel.maps.rodata_data.as_mut().unwrap();
812 rodata.slice_ns = opts.slice_us * 1000;
813 rodata.slice_lag = opts.slice_lag_us * 1000;
814 rodata.cpufreq_enabled = !opts.disable_cpufreq;
815 rodata.flat_idle_scan = opts.flat_idle_scan;
816 rodata.smt_enabled = smt_enabled;
817 rodata.numa_enabled = numa_enabled;
818 rodata.nr_node_ids = topo.nodes.len() as u32;
819 rodata.no_wake_sync = opts.no_wake_sync;
820 rodata.avoid_smt = opts.avoid_smt;
821 rodata.no_early_clear = opts.no_early_clear;
822 rodata.tick_preempt = !opts.no_tick_preempt;
823 rodata.mm_affinity = opts.mm_affinity;
824
825 rodata.perf_config = opts.perf_config.event_id;
827 rodata.perf_sticky = opts.perf_sticky.event_id;
828
829 rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
831
832 let mut cpus: Vec<_> = topo.all_cpus.values().collect();
834 cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
835 let max_cap = cpus.first().map(|c| c.cpu_capacity).unwrap_or(1).max(1);
837 for (i, cpu) in cpus.iter().enumerate() {
838 let normalized = (cpu.cpu_capacity * 1024 / max_cap).clamp(1, 1024);
839 rodata.cpu_capacity[cpu.id] = normalized as c_ulong;
840 rodata.preferred_cpus[i] = cpu.id as u64;
841 }
842 rodata.all_cpus_same_capacity = cpus.iter().all(|cpu| cpu.cpu_capacity == max_cap);
843 if opts.preferred_idle_scan {
844 info!(
845 "Preferred CPUs: {:?}",
846 &rodata.preferred_cpus[0..cpus.len()]
847 );
848 }
849 rodata.preferred_idle_scan = opts.preferred_idle_scan;
850
851 let primary_cpus = if let Some(ref domain) = opts.primary_domain {
853 match parse_cpu_list(domain) {
854 Ok(cpus) => cpus,
855 Err(e) => bail!("Error parsing primary domain: {}", e),
856 }
857 } else {
858 (0..*NR_CPU_IDS).collect()
859 };
860 if primary_cpus.len() < *NR_CPU_IDS {
861 info!("Primary CPUs: {:?}", primary_cpus);
862 rodata.primary_all = false;
863 } else {
864 rodata.primary_all = true;
865 }
866
867 let (gpu_index_to_node, previous_gpu_pids, nvml) = if opts.gpu && numa_enabled {
870 match Nvml::init_with_flags(InitFlags::NO_GPUS) {
871 Ok(nvml) => {
872 info!("NVIDIA GPU-aware scheduling enabled (NVML PID sync)");
873 rodata.gpu_enabled = true;
874 let mut idx_to_node = HashMap::new();
875 for (id, gpu) in topo.gpus() {
876 let GpuIndex::Nvidia { nvml_id } = id;
877 idx_to_node.insert(nvml_id, gpu.node_id as u32);
878 }
879 (Some(idx_to_node), Some(HashMap::new()), Some(nvml))
880 }
881 Err(e) => {
882 warn!("NVML init failed, disabling GPU-aware scheduling: {}", e);
883 rodata.gpu_enabled = false;
884 (None, None, None)
885 }
886 }
887 } else {
888 rodata.gpu_enabled = false;
889 (None, None, None)
890 };
891
892 skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
894 | *compat::SCX_OPS_ENQ_LAST
895 | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
896 | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
897
898 info!(
899 "scheduler flags: {:#x}",
900 skel.struct_ops.cosmos_ops_mut().flags
901 );
902
903 let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;
905
906 let bss = skel.maps.bss_data.as_mut().unwrap();
909 if opts.perf_config.event_id > 0 {
910 bss.perf_threshold = if opts.perf_threshold == 0 {
911 DYNAMIC_THRESHOLD_INIT_VALUE
912 } else {
913 opts.perf_threshold
914 };
915 }
916 if opts.perf_sticky.event_id > 0 {
917 bss.perf_sticky_threshold = if opts.perf_sticky_threshold == 0 {
918 DYNAMIC_THRESHOLD_INIT_VALUE
919 } else {
920 opts.perf_sticky_threshold
921 };
922 }
923
924 for node in topo.nodes.values() {
926 for cpu in node.all_cpus.values() {
927 if opts.verbose {
928 info!("CPU{} -> node{}", cpu.id, node.id);
929 }
930 skel.maps.cpu_node_map.update(
931 &(cpu.id as u32).to_ne_bytes(),
932 &(node.id as u32).to_ne_bytes(),
933 MapFlags::ANY,
934 )?;
935 }
936 }
937
938 let nr_cpus = *NR_CPU_IDS;
942 info!("Setting up performance counters for {} CPUs...", nr_cpus);
943 let mut perf_available = true;
944 let sticky_counter_idx = if opts.perf_config.event_id > 0 { 1 } else { 0 };
945 for cpu in 0..nr_cpus {
946 if opts.perf_config.event_id > 0 {
947 if let Err(e) = setup_perf_events(&mut skel, cpu as i32, &opts.perf_config, 0) {
948 if cpu == 0 {
949 let err_str = e.to_string();
950 if err_str.contains("errno 2") || err_str.contains("os error 2") {
951 warn!("Performance counters not available on this CPU architecture");
952 warn!("PMU event '{}' not supported - scheduler will run without perf monitoring", opts.perf_config.display_name);
953 } else {
954 warn!("Failed to setup perf events: {}", e);
955 }
956 perf_available = false;
957 break;
958 }
959 }
960 }
961 if opts.perf_sticky.event_id > 0 {
962 if let Err(e) =
963 setup_perf_events(&mut skel, cpu as i32, &opts.perf_sticky, sticky_counter_idx)
964 {
965 if cpu == 0 {
966 let err_str = e.to_string();
967 if err_str.contains("errno 2") || err_str.contains("os error 2") {
968 warn!("Performance counters not available on this CPU architecture");
969 warn!("PMU event '{}' not supported - scheduler will run without perf monitoring", opts.perf_sticky.display_name);
970 } else {
971 warn!("Failed to setup perf events: {}", e);
972 }
973 perf_available = false;
974 break;
975 }
976 }
977 }
978 }
979 if perf_available {
980 info!("Performance counters configured successfully for all CPUs");
981 }
982
983 if opts.gpu && numa_enabled {
985 for (id, gpu) in topo.gpus() {
986 let GpuIndex::Nvidia { nvml_id } = id;
987 if opts.verbose {
988 info!("GPU{} -> node{}", nvml_id, gpu.node_id);
989 }
990 skel.maps.gpu_node_map.update(
991 &(nvml_id as u32).to_ne_bytes(),
992 &(gpu.node_id as u32).to_ne_bytes(),
993 MapFlags::ANY,
994 )?;
995 }
996 }
997
998 if primary_cpus.len() < *NR_CPU_IDS {
1000 for cpu in primary_cpus {
1001 if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
1002 bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
1003 }
1004 }
1005 }
1006
1007 if smt_enabled {
1009 Self::init_smt_domains(&mut skel, &topo)?;
1010 }
1011
1012 let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
1014 let stats_server = StatsServer::new(stats::server_data()).launch()?;
1015
1016 let perf_threshold_state = if opts.perf_config.event_id > 0 && opts.perf_threshold == 0 {
1018 Some(DynamicThresholdState::new(DYNAMIC_THRESHOLD_INIT_VALUE))
1019 } else {
1020 None
1021 };
1022 let perf_sticky_threshold_state =
1023 if opts.perf_sticky.event_id > 0 && opts.perf_sticky_threshold == 0 {
1024 Some(DynamicThresholdState::new(DYNAMIC_THRESHOLD_INIT_VALUE))
1025 } else {
1026 None
1027 };
1028
1029 Ok(Self {
1030 skel,
1031 opts,
1032 struct_ops,
1033 stats_server,
1034 gpu_index_to_node,
1035 previous_gpu_pids,
1036 nvml,
1037 perf_threshold_state,
1038 perf_sticky_threshold_state,
1039 })
1040 }
1041
1042 fn sync_gpu_pids(&mut self) -> Result<()> {
1046 let gpu_index_to_node = match &self.gpu_index_to_node {
1047 Some(m) => m,
1048 None => return Ok(()),
1049 };
1050 let nvml = match &self.nvml {
1051 Some(n) => n,
1052 None => return Ok(()),
1053 };
1054 let threshold = self.opts.gpu_util_threshold;
1055 let previous = self.previous_gpu_pids.as_ref().unwrap();
1056 let mut pid_to_nodes: HashMap<u32, HashSet<u32>> = HashMap::new();
1058
1059 let count = nvml.device_count().context("NVML device count")?;
1060 for i in 0..count {
1061 let node = match gpu_index_to_node.get(&i) {
1062 Some(&n) => n,
1063 None => continue,
1064 };
1065 let device = nvml.device_by_index(i).context("NVML device_by_index")?;
1066
1067 if threshold > 0 {
1068 match device.process_utilization_stats(None::<u64>) {
1070 Ok(samples) => {
1071 for sample in samples {
1072 let util = sample.sm_util.max(sample.mem_util);
1073 if util >= threshold {
1074 pid_to_nodes.entry(sample.pid).or_default().insert(node);
1075 }
1076 }
1077 }
1078 Err(_) => {
1079 Self::add_running_gpu_processes_to_set(&device, node, &mut pid_to_nodes);
1081 }
1082 }
1083 } else {
1084 Self::add_running_gpu_processes_to_set(&device, node, &mut pid_to_nodes);
1085 }
1086 }
1087
1088 let mut current: HashMap<u32, u32> = HashMap::new();
1090 for (tgid, nodes) in pid_to_nodes {
1091 if nodes.len() == 1 {
1092 let node = nodes.into_iter().next().unwrap();
1093 current.insert(tgid, node);
1094 for tid in Self::task_tids(tgid) {
1095 current.insert(tid, node);
1096 }
1097 }
1098 }
1099
1100 let map = &self.skel.maps.gpu_pid_map;
1101 for (pid, node) in ¤t {
1102 map.update(&pid.to_ne_bytes(), &node.to_ne_bytes(), MapFlags::ANY)
1103 .context("gpu_pid_map update")?;
1104 }
1105 for pid in previous.keys() {
1106 if !current.contains_key(pid) {
1107 let _ = map.delete(&pid.to_ne_bytes());
1108 }
1109 }
1110 *self.previous_gpu_pids.as_mut().unwrap() = current;
1111 Ok(())
1112 }
1113
1114 fn add_running_gpu_processes_to_set(
1116 device: &nvml_wrapper::Device<'_>,
1117 node: u32,
1118 pid_to_nodes: &mut HashMap<u32, HashSet<u32>>,
1119 ) {
1120 for proc in device
1121 .running_compute_processes()
1122 .unwrap_or_default()
1123 .into_iter()
1124 .chain(device.running_graphics_processes().unwrap_or_default())
1125 {
1126 pid_to_nodes.entry(proc.pid).or_default().insert(node);
1127 }
1128 }
1129
1130 fn task_tids(pid: u32) -> Vec<u32> {
1132 let task_dir = format!("/proc/{}/task", pid);
1133 let Ok(entries) = fs::read_dir(Path::new(&task_dir)) else {
1134 return Vec::new();
1135 };
1136 entries
1137 .filter_map(|e| e.ok())
1138 .filter_map(|e| e.file_name().to_str().and_then(|s| s.parse::<u32>().ok()))
1139 .collect()
1140 }
1141
1142 fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
1143 let prog = &mut skel.progs.enable_primary_cpu;
1144 let mut args = cpu_arg {
1145 cpu_id: cpu as c_int,
1146 };
1147 let input = ProgramInput {
1148 context_in: Some(unsafe {
1149 std::slice::from_raw_parts_mut(
1150 &mut args as *mut _ as *mut u8,
1151 std::mem::size_of_val(&args),
1152 )
1153 }),
1154 ..Default::default()
1155 };
1156 let out = prog.test_run(input).unwrap();
1157 if out.return_value != 0 {
1158 return Err(out.return_value);
1159 }
1160
1161 Ok(())
1162 }
1163
1164 fn enable_sibling_cpu(
1165 skel: &mut BpfSkel<'_>,
1166 cpu: usize,
1167 sibling_cpu: usize,
1168 ) -> Result<(), u32> {
1169 let prog = &mut skel.progs.enable_sibling_cpu;
1170 let mut args = domain_arg {
1171 cpu_id: cpu as c_int,
1172 sibling_cpu_id: sibling_cpu as c_int,
1173 };
1174 let input = ProgramInput {
1175 context_in: Some(unsafe {
1176 std::slice::from_raw_parts_mut(
1177 &mut args as *mut _ as *mut u8,
1178 std::mem::size_of_val(&args),
1179 )
1180 }),
1181 ..Default::default()
1182 };
1183 let out = prog.test_run(input).unwrap();
1184 if out.return_value != 0 {
1185 return Err(out.return_value);
1186 }
1187
1188 Ok(())
1189 }
1190
1191 fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
1192 let smt_siblings = topo.sibling_cpus();
1193
1194 info!("SMT sibling CPUs: {:?}", smt_siblings);
1195 for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
1196 Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
1197 }
1198
1199 Ok(())
1200 }
1201
1202 fn get_metrics(&self) -> Metrics {
1203 let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
1204 Metrics {
1205 nr_event_dispatches: bss_data.nr_event_dispatches,
1206 nr_ev_sticky_dispatches: bss_data.nr_ev_sticky_dispatches,
1207 nr_gpu_dispatches: bss_data.nr_gpu_dispatches,
1208 }
1209 }
1210
1211 pub fn exited(&mut self) -> bool {
1212 uei_exited!(&self.skel, uei)
1213 }
1214
1215 fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
1216 let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
1218 let total_diff = curr.total.saturating_sub(prev.total);
1219
1220 if total_diff > 0 {
1221 let user_ratio = user_diff as f64 / total_diff as f64;
1222 Some((user_ratio * 1024.0).round() as u64)
1223 } else {
1224 None
1225 }
1226 }
1227
1228 fn read_per_cpu_cpu_times(nr_cpus: usize) -> Option<Vec<CpuTimes>> {
1231 let file = File::open("/proc/stat").ok()?;
1232 let reader = BufReader::new(file);
1233 let mut result = Vec::with_capacity(nr_cpus);
1234
1235 for line in reader.lines() {
1236 let line = line.ok()?;
1237 let line = line.trim();
1238 if !line.starts_with("cpu") {
1239 continue;
1240 }
1241 let rest = line.strip_prefix("cpu")?;
1242 if rest.starts_with(' ') {
1243 continue;
1245 }
1246 let cpu_id: usize = rest.split_whitespace().next()?.parse().ok()?;
1247 if cpu_id != result.len() {
1248 break;
1249 }
1250 let fields: Vec<&str> = line.split_whitespace().collect();
1251 if fields.len() < 5 {
1252 return None;
1253 }
1254 let user: u64 = fields[1].parse().ok()?;
1255 let nice: u64 = fields[2].parse().ok()?;
1256 let total: u64 = fields
1257 .iter()
1258 .skip(1)
1259 .take(8)
1260 .filter_map(|v| v.parse::<u64>().ok())
1261 .sum();
1262 result.push(CpuTimes { user, nice, total });
1263 if result.len() >= nr_cpus {
1264 break;
1265 }
1266 }
1267
1268 if result.len() == nr_cpus {
1269 Some(result)
1270 } else {
1271 None
1272 }
1273 }
1274
1275 fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
1276 let (res_ch, req_ch) = self.stats_server.channels();
1277
1278 let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
1283 let nr_cpus = *NR_CPU_IDS as usize;
1284 let mut prev_cputime =
1285 Self::read_per_cpu_cpu_times(nr_cpus).expect("Failed to read initial per-CPU stats");
1286 let mut last_update = Instant::now();
1287 let mut last_gpu_sync = Instant::now();
1288
1289 while !shutdown.load(Ordering::Relaxed) && !self.exited() {
1290 if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
1292 if let Some(curr_cputime) = Self::read_per_cpu_cpu_times(nr_cpus) {
1293 let map = &self.skel.maps.cpu_util_map;
1294 for cpu in 0..nr_cpus {
1295 if let Some(util) =
1296 Self::compute_user_cpu_pct(&prev_cputime[cpu], &curr_cputime[cpu])
1297 {
1298 let _ = map.update(
1299 &(cpu as u32).to_ne_bytes(),
1300 &util.to_ne_bytes(),
1301 MapFlags::ANY,
1302 );
1303 }
1304 }
1305 prev_cputime = curr_cputime;
1306 }
1307
1308 let elapsed_secs = last_update.elapsed().as_secs_f64();
1310
1311 if let Some(ref mut state) = self.perf_threshold_state {
1313 let nr_event = self
1314 .skel
1315 .maps
1316 .bss_data
1317 .as_ref()
1318 .unwrap()
1319 .nr_event_dispatches;
1320 if let Some(new_thresh) =
1321 state.update(nr_event, elapsed_secs, self.opts.verbose, "perf_threshold")
1322 {
1323 self.skel.maps.bss_data.as_mut().unwrap().perf_threshold = new_thresh;
1324 }
1325 }
1326
1327 if let Some(ref mut state) = self.perf_sticky_threshold_state {
1329 let nr_sticky = self
1330 .skel
1331 .maps
1332 .bss_data
1333 .as_ref()
1334 .unwrap()
1335 .nr_ev_sticky_dispatches;
1336 if let Some(new_thresh) = state.update(
1337 nr_sticky,
1338 elapsed_secs,
1339 self.opts.verbose,
1340 "perf_sticky_threshold",
1341 ) {
1342 self.skel
1343 .maps
1344 .bss_data
1345 .as_mut()
1346 .unwrap()
1347 .perf_sticky_threshold = new_thresh;
1348 }
1349 }
1350
1351 if self.gpu_index_to_node.is_some() && last_gpu_sync.elapsed() >= GPU_SYNC_INTERVAL
1353 {
1354 if let Err(e) = self.sync_gpu_pids() {
1355 debug!("GPU PID sync: {}", e);
1356 }
1357 last_gpu_sync = Instant::now();
1358 }
1359
1360 last_update = Instant::now();
1361 }
1362
1363 let timeout = if polling_time.is_zero() {
1365 Duration::from_secs(1)
1366 } else {
1367 polling_time
1368 };
1369 match req_ch.recv_timeout(timeout) {
1370 Ok(()) => res_ch.send(self.get_metrics())?,
1371 Err(RecvTimeoutError::Timeout) => {}
1372 Err(e) => Err(e)?,
1373 }
1374 }
1375
1376 let _ = self.struct_ops.take();
1377 uei_report!(&self.skel, uei)
1378 }
1379}
1380
impl Drop for Scheduler<'_> {
    // Log on teardown; the BPF link itself detaches when struct_ops drops.
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}
1386
1387fn main() -> Result<()> {
1388 let opts = Opts::parse();
1389
1390 if opts.version {
1391 println!(
1392 "{} {}",
1393 SCHEDULER_NAME,
1394 build_id::full_version(env!("CARGO_PKG_VERSION"))
1395 );
1396 return Ok(());
1397 }
1398
1399 if opts.help_stats {
1400 stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
1401 return Ok(());
1402 }
1403
1404 let loglevel = simplelog::LevelFilter::Info;
1405
1406 let mut lcfg = simplelog::ConfigBuilder::new();
1407 lcfg.set_time_offset_to_local()
1408 .expect("Failed to set local time offset")
1409 .set_time_level(simplelog::LevelFilter::Error)
1410 .set_location_level(simplelog::LevelFilter::Off)
1411 .set_target_level(simplelog::LevelFilter::Off)
1412 .set_thread_level(simplelog::LevelFilter::Off);
1413 simplelog::TermLogger::init(
1414 loglevel,
1415 lcfg.build(),
1416 simplelog::TerminalMode::Stderr,
1417 simplelog::ColorChoice::Auto,
1418 )?;
1419
1420 let shutdown = Arc::new(AtomicBool::new(false));
1421 let shutdown_clone = shutdown.clone();
1422 ctrlc::set_handler(move || {
1423 shutdown_clone.store(true, Ordering::Relaxed);
1424 })
1425 .context("Error setting Ctrl-C handler")?;
1426
1427 if let Some(intv) = opts.monitor.or(opts.stats) {
1428 let shutdown_copy = shutdown.clone();
1429 let jh = std::thread::spawn(move || {
1430 match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
1431 Ok(_) => {
1432 debug!("stats monitor thread finished successfully")
1433 }
1434 Err(error_object) => {
1435 warn!(
1436 "stats monitor thread finished because of an error {}",
1437 error_object
1438 )
1439 }
1440 }
1441 });
1442 if opts.monitor.is_some() {
1443 let _ = jh.join();
1444 return Ok(());
1445 }
1446 }
1447
1448 let mut open_object = MaybeUninit::uninit();
1449 loop {
1450 let mut sched = Scheduler::init(&opts, &mut open_object)?;
1451 if !sched.run(shutdown.clone())?.should_restart() {
1452 break;
1453 }
1454 }
1455
1456 Ok(())
1457}