1mod bpf_skel;
6mod stats;
7
8use std::collections::BTreeMap;
9use std::collections::BTreeSet;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::ffi::CString;
13use std::fs;
14use std::io::Write;
15use std::mem::MaybeUninit;
16use std::ops::Sub;
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::thread::ThreadId;
23use std::time::Duration;
24use std::time::Instant;
25
26use inotify::{Inotify, WatchMask};
27use std::os::unix::io::AsRawFd;
28
29use anyhow::anyhow;
30use anyhow::bail;
31use anyhow::Context;
32use anyhow::Result;
33pub use bpf_skel::*;
34use clap::Parser;
35use crossbeam::channel::Receiver;
36use crossbeam::select;
37use lazy_static::lazy_static;
38use libbpf_rs::libbpf_sys;
39use libbpf_rs::AsRawLibbpf;
40use libbpf_rs::MapCore as _;
41use libbpf_rs::OpenObject;
42use libbpf_rs::ProgramInput;
43use nix::sched::CpuSet;
44use nvml_wrapper::error::NvmlError;
45use nvml_wrapper::Nvml;
46use once_cell::sync::OnceCell;
47use regex::Regex;
48use scx_layered::alloc::{unified_alloc, LayerAlloc, LayerDemand};
49use scx_layered::*;
50use scx_raw_pmu::PMUManager;
51use scx_stats::prelude::*;
52use scx_utils::build_id;
53use scx_utils::compat;
54use scx_utils::init_libbpf_logging;
55use scx_utils::libbpf_clap_opts::LibbpfOpts;
56use scx_utils::perf;
57use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
58use scx_utils::read_netdevs;
59use scx_utils::scx_enums;
60use scx_utils::scx_ops_attach;
61use scx_utils::scx_ops_load;
62use scx_utils::scx_ops_open;
63use scx_utils::uei_exited;
64use scx_utils::uei_report;
65use scx_utils::CoreType;
66use scx_utils::Cpumask;
67use scx_utils::NetDev;
68use scx_utils::Topology;
69use scx_utils::TopologyArgs;
70use scx_utils::UserExitInfo;
71use scx_utils::NR_CPUS_POSSIBLE;
72use scx_utils::NR_CPU_IDS;
73use stats::LayerStats;
74use stats::StatsReq;
75use stats::StatsRes;
76use stats::SysStats;
77use std::collections::VecDeque;
78use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
79use tracing::{debug, error, info, trace, warn};
80use tracing_subscriber::filter::EnvFilter;
81use walkdir::WalkDir;
82
// Name of this scheduler.
const SCHEDULER_NAME: &str = "scx_layered";

// Limits and defaults taken from the `bpf_intf` bindings so that userspace
// and the BPF side agree on the same values.
const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
// `USAGE_HALF_LIFE` divided by 1e9.
// NOTE(review): presumably a nanoseconds -> seconds conversion; confirm the
// unit of `consts_USAGE_HALF_LIFE` in the BPF headers.
const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;

// Indices into the per-layer usage arrays (e.g. `layer_usages[layer][usage]`
// and `layer_membw_agg[layer][usage]` in `cpu_ctx`).
const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
// Inclusive upper index of the usage kinds that are added up when a per-layer
// total is computed (see `Stats::read_layer_node_usages` and `total_util`).
const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
const LAYER_USAGE_PROTECTED_PREEMPT: usize =
    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
// Number of usage kinds, i.e. the length of the per-layer usage arrays.
const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;

// Number of global, per-layer and per-(LLC, layer) statistics counters.
const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;

// Number of layer match kinds known to the BPF interface.
const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;
108
109static NVML: OnceCell<Nvml> = OnceCell::new();
110
111fn nvml() -> Result<&'static Nvml, NvmlError> {
112 NVML.get_or_try_init(Nvml::init)
113}
114
lazy_static! {
    // Decay factor derived from the usage half-life: raising it to
    // `USAGE_HALF_LIFE_F64` yields 0.5. `Stats::refresh` raises it to the
    // elapsed time in seconds to weight the previous value of a metric.
    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
    // Two and four times `SCX_SLICE_DFL`, divided by 1000.
    // NOTE(review): presumably converts the default slice from ns to us;
    // confirm the unit of `SCX_SLICE_DFL`.
    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
    // Example layer configuration parsed from the embedded JSON on first
    // access. The `unwrap()` panics if the JSON does not deserialize into
    // `LayerConfig`, so the literal below must stay valid.
    static ref EXAMPLE_CONFIG: LayerConfig = serde_json::from_str(
        r#"[
 {
 "name": "batch",
 "comment": "tasks under system.slice or tasks with nice value > 0",
 "matches": [[{"CgroupPrefix": "system.slice/"}], [{"NiceAbove": 0}]],
 "kind": {"Confined": {
 "util_range": [0.8, 0.9], "cpus_range": [0, 16],
 "min_exec_us": 1000, "slice_us": 20000, "weight": 100,
 "xllc_mig_min_us": 1000.0, "perf": 1024
 }}
 },
 {
 "name": "immediate",
 "comment": "tasks under workload.slice with nice value < 0",
 "matches": [[{"CgroupPrefix": "workload.slice/"}, {"NiceBelow": 0}]],
 "kind": {"Open": {
 "min_exec_us": 100, "yield_ignore": 0.25, "slice_us": 20000,
 "preempt": true, "exclusive": true,
 "prev_over_idle_core": true,
 "weight": 100, "perf": 1024
 }}
 },
 {
 "name": "stress-ng",
 "comment": "stress-ng test layer",
 "matches": [[{"CommPrefix": "stress-ng"}], [{"PcommPrefix": "stress-ng"}]],
 "kind": {"Confined": {
 "util_range": [0.2, 0.8],
 "min_exec_us": 800, "preempt": true, "slice_us": 800,
 "weight": 100, "growth_algo": "Topo", "perf": 1024
 }}
 },
 {
 "name": "normal",
 "comment": "the rest",
 "matches": [[]],
 "kind": {"Grouped": {
 "util_range": [0.5, 0.6], "util_includes_open_cputime": true,
 "min_exec_us": 200, "slice_us": 20000, "weight": 100,
 "xllc_mig_min_us": 100.0, "growth_algo": "Linear", "perf": 1024
 }}
 }
 ]"#,
    )
    .unwrap();
}
166
// Command-line options, parsed with clap's derive API.
//
// Plain `//` comments are used on purpose: with the clap derive, `///` doc
// comments become `--help` text, so adding them would change program output.
// The notes below only restate what each attribute declares; the meaning of
// an option is defined where it is consumed, outside this chunk.
#[derive(Debug, Parser)]
#[command(verbatim_doc_comment)]
struct Opts {
    // Counting flag: each `-v` / `--verbose` occurrence increments the value.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    #[clap(short = 'M', long, default_value = "0")]
    max_exec_us: u64,

    #[clap(short = 'i', long, default_value = "0.1")]
    interval: f64,

    #[clap(short = 'n', long, default_value = "false")]
    no_load_frac_limit: bool,

    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    #[clap(long, default_value = "info")]
    log_level: String,

    // Optional-value flag: absent => `None`, bare `-t` => `Some(true)`,
    // and an explicit value must be attached with `=` (`require_equals`).
    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
    disable_topology: Option<bool>,

    #[clap(long)]
    monitor_disable: bool,

    #[clap(short = 'e', long)]
    example: Option<String>,

    #[clap(long, default_value = "0.0")]
    layer_preempt_weight_disable: f64,

    #[clap(long, default_value = "0.0")]
    layer_growth_weight_disable: f64,

    #[clap(long)]
    stats: Option<f64>,

    #[clap(long)]
    monitor: Option<f64>,

    #[clap(long, default_value = "95")]
    stats_columns: usize,

    #[clap(long)]
    stats_no_llc: bool,

    #[clap(long)]
    run_example: bool,

    #[clap(long)]
    allow_partial_core: bool,

    #[clap(long, default_value = "false")]
    local_llc_iteration: bool,

    #[clap(long, default_value = "10000")]
    lo_fb_wait_us: u64,

    #[clap(long, default_value = ".05")]
    lo_fb_share: f64,

    #[clap(long, default_value = "false")]
    disable_antistall: bool,

    #[clap(long, default_value = "false")]
    enable_gpu_affinitize: bool,

    #[clap(long, default_value = "900")]
    gpu_affinitize_secs: u64,

    #[clap(long, default_value = "false")]
    enable_match_debug: bool,

    #[clap(long, default_value = "3")]
    antistall_sec: u64,

    #[clap(long, default_value = "false")]
    enable_gpu_support: bool,

    #[clap(long, default_value = "3")]
    gpu_kprobe_level: u64,

    // Passed to `Stats::new` as its `util_compensation` argument
    // (NOTE(review): the call site is outside this chunk; confirm).
    #[clap(long, default_value = "false")]
    util_compensation: bool,

    #[clap(long, default_value = "false")]
    netdev_irq_balance: bool,

    #[clap(long, default_value = "false")]
    disable_queued_wakeup: bool,

    #[clap(long, default_value = "false")]
    disable_percpu_kthread_preempt: bool,

    #[clap(long, default_value = "false")]
    percpu_kthread_preempt_all: bool,

    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    #[clap(long)]
    run_id: Option<u64>,

    #[clap(long)]
    help_stats: bool,

    // No `#[clap]` attribute: collected from the positional arguments.
    specs: Vec<String>,

    #[clap(long, default_value = "2000")]
    layer_refresh_ms_avgruntime: u64,

    #[clap(long, default_value = "")]
    task_hint_map: String,

    #[clap(long, default_value = "false")]
    print_and_exit: bool,

    #[clap(long, default_value = "")]
    hi_fb_thread_name: String,

    // Flattened option groups shown under their own help headings.
    #[clap(flatten, next_help_heading = "Topology Options")]
    topology: TopologyArgs,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
672
/// Notification about a cgroup appearing or disappearing.
///
/// NOTE(review): the producer and consumer of these events are outside this
/// chunk; the field descriptions below are inferred from names only.
#[derive(Debug, Clone)]
enum CgroupEvent {
    /// A cgroup was created.
    Created {
        /// Path of the cgroup (presumably relative to the cgroup root; confirm).
        path: String,
        /// Identifier of the cgroup.
        cgroup_id: u64,
        /// Bitmap associated with the cgroup (presumably one bit per matching
        /// rule; confirm against the code that builds it).
        match_bitmap: u64,
    },
    /// A cgroup was removed.
    Removed {
        /// Path of the removed cgroup.
        path: String,
        /// Identifier of the removed cgroup.
        cgroup_id: u64,
    },
}
686
687fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
688 reader
689 .read_stat()
690 .context("Failed to read procfs")?
691 .total_cpu
692 .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
693}
694
695fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
696 match (curr, prev) {
697 (
698 fb_procfs::CpuStat {
699 user_usec: Some(curr_user),
700 nice_usec: Some(curr_nice),
701 system_usec: Some(curr_system),
702 idle_usec: Some(curr_idle),
703 iowait_usec: Some(curr_iowait),
704 irq_usec: Some(curr_irq),
705 softirq_usec: Some(curr_softirq),
706 stolen_usec: Some(curr_stolen),
707 ..
708 },
709 fb_procfs::CpuStat {
710 user_usec: Some(prev_user),
711 nice_usec: Some(prev_nice),
712 system_usec: Some(prev_system),
713 idle_usec: Some(prev_idle),
714 iowait_usec: Some(prev_iowait),
715 irq_usec: Some(prev_irq),
716 softirq_usec: Some(prev_softirq),
717 stolen_usec: Some(prev_stolen),
718 ..
719 },
720 ) => {
721 let idle_usec = curr_idle.saturating_sub(*prev_idle);
722 let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
723 let user_usec = curr_user.saturating_sub(*prev_user);
724 let system_usec = curr_system.saturating_sub(*prev_system);
725 let nice_usec = curr_nice.saturating_sub(*prev_nice);
726 let irq_usec = curr_irq.saturating_sub(*prev_irq);
727 let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
728 let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);
729
730 let busy_usec =
731 user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
732 let total_usec = idle_usec + busy_usec + iowait_usec;
733 if total_usec > 0 {
734 Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
735 } else {
736 Ok(1.0)
737 }
738 }
739 _ => bail!("Missing stats in cpustat"),
740 }
741}
742
/// Copies `src` plus a terminating NUL into the start of `dst`.
///
/// Bytes of `dst` past the terminator are left untouched.
///
/// # Panics
/// Panics if `src` contains an interior NUL byte, or if `src` plus its
/// terminator does not fit into `dst`. Both indicate a caller bug: the
/// destinations are fixed-size C character arrays.
fn copy_into_cstr(dst: &mut [i8], src: &str) {
    let cstr = CString::new(src).expect("copy_into_cstr: source contains an interior NUL byte");
    let bytes = cstr.as_bytes_with_nul();
    assert!(
        bytes.len() <= dst.len(),
        "copy_into_cstr: {} bytes (including NUL) do not fit into a {}-byte buffer",
        bytes.len(),
        dst.len()
    );
    for (slot, &byte) in dst.iter_mut().zip(bytes) {
        // Bit-for-bit reinterpretation of the byte as a signed C char; this
        // replaces the former `unsafe` slice transmute.
        *slot = byte as i8;
    }
}
748
749#[allow(clippy::needless_range_loop)]
750fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
751 let mut cpu_ctxs = vec![];
752 let cpu_ctxs_vec = skel
753 .maps
754 .cpu_ctxs
755 .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
756 .context("Failed to lookup cpu_ctx")?
757 .unwrap();
758 for cpu in 0..*NR_CPUS_POSSIBLE {
759 cpu_ctxs.push(
760 *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
761 .expect("cpu_ctx: short or misaligned buffer"),
762 );
763 }
764 Ok(cpu_ctxs)
765}
766
/// Statistics counters gathered from the BPF side by [`BpfStats::read`].
#[derive(Clone, Debug)]
struct BpfStats {
    /// Global counters, indexed by stat id; each is summed over all CPUs.
    gstats: Vec<u64>,
    /// Per-layer counters, indexed `[layer][stat]`; each is summed over all CPUs.
    lstats: Vec<Vec<u64>>,
    /// `lstats` summed over all layers, indexed by stat id.
    lstats_sums: Vec<u64>,
    /// Per-layer, per-LLC counters, indexed `[layer][llc][stat]`, copied from
    /// the `llc_data` map.
    llc_lstats: Vec<Vec<Vec<u64>>>,
}
774
775impl BpfStats {
776 #[allow(clippy::needless_range_loop)]
777 fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
778 let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
779 let nr_llcs = skel.maps.rodata_data.as_ref().unwrap().nr_llcs as usize;
780 let mut gstats = vec![0u64; NR_GSTATS];
781 let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
782 let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];
783
784 for cpu in 0..*NR_CPUS_POSSIBLE {
785 for (stat, value) in gstats.iter_mut().enumerate() {
786 *value += cpu_ctxs[cpu].gstats[stat];
787 }
788 for (layer, layer_stats) in lstats.iter_mut().enumerate() {
789 for (stat, value) in layer_stats.iter_mut().enumerate() {
790 *value += cpu_ctxs[cpu].lstats[layer][stat];
791 }
792 }
793 }
794
795 let mut lstats_sums = vec![0u64; NR_LSTATS];
796 for layer_stats in lstats.iter() {
797 for (stat, value) in lstats_sums.iter_mut().enumerate() {
798 *value += layer_stats[stat];
799 }
800 }
801
802 for llc_id in 0..nr_llcs {
803 let v = skel
809 .maps
810 .llc_data
811 .lookup(&(llc_id as u32).to_ne_bytes(), libbpf_rs::MapFlags::ANY)
812 .unwrap()
813 .unwrap();
814 let llcc: &bpf_intf::llc_ctx =
815 plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
816
817 for (layer_id, layer_llc_stats) in llc_lstats.iter_mut().enumerate() {
818 for (stat_id, stat_value) in layer_llc_stats[llc_id].iter_mut().enumerate() {
819 *stat_value = llcc.lstats[layer_id][stat_id];
820 }
821 }
822 }
823
824 Self {
825 gstats,
826 lstats,
827 lstats_sums,
828 llc_lstats,
829 }
830 }
831}
832
833impl<'b> Sub<&'b BpfStats> for &BpfStats {
834 type Output = BpfStats;
835
836 fn sub(self, rhs: &'b BpfStats) -> BpfStats {
837 let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
838 BpfStats {
839 gstats: vec_sub(&self.gstats, &rhs.gstats),
840 lstats: self
841 .lstats
842 .iter()
843 .zip(rhs.lstats.iter())
844 .map(|(l, r)| vec_sub(l, r))
845 .collect(),
846 lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
847 llc_lstats: self
848 .llc_lstats
849 .iter()
850 .zip(rhs.llc_lstats.iter())
851 .map(|(l_layer, r_layer)| {
852 l_layer
853 .iter()
854 .zip(r_layer.iter())
855 .map(|(l_llc, r_llc)| {
856 let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
857 r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
859 vec_sub(&l_llc, &r_llc)
860 })
861 .collect()
862 })
863 .collect(),
864 }
865 }
866}
867
/// Scheduler statistics derived on every [`Stats::refresh`].
///
/// `prev_*` fields keep the raw counters of the previous refresh so that the
/// next one can compute deltas; the remaining fields hold derived values.
#[derive(Clone, Debug)]
struct Stats {
    /// Time of the most recent refresh (or of construction).
    at: Instant,
    /// Time between the two most recent refreshes.
    elapsed: Duration,
    topo: Arc<Topology>,
    /// Number of layers, read from BPF rodata at construction.
    nr_layers: usize,
    /// Per-layer `nr_tasks` as read from the BPF `layers` array.
    nr_layer_tasks: Vec<usize>,
    /// `nr_pinned_tasks`, indexed `[layer][node]`.
    layer_nr_node_pinned_tasks: Vec<Vec<u64>>,

    /// Sum of `layer_utils` over all layers and the usage kinds up to and
    /// including `LAYER_USAGE_SUM_UPTO`.
    total_util: f64,
    /// Decayed per-layer utilization, indexed `[layer][usage]`.
    layer_utils: Vec<Vec<f64>>,
    /// Decayed node-pinned utilization, indexed `[layer][node]`.
    layer_node_pinned_utils: Vec<Vec<f64>>,
    prev_layer_node_pinned_usages: Vec<Vec<u64>>,
    /// Decayed utilization per node, indexed `[layer][node]`.
    layer_node_utils: Vec<Vec<f64>>,
    prev_layer_node_usages: Vec<Vec<u64>>,
    /// Decayed rate of the `layer_duty_sum` counters, indexed `[layer][node]`.
    layer_node_duty_sums: Vec<Vec<f64>>,
    prev_layer_node_duty_raw: Vec<Vec<u64>>,
    /// Per-layer memory bandwidth, indexed `[layer][usage]`: the delta of
    /// `layer_membw_agg` divided by 1024^3 and scaled by the resctrl/PMU factor.
    layer_membws: Vec<Vec<f64>>,
    prev_layer_membw_agg: Vec<Vec<u64>>,
    /// System-wide busy fraction computed by `calc_util`.
    cpu_busy: f64,
    prev_total_cpu: fb_procfs::CpuStat,
    /// Previous `(PMU total, resctrl total)` memory-bandwidth readings.
    prev_pmu_resctrl_membw: (u64, u64),
    /// Whether per-CPU scale factors are derived from irq/softirq/stolen time.
    util_compensation: bool,
    /// Like `layer_utils`, but each per-CPU delta is multiplied by that CPU's
    /// scale factor before summing.
    layer_utils_compensated: Vec<Vec<f64>>,
    /// Flattened `[cpu][layer][usage]` copy of the previous usage counters.
    prev_cpu_layer_usages: Vec<u64>,
    prev_per_cpu_stats: BTreeMap<u32, fb_procfs::CpuStat>,

    /// Exponentially weighted moving average of `cpu_busy`.
    system_cpu_util_ewma: f64,
    /// Per-layer moving average of the fraction of dispatches counted as
    /// `LSTAT_ENQ_DSQ`.
    layer_dsq_insert_ewma: Vec<f64>,
    /// BPF counters accumulated during the last interval (current - previous).
    bpf_stats: BpfStats,
    prev_bpf_stats: BpfStats,

    /// Processing time spent during the last interval.
    processing_dur: Duration,
    prev_processing_dur: Duration,

    /// Per-layer `slice_ns / 1000`.
    layer_slice_us: Vec<u64>,

    /// Copied from the `GpuTaskAffinitizer` at refresh time.
    gpu_tasks_affinitized: u64,
    gpu_task_affinitization_ms: u64,
}
912
913impl Stats {
914 #[allow(clippy::needless_range_loop)]
915 fn read_layer_membw_agg(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
916 let mut layer_membw_agg = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
917
918 for cpu in 0..*NR_CPUS_POSSIBLE {
919 for (layer, layer_membw) in layer_membw_agg.iter_mut().enumerate() {
920 for (usage, value) in layer_membw.iter_mut().enumerate() {
921 *value += cpu_ctxs[cpu].layer_membw_agg[layer][usage];
922 }
923 }
924 }
925
926 layer_membw_agg
927 }
928
929 #[allow(clippy::needless_range_loop)]
930 fn read_layer_node_pinned_usages(
931 cpu_ctxs: &[bpf_intf::cpu_ctx],
932 topo: &Topology,
933 nr_layers: usize,
934 nr_nodes: usize,
935 ) -> Vec<Vec<u64>> {
936 let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
937
938 for cpu in 0..*NR_CPUS_POSSIBLE {
939 let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
940 for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
941 layer_usages[node] += cpu_ctxs[cpu].node_pinned_usage[layer];
942 }
943 }
944
945 usages
946 }
947
948 #[allow(clippy::needless_range_loop)]
949 fn read_layer_node_usages(
950 cpu_ctxs: &[bpf_intf::cpu_ctx],
951 topo: &Topology,
952 nr_layers: usize,
953 nr_nodes: usize,
954 ) -> Vec<Vec<u64>> {
955 let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
956
957 for cpu in 0..*NR_CPUS_POSSIBLE {
958 let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
959 for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
960 for usage in 0..=LAYER_USAGE_SUM_UPTO {
961 layer_usages[node] += cpu_ctxs[cpu].layer_usages[layer][usage];
962 }
963 }
964 }
965
966 usages
967 }
968
969 #[allow(clippy::needless_range_loop)]
970 fn read_layer_node_duty_raw(
971 cpu_ctxs: &[bpf_intf::cpu_ctx],
972 topo: &Topology,
973 nr_layers: usize,
974 nr_nodes: usize,
975 ) -> Vec<Vec<u64>> {
976 let mut sums = vec![vec![0u64; nr_nodes]; nr_layers];
977
978 for cpu in 0..*NR_CPUS_POSSIBLE {
979 let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
980 for (layer, layer_sums) in sums.iter_mut().enumerate().take(nr_layers) {
981 layer_sums[node] += cpu_ctxs[cpu].layer_duty_sum[layer];
982 }
983 }
984
985 sums
986 }
987
988 #[allow(clippy::needless_range_loop)]
989 fn read_per_cpu_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<u64> {
990 let stride = nr_layers * NR_LAYER_USAGES;
991 let mut flat = vec![0u64; *NR_CPUS_POSSIBLE * stride];
992
993 for cpu in 0..*NR_CPUS_POSSIBLE {
994 let base = cpu * stride;
995 for layer in 0..nr_layers {
996 for usage in 0..NR_LAYER_USAGES {
997 flat[base + layer * NR_LAYER_USAGES + usage] =
998 cpu_ctxs[cpu].layer_usages[layer][usage];
999 }
1000 }
1001 }
1002
1003 flat
1004 }
1005
1006 fn resctrl_read_total_membw() -> Result<u64> {
1018 let mut total_membw = 0u64;
1019 for entry in WalkDir::new("/sys/fs/resctrl/mon_data")
1020 .min_depth(1)
1021 .into_iter()
1022 .filter_map(Result::ok)
1023 .filter(|x| x.path().is_dir())
1024 {
1025 let mut path = entry.path().to_path_buf();
1026 path.push("mbm_total_bytes");
1027 total_membw += fs::read_to_string(path)?.trim().parse::<u64>()?;
1028 }
1029
1030 Ok(total_membw)
1031 }
1032
1033 fn new(
1034 skel: &mut BpfSkel,
1035 proc_reader: &fb_procfs::ProcReader,
1036 topo: Arc<Topology>,
1037 gpu_task_affinitizer: &GpuTaskAffinitizer,
1038 util_compensation: bool,
1039 ) -> Result<Self> {
1040 let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
1041 let nr_nodes = topo.nodes.len();
1042 let cpu_ctxs = read_cpu_ctxs(skel)?;
1043 let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1044 let pmu_membw = Self::read_layer_membw_agg(&cpu_ctxs, nr_layers);
1045
1046 Ok(Self {
1047 at: Instant::now(),
1048 elapsed: Default::default(),
1049
1050 topo: topo.clone(),
1051 nr_layers,
1052 nr_layer_tasks: vec![0; nr_layers],
1053 layer_nr_node_pinned_tasks: vec![vec![0; nr_nodes]; nr_layers],
1054
1055 total_util: 0.0,
1056 layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1057 layer_node_pinned_utils: vec![vec![0.0; nr_nodes]; nr_layers],
1058 prev_layer_node_pinned_usages: Self::read_layer_node_pinned_usages(
1059 &cpu_ctxs, &topo, nr_layers, nr_nodes,
1060 ),
1061 layer_node_utils: vec![vec![0.0; nr_nodes]; nr_layers],
1062 prev_layer_node_usages: Self::read_layer_node_usages(
1063 &cpu_ctxs, &topo, nr_layers, nr_nodes,
1064 ),
1065 layer_node_duty_sums: vec![vec![0.0; nr_nodes]; nr_layers],
1066 prev_layer_node_duty_raw: Self::read_layer_node_duty_raw(
1067 &cpu_ctxs, &topo, nr_layers, nr_nodes,
1068 ),
1069 layer_membws: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1070 prev_layer_membw_agg: pmu_membw,
1074 prev_pmu_resctrl_membw: (0, 0),
1075
1076 cpu_busy: 0.0,
1077 prev_total_cpu: read_total_cpu(proc_reader)?,
1078 util_compensation,
1079 layer_utils_compensated: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1080 prev_cpu_layer_usages: Self::read_per_cpu_layer_usages(&cpu_ctxs, nr_layers),
1081 prev_per_cpu_stats: BTreeMap::new(),
1082 system_cpu_util_ewma: 0.0,
1083 layer_dsq_insert_ewma: vec![0.0; nr_layers],
1084
1085 bpf_stats: bpf_stats.clone(),
1086 prev_bpf_stats: bpf_stats,
1087
1088 processing_dur: Default::default(),
1089 prev_processing_dur: Default::default(),
1090
1091 layer_slice_us: vec![0; nr_layers],
1092 gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1093 gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1094 })
1095 }
1096
1097 fn refresh(
1098 &mut self,
1099 skel: &mut BpfSkel,
1100 proc_reader: &fb_procfs::ProcReader,
1101 now: Instant,
1102 cur_processing_dur: Duration,
1103 gpu_task_affinitizer: &GpuTaskAffinitizer,
1104 ) -> Result<()> {
1105 let elapsed = now.duration_since(self.at);
1106 let elapsed_f64 = elapsed.as_secs_f64();
1107 let cpu_ctxs = read_cpu_ctxs(skel)?;
1108
1109 let layers = &skel.maps.bss_data.as_ref().unwrap().layers;
1110 let nr_layer_tasks: Vec<usize> = layers
1111 .iter()
1112 .take(self.nr_layers)
1113 .map(|layer| layer.nr_tasks as usize)
1114 .collect();
1115 let layer_nr_node_pinned_tasks: Vec<Vec<u64>> = layers
1116 .iter()
1117 .take(self.nr_layers)
1118 .map(|layer| {
1119 layer.node[..self.topo.nodes.len()]
1120 .iter()
1121 .map(|n| n.nr_pinned_tasks)
1122 .collect()
1123 })
1124 .collect();
1125 let layer_slice_us: Vec<u64> = layers
1126 .iter()
1127 .take(self.nr_layers)
1128 .map(|layer| layer.slice_ns / 1000_u64)
1129 .collect();
1130
1131 let cur_layer_node_pinned_usages = Self::read_layer_node_pinned_usages(
1132 &cpu_ctxs,
1133 &self.topo,
1134 self.nr_layers,
1135 self.topo.nodes.len(),
1136 );
1137 let cur_layer_node_usages = Self::read_layer_node_usages(
1138 &cpu_ctxs,
1139 &self.topo,
1140 self.nr_layers,
1141 self.topo.nodes.len(),
1142 );
1143 let cur_layer_node_duty_raw = Self::read_layer_node_duty_raw(
1144 &cpu_ctxs,
1145 &self.topo,
1146 self.nr_layers,
1147 self.topo.nodes.len(),
1148 );
1149 let cur_layer_membw_agg = Self::read_layer_membw_agg(&cpu_ctxs, self.nr_layers);
1150
1151 let (pmu_prev, resctrl_prev) = self.prev_pmu_resctrl_membw;
1159 let pmu_cur: u64 = cur_layer_membw_agg
1160 .iter()
1161 .map(|membw_agg| membw_agg[LAYER_USAGE_OPEN] + membw_agg[LAYER_USAGE_OWNED])
1162 .sum();
1163 let resctrl_cur = Self::resctrl_read_total_membw()?;
1164 let factor = (resctrl_cur - resctrl_prev) as f64 / (pmu_cur - pmu_prev) as f64;
1165
1166 let compute_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1168 cur_agg
1169 .iter()
1170 .zip(prev_agg.iter())
1171 .map(|(cur, prev)| {
1172 cur.iter()
1173 .zip(prev.iter())
1174 .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
1175 .collect()
1176 })
1177 .collect()
1178 };
1179
1180 let compute_mem_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1183 cur_agg
1184 .iter()
1185 .zip(prev_agg.iter())
1186 .map(|(cur, prev)| {
1187 cur.iter()
1188 .zip(prev.iter())
1189 .map(|(c, p)| (*c as i64 - *p as i64) as f64 / 1024_f64.powf(3.0))
1190 .collect()
1191 })
1192 .collect()
1193 };
1194
1195 let cur_layer_membw: Vec<Vec<f64>> =
1197 compute_mem_diff(&cur_layer_membw_agg, &self.prev_layer_membw_agg);
1198
1199 let cur_layer_membw: Vec<Vec<f64>> = cur_layer_membw
1200 .iter()
1201 .map(|x| x.iter().map(|x| *x * factor).collect())
1202 .collect();
1203
1204 let metric_decay =
1205 |cur_metric: Vec<Vec<f64>>, prev_metric: &Vec<Vec<f64>>, decay_rate: f64| {
1206 cur_metric
1207 .iter()
1208 .zip(prev_metric.iter())
1209 .map(|(cur, prev)| {
1210 cur.iter()
1211 .zip(prev.iter())
1212 .map(|(c, p)| {
1213 let decay = decay_rate.powf(elapsed_f64);
1214 p * decay + c * (1.0 - decay)
1215 })
1216 .collect()
1217 })
1218 .collect()
1219 };
1220
1221 let cur_node_pinned_utils: Vec<Vec<f64>> = compute_diff(
1222 &cur_layer_node_pinned_usages,
1223 &self.prev_layer_node_pinned_usages,
1224 );
1225 let layer_node_pinned_utils: Vec<Vec<f64>> = metric_decay(
1226 cur_node_pinned_utils,
1227 &self.layer_node_pinned_utils,
1228 *USAGE_DECAY,
1229 );
1230 let cur_node_utils: Vec<Vec<f64>> =
1231 compute_diff(&cur_layer_node_usages, &self.prev_layer_node_usages);
1232 let layer_node_utils: Vec<Vec<f64>> =
1233 metric_decay(cur_node_utils, &self.layer_node_utils, *USAGE_DECAY);
1234 let cur_node_duty: Vec<Vec<f64>> =
1235 compute_diff(&cur_layer_node_duty_raw, &self.prev_layer_node_duty_raw);
1236 let layer_node_duty_sums: Vec<Vec<f64>> =
1237 metric_decay(cur_node_duty, &self.layer_node_duty_sums, *USAGE_DECAY);
1238
1239 let layer_membws: Vec<Vec<f64>> = metric_decay(cur_layer_membw, &self.layer_membws, 0.0);
1240
1241 let proc_stat = proc_reader
1242 .read_stat()
1243 .context("Failed to read /proc/stat")?;
1244 let cur_total_cpu = proc_stat
1245 .total_cpu
1246 .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;
1247 let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;
1248
1249 const SYS_CPU_UTIL_EWMA_SECS: f64 = 10.0;
1251 let elapsed_f64 = elapsed.as_secs_f64();
1252 let alpha = elapsed_f64 / SYS_CPU_UTIL_EWMA_SECS.max(elapsed_f64);
1253 let system_cpu_util_ewma = alpha * cpu_busy + (1.0 - alpha) * self.system_cpu_util_ewma;
1254
1255 let cur_per_cpu_stats = proc_stat.cpus_map.unwrap_or_default();
1258 let mut cpu_scales = vec![1.0f64; *NR_CPUS_POSSIBLE];
1259 if self.util_compensation {
1260 for (&cpu_id, cur_cpu_stat) in &cur_per_cpu_stats {
1261 let cpu = cpu_id as usize;
1262 if let Some(prev_cpu_stat) = self.prev_per_cpu_stats.get(&cpu_id) {
1263 if let (
1264 fb_procfs::CpuStat {
1265 user_usec: Some(cu),
1266 nice_usec: Some(cn),
1267 system_usec: Some(cs),
1268 idle_usec: Some(ci),
1269 iowait_usec: Some(cw),
1270 irq_usec: Some(cq),
1271 softirq_usec: Some(cf),
1272 stolen_usec: Some(ct),
1273 ..
1274 },
1275 fb_procfs::CpuStat {
1276 user_usec: Some(pu),
1277 nice_usec: Some(pn),
1278 system_usec: Some(ps),
1279 idle_usec: Some(pi),
1280 iowait_usec: Some(pw),
1281 irq_usec: Some(pq),
1282 softirq_usec: Some(pf),
1283 stolen_usec: Some(pt),
1284 ..
1285 },
1286 ) = (cur_cpu_stat, prev_cpu_stat)
1287 {
1288 let delta_total = cu.saturating_sub(*pu)
1289 + cn.saturating_sub(*pn)
1290 + cs.saturating_sub(*ps)
1291 + ci.saturating_sub(*pi)
1292 + cw.saturating_sub(*pw)
1293 + cq.saturating_sub(*pq)
1294 + cf.saturating_sub(*pf)
1295 + ct.saturating_sub(*pt);
1296 let overhead = cq.saturating_sub(*pq)
1297 + cf.saturating_sub(*pf)
1298 + ct.saturating_sub(*pt);
1299 let available = delta_total.saturating_sub(overhead);
1300 cpu_scales[cpu] = if available > 0 {
1301 (delta_total as f64 / available as f64).clamp(1.0, 20.0)
1302 } else {
1303 1.0
1304 };
1305 }
1306 }
1307 }
1308 }
1309
1310 let cur_cpu_layer_usages = Self::read_per_cpu_layer_usages(&cpu_ctxs, self.nr_layers);
1314 let stride = self.nr_layers * NR_LAYER_USAGES;
1315 let mut raw_sums = vec![vec![0.0f64; NR_LAYER_USAGES]; self.nr_layers];
1316 let mut scaled_sums = vec![vec![0.0f64; NR_LAYER_USAGES]; self.nr_layers];
1317 #[allow(clippy::needless_range_loop)]
1318 for cpu in 0..*NR_CPUS_POSSIBLE {
1319 let scale = cpu_scales[cpu];
1320 let base = cpu * stride;
1321 for layer in 0..self.nr_layers {
1322 for usage in 0..NR_LAYER_USAGES {
1323 let idx = base + layer * NR_LAYER_USAGES + usage;
1324 let delta =
1325 cur_cpu_layer_usages[idx].saturating_sub(self.prev_cpu_layer_usages[idx]);
1326 if delta > 0 {
1327 let delta_f = delta as f64;
1328 raw_sums[layer][usage] += delta_f;
1329 scaled_sums[layer][usage] += delta_f * scale;
1330 }
1331 }
1332 }
1333 }
1334 let normalize = |sums: Vec<Vec<f64>>| -> Vec<Vec<f64>> {
1335 sums.into_iter()
1336 .map(|layer_sums| {
1337 layer_sums
1338 .into_iter()
1339 .map(|s| s / 1_000_000_000.0 / elapsed_f64)
1340 .collect()
1341 })
1342 .collect()
1343 };
1344 let layer_utils: Vec<Vec<f64>> =
1345 metric_decay(normalize(raw_sums), &self.layer_utils, *USAGE_DECAY);
1346 let layer_utils_compensated: Vec<Vec<f64>> = metric_decay(
1347 normalize(scaled_sums),
1348 &self.layer_utils_compensated,
1349 *USAGE_DECAY,
1350 );
1351
1352 let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1353 let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;
1354
1355 const DSQ_INSERT_EWMA_SECS: f64 = 10.0;
1357 let dsq_alpha = elapsed_f64 / DSQ_INSERT_EWMA_SECS.max(elapsed_f64);
1358 let layer_dsq_insert_ewma: Vec<f64> = (0..self.nr_layers)
1359 .map(|layer_id| {
1360 let sel_local = bpf_stats.lstats[layer_id]
1361 [bpf_intf::layer_stat_id_LSTAT_SEL_LOCAL as usize]
1362 as f64;
1363 let enq_local = bpf_stats.lstats[layer_id]
1364 [bpf_intf::layer_stat_id_LSTAT_ENQ_LOCAL as usize]
1365 as f64;
1366 let enq_dsq = bpf_stats.lstats[layer_id]
1367 [bpf_intf::layer_stat_id_LSTAT_ENQ_DSQ as usize]
1368 as f64;
1369 let total_dispatches = sel_local + enq_local + enq_dsq;
1370
1371 let cur_ratio = if total_dispatches > 0.0 {
1372 enq_dsq / total_dispatches
1373 } else {
1374 0.0
1375 };
1376
1377 dsq_alpha * cur_ratio + (1.0 - dsq_alpha) * self.layer_dsq_insert_ewma[layer_id]
1378 })
1379 .collect();
1380
1381 let processing_dur = cur_processing_dur
1382 .checked_sub(self.prev_processing_dur)
1383 .unwrap();
1384
1385 *self = Self {
1386 at: now,
1387 elapsed,
1388 topo: self.topo.clone(),
1389 nr_layers: self.nr_layers,
1390 nr_layer_tasks,
1391 layer_nr_node_pinned_tasks,
1392
1393 total_util: layer_utils
1394 .iter()
1395 .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
1396 .sum(),
1397 layer_utils,
1398 layer_node_pinned_utils,
1399 prev_layer_node_pinned_usages: cur_layer_node_pinned_usages,
1400 layer_node_utils,
1401 prev_layer_node_usages: cur_layer_node_usages,
1402 layer_node_duty_sums,
1403 prev_layer_node_duty_raw: cur_layer_node_duty_raw,
1404
1405 layer_membws,
1406 prev_layer_membw_agg: cur_layer_membw_agg,
1407 prev_pmu_resctrl_membw: (pmu_cur, resctrl_cur),
1409
1410 cpu_busy,
1411 prev_total_cpu: cur_total_cpu,
1412 util_compensation: self.util_compensation,
1413 layer_utils_compensated,
1414 prev_cpu_layer_usages: cur_cpu_layer_usages,
1415 prev_per_cpu_stats: cur_per_cpu_stats,
1416 system_cpu_util_ewma,
1417 layer_dsq_insert_ewma,
1418
1419 bpf_stats,
1420 prev_bpf_stats: cur_bpf_stats,
1421
1422 processing_dur,
1423 prev_processing_dur: cur_processing_dur,
1424
1425 layer_slice_us,
1426 gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1427 gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1428 };
1429 Ok(())
1430 }
1431}
1432
/// Userspace state of a single layer.
///
/// NOTE(review): most of the code that fills these fields lies outside this
/// chunk; descriptions marked "presumably" are inferred from names and types
/// and should be confirmed.
#[derive(Debug)]
struct Layer {
    /// Layer name, taken from the layer spec.
    name: String,
    /// Layer kind (`Confined`, `Grouped` or `Open`) with its parameters,
    /// cloned from the layer spec.
    kind: LayerKind,
    growth_algo: LayerGrowthAlgo,
    /// Presumably the order in which cores are handed to the layer.
    core_order: Vec<Vec<usize>>,

    /// Presumably the LLCs currently assigned to the layer.
    assigned_llcs: Vec<Vec<usize>>,

    /// Presumably the number of CPUs currently held by the layer.
    nr_cpus: usize,
    /// Presumably per-LLC CPU counts.
    nr_llc_cpus: Vec<usize>,
    /// Presumably per-node CPU counts.
    nr_node_cpus: Vec<usize>,
    /// Presumably the CPUs currently held by the layer.
    cpus: Cpumask,
    /// CPUs the layer may use: every CPU when the spec names no nodes or
    /// LLCs, otherwise the CPUs of the listed nodes and LLCs.
    allowed_cpus: Cpumask,

    /// Presumably per-node counts of pinned CPUs.
    nr_pinned_cpus: Vec<usize>,
}
1451
1452fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
1453 fs::read_to_string("/proc/kallsyms")?
1454 .lines()
1455 .find(|line| line.contains(sym_name))
1456 .and_then(|line| line.split_whitespace().next())
1457 .and_then(|addr| u64::from_str_radix(addr, 16).ok())
1458 .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
1459}
1460
1461fn resolve_cpus_pct_range(
1462 cpus_range: &Option<(usize, usize)>,
1463 cpus_range_frac: &Option<(f64, f64)>,
1464 max_cpus: usize,
1465) -> Result<(usize, usize)> {
1466 match (cpus_range, cpus_range_frac) {
1467 (Some(_x), Some(_y)) => {
1468 bail!("cpus_range cannot be used with cpus_pct.");
1469 }
1470 (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
1471 (None, Some((cpus_frac_min, cpus_frac_max))) => {
1472 if *cpus_frac_min < 0_f64
1473 || *cpus_frac_min > 1_f64
1474 || *cpus_frac_max < 0_f64
1475 || *cpus_frac_max > 1_f64
1476 {
1477 bail!("cpus_range_frac values must be between 0.0 and 1.0");
1478 }
1479 let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
1480 let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
1481 Ok((
1482 std::cmp::max(cpus_min_count, 1),
1483 std::cmp::min(std::cmp::max(cpus_max_count, 1), max_cpus),
1484 ))
1485 }
1486 (None, None) => Ok((0, max_cpus)),
1487 }
1488}
1489
1490impl Layer {
1491 fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<Vec<usize>>) -> Result<Self> {
1492 let name = &spec.name;
1493 let kind = spec.kind.clone();
1494 let mut allowed_cpus = Cpumask::new();
1495 match &kind {
1496 LayerKind::Confined {
1497 cpus_range,
1498 cpus_range_frac,
1499 common: LayerCommon { nodes, llcs, .. },
1500 ..
1501 } => {
1502 let cpus_range =
1503 resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
1504 if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
1505 bail!("invalid cpus_range {:?}", cpus_range);
1506 }
1507 if nodes.is_empty() && llcs.is_empty() {
1508 allowed_cpus.set_all();
1509 } else {
1510 for (node_id, node) in &topo.nodes {
1512 if nodes.contains(node_id) {
1514 for &id in node.all_cpus.keys() {
1515 allowed_cpus.set_cpu(id)?;
1516 }
1517 }
1518 for (llc_id, llc) in &node.llcs {
1520 if llcs.contains(llc_id) {
1521 for &id in llc.all_cpus.keys() {
1522 allowed_cpus.set_cpu(id)?;
1523 }
1524 }
1525 }
1526 }
1527 }
1528 }
1529 LayerKind::Grouped {
1530 common: LayerCommon { nodes, llcs, .. },
1531 ..
1532 }
1533 | LayerKind::Open {
1534 common: LayerCommon { nodes, llcs, .. },
1535 ..
1536 } => {
1537 if nodes.is_empty() && llcs.is_empty() {
1538 allowed_cpus.set_all();
1539 } else {
1540 for (node_id, node) in &topo.nodes {
1542 if nodes.contains(node_id) {
1544 for &id in node.all_cpus.keys() {
1545 allowed_cpus.set_cpu(id)?;
1546 }
1547 }
1548 for (llc_id, llc) in &node.llcs {
1550 if llcs.contains(llc_id) {
1551 for &id in llc.all_cpus.keys() {
1552 allowed_cpus.set_cpu(id)?;
1553 }
1554 }
1555 }
1556 }
1557 }
1558 }
1559 }
1560
1561 if let Some(util_range) = kind.util_range() {
1564 if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
1565 bail!("invalid util_range {:?}", util_range);
1566 }
1567 }
1568
1569 let layer_growth_algo = kind.common().growth_algo.clone();
1570
1571 debug!(
1572 "layer: {} algo: {:?} core order: {:?}",
1573 name, &layer_growth_algo, core_order
1574 );
1575
1576 Ok(Self {
1577 name: name.into(),
1578 kind,
1579 growth_algo: layer_growth_algo,
1580 core_order: core_order.clone(),
1581
1582 assigned_llcs: vec![vec![]; topo.nodes.len()],
1583
1584 nr_cpus: 0,
1585 nr_llc_cpus: vec![0; topo.all_llcs.len()],
1586 nr_node_cpus: vec![0; topo.nodes.len()],
1587 cpus: Cpumask::new(),
1588 allowed_cpus,
1589
1590 nr_pinned_cpus: vec![0; topo.nodes.len()],
1591 })
1592 }
1593}
/// NUMA node information recorded for a GPU device.
#[derive(Debug, Clone)]
struct NodeInfo {
    /// CPU set containing every CPU of the node; used as the affinity mask
    /// for tasks of GPU processes.
    node_mask: nix::sched::CpuSet,
    /// Id of the node the mask was built from. Not read in this part of the
    /// file (hence the leading underscore).
    _node_id: usize,
}
1599
/// Periodically pins the tasks of GPU-using processes to the CPUs of the
/// NUMA node associated with their GPU device.
#[derive(Debug)]
struct GpuTaskAffinitizer {
    /// NVML device index -> NUMA node information for that device.
    gpu_devs_to_node_info: HashMap<u32, NodeInfo>,
    /// Pid of a process reported by NVML -> index of the device it runs on.
    gpu_pids_to_devs: HashMap<Pid, u32>,
    /// Start time of the last affinitization pass; `None` before the first.
    last_process_time: Option<Instant>,
    /// Process table snapshot used to walk children and tasks.
    sys: System,
    /// Parent pid -> child pids, rebuilt on every process refresh.
    pid_map: HashMap<Pid, Vec<Pid>>,
    /// Minimum time between two affinitization passes.
    poll_interval: Duration,
    /// When false, every public entry point returns immediately.
    enable: bool,
    /// Running count of successful `sched_setaffinity` calls.
    tasks_affinitized: u64,
    /// Duration of the most recent pass, in milliseconds.
    last_task_affinitization_ms: u64,
}
1614
1615impl GpuTaskAffinitizer {
1616 pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
1617 GpuTaskAffinitizer {
1618 gpu_devs_to_node_info: HashMap::new(),
1619 gpu_pids_to_devs: HashMap::new(),
1620 last_process_time: None,
1621 sys: System::default(),
1622 pid_map: HashMap::new(),
1623 poll_interval: Duration::from_secs(poll_interval),
1624 enable,
1625 tasks_affinitized: 0,
1626 last_task_affinitization_ms: 0,
1627 }
1628 }
1629
1630 fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
1631 for (chunk, &mask) in affinity.iter().enumerate() {
1632 let mut inner_offset: u64 = 1;
1633 for _ in 0..64 {
1634 if (mask & inner_offset) != 0 {
1635 return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
1636 }
1637 inner_offset <<= 1;
1638 }
1639 }
1640 anyhow::bail!("unable to get CPU from NVML bitmask");
1641 }
1642
1643 fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
1644 let mut cpuset = CpuSet::new();
1645 for cpu_id in node.all_cpus.keys() {
1646 cpuset.set(*cpu_id)?;
1647 }
1648 Ok(cpuset)
1649 }
1650
1651 fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
1652 let nvml = nvml()?;
1653 let device_count = nvml.device_count()?;
1654
1655 for idx in 0..device_count {
1656 let dev = nvml.device_by_index(idx)?;
1657 let cpu = dev.cpu_affinity(16)?;
1659 let ideal_cpu = self.find_one_cpu(cpu)?;
1660 if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
1661 self.gpu_devs_to_node_info.insert(
1662 idx,
1663 NodeInfo {
1664 node_mask: self.node_to_cpuset(
1665 topo.nodes.get(&cpu.node_id).expect("topo missing node"),
1666 )?,
1667 _node_id: cpu.node_id,
1668 },
1669 );
1670 }
1671 }
1672 Ok(())
1673 }
1674
1675 fn update_gpu_pids(&mut self) -> Result<()> {
1676 let nvml = nvml()?;
1677 for i in 0..nvml.device_count()? {
1678 let device = nvml.device_by_index(i)?;
1679 for proc in device
1680 .running_compute_processes()?
1681 .into_iter()
1682 .chain(device.running_graphics_processes()?.into_iter())
1683 {
1684 self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
1685 }
1686 }
1687 Ok(())
1688 }
1689
1690 fn update_process_info(&mut self) -> Result<()> {
1691 self.sys.refresh_processes_specifics(
1692 ProcessesToUpdate::All,
1693 true,
1694 ProcessRefreshKind::nothing(),
1695 );
1696 self.pid_map.clear();
1697 for (pid, proc_) in self.sys.processes() {
1698 if let Some(ppid) = proc_.parent() {
1699 self.pid_map.entry(ppid).or_default().push(*pid);
1700 }
1701 }
1702 Ok(())
1703 }
1704
1705 fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
1706 let mut work = VecDeque::from([root_pid]);
1707 let mut pids_and_tids: HashSet<Pid> = HashSet::new();
1708
1709 while let Some(pid) = work.pop_front() {
1710 if pids_and_tids.insert(pid) {
1711 if let Some(kids) = self.pid_map.get(&pid) {
1712 work.extend(kids);
1713 }
1714 if let Some(proc_) = self.sys.process(pid) {
1715 if let Some(tasks) = proc_.tasks() {
1716 pids_and_tids.extend(tasks.iter().copied());
1717 }
1718 }
1719 }
1720 }
1721 pids_and_tids
1722 }
1723
1724 fn affinitize_gpu_pids(&mut self) -> Result<()> {
1725 if !self.enable {
1726 return Ok(());
1727 }
1728 for (pid, dev) in &self.gpu_pids_to_devs {
1729 let node_info = self
1730 .gpu_devs_to_node_info
1731 .get(dev)
1732 .expect("Unable to get gpu pid node mask");
1733 for child in self.get_child_pids_and_tids(*pid) {
1734 match nix::sched::sched_setaffinity(
1735 nix::unistd::Pid::from_raw(child.as_u32() as i32),
1736 &node_info.node_mask,
1737 ) {
1738 Ok(_) => {
1739 self.tasks_affinitized += 1;
1741 }
1742 Err(_) => {
1743 debug!(
1744 "Error affinitizing gpu pid {} to node {:#?}",
1745 child.as_u32(),
1746 node_info
1747 );
1748 }
1749 };
1750 }
1751 }
1752 Ok(())
1753 }
1754
1755 pub fn maybe_affinitize(&mut self) {
1756 if !self.enable {
1757 return;
1758 }
1759 let now = Instant::now();
1760
1761 if let Some(last_process_time) = self.last_process_time {
1762 if (now - last_process_time) < self.poll_interval {
1763 return;
1764 }
1765 }
1766
1767 match self.update_gpu_pids() {
1768 Ok(_) => {}
1769 Err(e) => {
1770 error!("Error updating GPU PIDs: {}", e);
1771 }
1772 };
1773 match self.update_process_info() {
1774 Ok(_) => {}
1775 Err(e) => {
1776 error!("Error updating process info to affinitize GPU PIDs: {}", e);
1777 }
1778 };
1779 match self.affinitize_gpu_pids() {
1780 Ok(_) => {}
1781 Err(e) => {
1782 error!("Error updating GPU PIDs: {}", e);
1783 }
1784 };
1785 self.last_process_time = Some(now);
1786 self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
1787 }
1788
1789 pub fn init(&mut self, topo: Arc<Topology>) {
1790 if !self.enable || self.last_process_time.is_some() {
1791 return;
1792 }
1793
1794 match self.init_dev_node_map(topo) {
1795 Ok(_) => {}
1796 Err(e) => {
1797 error!("Error initializing gpu node dev map: {}", e);
1798 }
1799 };
1800 self.sys = System::new_all();
1801 }
1802}
1803
/// Top-level scheduler state: the BPF skeleton together with the userspace
/// bookkeeping that accompanies it.
struct Scheduler<'a> {
    /// BPF skeleton holding the scheduler's maps and programs.
    skel: BpfSkel<'a>,
    /// Link for the attached struct_ops, if any.
    struct_ops: Option<libbpf_rs::Link>,
    /// Layer specifications the scheduler was configured with.
    layer_specs: Vec<LayerSpec>,

    // Intervals; presumably the main scheduling period and the layer
    // refresh period - confirm at the use sites.
    sched_intv: Duration,
    layer_refresh_intv: Duration,

    cpu_pool: CpuPool,
    /// Userspace state of each layer.
    layers: Vec<Layer>,
    idle_qos_enabled: bool,

    proc_reader: fb_procfs::ProcReader,
    sched_stats: Stats,

    /// Compiled cgroup regexes keyed by the regex id (the same shape that
    /// `init_layers` returns).
    cgroup_regexes: Option<HashMap<u32, Regex>>,

    nr_layer_cpus_ranges: Vec<(usize, usize)>,
    // Nested boolean tables; presumably indexed by layer and NUMA node -
    // confirm at the use sites.
    xnuma_mig_src: Vec<Vec<bool>>,
    growth_denied: Vec<Vec<bool>>,
    processing_dur: Duration,

    /// Shared system topology.
    topo: Arc<Topology>,
    /// Network devices keyed by interface name; empty unless netdev IRQ
    /// balancing is enabled.
    netdevs: BTreeMap<String, NetDev>,
    stats_server: StatsServer<StatsReq, StatsRes>,
    /// Helper that pins GPU-using tasks to their GPU's NUMA node.
    gpu_task_handler: GpuTaskAffinitizer,
}
1831
/// Fixed-point scale (2^20) applied when a floating-point migration rate is
/// converted to the `u64` stored in `XnumaRates`.
const DUTY_CYCLE_SCALE: f64 = (1u64 << 20) as f64;
/// Factor multiplied into every computed cross-node migration rate, halving
/// the amount that would otherwise be moved.
const XNUMA_RATE_DAMPEN: f64 = 0.5;
1834
/// Cross-node migration rates produced by `xnuma_compute_rates`.
struct XnumaRates {
    /// `rates[src][dst]` is the rate from node `src` to node `dst`, scaled by
    /// `DUTY_CYCLE_SCALE`; the diagonal is always zero.
    rates: Vec<Vec<u64>>,
}
1840
/// Decides, per NUMA node, whether cross-node migration should be active.
///
/// * `duty_sums` - duty sum observed on each node.
/// * `allocs` - number of CPUs allocated on each node.
/// * `threshold` - `(low, high)` bounds on the node's load ratio
///   (`duty / alloc`).
/// * `threshold_delta` - `(low, high)` bounds on the node's surplus ratio,
///   i.e. how far its load ratio sits above the system-wide ratio.
/// * `growth_denied` - whether growth was denied on each node.
/// * `currently_active` - the previous decision for each node.
///
/// A node with no allocation is active only if it has duty and growth was
/// denied. Otherwise a node turns active when both ratios exceed their high
/// bounds and growth was denied, turns inactive when either ratio drops
/// below its low bound or growth was not denied, and keeps its previous
/// state in between.
fn xnuma_check_active(
    duty_sums: &[f64],
    allocs: &[usize],
    threshold: (f64, f64),
    threshold_delta: (f64, f64),
    growth_denied: &[bool],
    currently_active: &[bool],
) -> Vec<bool> {
    let total_duty: f64 = duty_sums.iter().sum();
    let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
    // System-wide duty per allocated CPU; zero when nothing is allocated.
    let eq_ratio = if total_alloc > 0.0 {
        total_duty / total_alloc
    } else {
        0.0
    };

    (0..duty_sums.len())
        .map(|nid| {
            let alloc = allocs[nid] as f64;
            let duty = duty_sums[nid];

            if alloc <= 0.0 {
                return duty > 0.0 && growth_denied[nid];
            }

            let load_ratio = duty / alloc;
            let surplus_ratio = (duty - eq_ratio * alloc) / alloc;

            let activate = load_ratio > threshold.1
                && surplus_ratio > threshold_delta.1
                && growth_denied[nid];
            let deactivate = load_ratio < threshold.0
                || surplus_ratio < threshold_delta.0
                || !growth_denied[nid];

            match (activate, deactivate) {
                (true, _) => true,
                (false, true) => false,
                // Inside the hysteresis band: keep the previous decision.
                (false, false) => currently_active[nid],
            }
        })
        .collect()
}
1902
1903fn xnuma_compute_rates(duty_sums: &[f64], allocs: &[usize]) -> XnumaRates {
1909 let nr_nodes = duty_sums.len();
1910 let total_duty: f64 = duty_sums.iter().sum();
1911 let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
1912
1913 if total_alloc <= 0.0 {
1914 return XnumaRates {
1915 rates: vec![vec![0u64; nr_nodes]; nr_nodes],
1916 };
1917 }
1918
1919 let eq_ratio = total_duty / total_alloc;
1920
1921 let mut surpluses = vec![0.0f64; nr_nodes];
1922 let mut deficits = vec![0.0f64; nr_nodes];
1923 for nid in 0..nr_nodes {
1924 let expected = eq_ratio * allocs[nid] as f64;
1925 let delta = duty_sums[nid] - expected;
1926 if delta > 0.0 {
1927 surpluses[nid] = delta;
1928 } else {
1929 deficits[nid] = -delta;
1930 }
1931 }
1932
1933 let total_deficit: f64 = deficits.iter().sum();
1934
1935 let mut rates = vec![vec![0u64; nr_nodes]; nr_nodes];
1936 for src in 0..nr_nodes {
1937 for dst in 0..nr_nodes {
1938 if src == dst || total_deficit <= 0.0 || surpluses[src] <= 0.0 {
1939 continue;
1940 }
1941 let migration = surpluses[src] * deficits[dst] / total_deficit * XNUMA_RATE_DAMPEN;
1944 rates[src][dst] = (migration * DUTY_CYCLE_SCALE) as u64;
1945 }
1946 }
1947
1948 XnumaRates { rates }
1949}
1950
1951impl<'a> Scheduler<'a> {
/// Writes the per-layer configuration derived from `specs` into the open
/// BPF skeleton.
///
/// For every spec this fills in the match rules, the common layer
/// parameters, the protection / idle-confinement flags and the cpuset, and
/// finally records the layer iteration order sorted by ascending weight.
///
/// Returns the compiled cgroup regexes, keyed by the id that was stored in
/// the corresponding match rule.
///
/// # Errors
///
/// Fails when more than `consts_MAX_CGROUP_REGEXES` regex rules are used,
/// when a regex does not compile, when a `NumaNode` match names a node id
/// outside the topology, when `perf` does not fit into `u32`, or when the
/// kernel symbol check fails.
fn init_layers(
    skel: &mut OpenBpfSkel,
    specs: &[LayerSpec],
    topo: &Topology,
) -> Result<HashMap<u32, Regex>> {
    skel.maps.rodata_data.as_mut().unwrap().nr_layers = specs.len() as u32;
    let mut perf_set = false;

    let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
    let mut layer_weights: Vec<usize> = vec![];
    // Next free regex id; also the number of regexes registered so far.
    let mut cgroup_regex_id = 0;
    let mut cgroup_regexes = HashMap::new();

    for (spec_i, spec) in specs.iter().enumerate() {
        let layer = &mut skel.maps.bss_data.as_mut().unwrap().layers[spec_i];

        // `spec.matches` is a list of OR groups, each a list of AND-ed rules.
        for (or_i, or) in spec.matches.iter().enumerate() {
            for (and_i, and) in or.iter().enumerate() {
                let mt = &mut layer.matches[or_i].matches[and_i];

                // Default to a non-excluding rule; the *Exclude variants
                // below flip this.
                mt.exclude.write(false);

                match and {
                    LayerMatch::CgroupPrefix(prefix) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
                        copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
                    }
                    LayerMatch::CgroupSuffix(suffix) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
                        copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
                    }
                    LayerMatch::CgroupRegex(regex_str) => {
                        if cgroup_regex_id >= bpf_intf::consts_MAX_CGROUP_REGEXES {
                            bail!(
                                "Too many cgroup regex rules. Maximum allowed: {}",
                                bpf_intf::consts_MAX_CGROUP_REGEXES
                            );
                        }

                        // Only the id goes into the rule; the compiled regex
                        // stays in userspace and is returned to the caller.
                        mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_REGEX as i32;
                        mt.cgroup_regex_id = cgroup_regex_id;

                        let regex = Regex::new(regex_str).with_context(|| {
                            format!("Invalid regex '{}' in layer '{}'", regex_str, spec.name)
                        })?;
                        cgroup_regexes.insert(cgroup_regex_id, regex);
                        cgroup_regex_id += 1;
                    }
                    LayerMatch::CgroupContains(substr) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
                        copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
                    }
                    LayerMatch::CommPrefix(prefix) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                        copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                    }
                    // Same match kind as CommPrefix, with the exclude flag set.
                    LayerMatch::CommPrefixExclude(prefix) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                        mt.exclude.write(true);
                        copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                    }
                    LayerMatch::PcommPrefix(prefix) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                        copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                    }
                    // Same match kind as PcommPrefix, with the exclude flag set.
                    LayerMatch::PcommPrefixExclude(prefix) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                        mt.exclude.write(true);
                        copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                    }
                    LayerMatch::NiceAbove(nice) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
                        mt.nice = *nice;
                    }
                    LayerMatch::NiceBelow(nice) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
                        mt.nice = *nice;
                    }
                    LayerMatch::NiceEquals(nice) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
                        mt.nice = *nice;
                    }
                    LayerMatch::UIDEquals(user_id) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
                        mt.user_id = *user_id;
                    }
                    LayerMatch::GIDEquals(group_id) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
                        mt.group_id = *group_id;
                    }
                    LayerMatch::PIDEquals(pid) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
                        mt.pid = *pid;
                    }
                    LayerMatch::PPIDEquals(ppid) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
                        mt.ppid = *ppid;
                    }
                    LayerMatch::TGIDEquals(tgid) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
                        mt.tgid = *tgid;
                    }
                    LayerMatch::NSPIDEquals(nsid, pid) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
                        mt.nsid = *nsid;
                        mt.pid = *pid;
                    }
                    LayerMatch::NSEquals(nsid) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
                        mt.nsid = *nsid as u64;
                    }
                    // The join command is stored in the `comm_prefix` buffer.
                    LayerMatch::CmdJoin(joincmd) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
                        copy_into_cstr(&mut mt.comm_prefix, joincmd);
                    }
                    LayerMatch::IsGroupLeader(polarity) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
                        mt.is_group_leader.write(*polarity);
                    }
                    LayerMatch::IsKthread(polarity) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
                        mt.is_kthread.write(*polarity);
                    }
                    LayerMatch::UsedGpuTid(polarity) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
                        mt.used_gpu_tid.write(*polarity);
                    }
                    LayerMatch::UsedGpuPid(polarity) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
                        mt.used_gpu_pid.write(*polarity);
                    }
                    LayerMatch::AvgRuntime(min, max) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
                        mt.min_avg_runtime_us = *min;
                        mt.max_avg_runtime_us = *max;
                    }
                    LayerMatch::HintEquals(hint) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_HINT_EQUALS as i32;
                        mt.hint = *hint;
                    }
                    // The two thresholds below are stored as integers after
                    // scaling the configured value by 10000.
                    LayerMatch::SystemCpuUtilBelow(threshold) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_SYSTEM_CPU_UTIL_BELOW as i32;
                        mt.system_cpu_util_below = (*threshold * 10000.0) as u64;
                    }
                    LayerMatch::DsqInsertBelow(threshold) => {
                        mt.kind = bpf_intf::layer_match_kind_MATCH_DSQ_INSERT_BELOW as i32;
                        mt.dsq_insert_below = (*threshold * 10000.0) as u64;
                    }
                    LayerMatch::NumaNode(node_id) => {
                        if *node_id as usize >= topo.nodes.len() {
                            bail!(
                                "Spec {:?} has invalid NUMA node ID {} (available nodes: 0-{})",
                                spec.name,
                                node_id,
                                topo.nodes.len() - 1
                            );
                        }
                        mt.kind = bpf_intf::layer_match_kind_MATCH_NUMA_NODE as i32;
                        mt.numa_node_id = *node_id;
                    }
                }
            }
            layer.matches[or_i].nr_match_ands = or.len() as i32;
        }

        layer.nr_match_ors = spec.matches.len() as u32;
        layer.kind = spec.kind.as_bpf_enum();

        {
            let LayerCommon {
                min_exec_us,
                yield_ignore,
                perf,
                preempt,
                preempt_first,
                exclusive,
                skip_remote_node,
                prev_over_idle_core,
                growth_algo,
                slice_us,
                fifo,
                weight,
                disallow_open_after_us,
                disallow_preempt_after_us,
                xllc_mig_min_us,
                placement,
                member_expire_ms,
                ..
            } = spec.kind.common();

            // Microsecond settings are converted to nanoseconds.
            layer.slice_ns = *slice_us * 1000;
            layer.fifo.write(*fifo);
            layer.min_exec_ns = min_exec_us * 1000;
            // yield_ignore close to 1 disables the yield step, close to 0
            // makes it a full slice, anything else scales the slice.
            layer.yield_step_ns = if *yield_ignore > 0.999 {
                0
            } else if *yield_ignore < 0.001 {
                layer.slice_ns
            } else {
                (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
            };
            let mut layer_name: String = spec.name.clone();
            // NOTE(review): `String::truncate` panics when the cut is not on
            // a char boundary, so an over-long non-ASCII name could panic
            // here - confirm names are validated earlier.
            layer_name.truncate(MAX_LAYER_NAME);
            copy_into_cstr(&mut layer.name, layer_name.as_str());
            layer.preempt.write(*preempt);
            layer.preempt_first.write(*preempt_first);
            layer.excl.write(*exclusive);
            layer.skip_remote_node.write(*skip_remote_node);
            layer.prev_over_idle_core.write(*prev_over_idle_core);
            layer.growth_algo = growth_algo.as_bpf_enum();
            layer.weight = *weight;
            layer.member_expire_ms = *member_expire_ms;
            // `u64::MAX` is passed through unscaled. The unwraps presumably
            // rely on defaults having been filled in before this point -
            // confirm against the caller.
            layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
                v if v == u64::MAX => v,
                v => v * 1000,
            };
            layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
                v if v == u64::MAX => v,
                v => v * 1000,
            };
            layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
            layer_weights.push(layer.weight.try_into().unwrap());
            layer.perf = u32::try_from(*perf)?;

            let task_place = |place: u32| crate::types::layer_task_place(place);
            layer.task_place = match placement {
                LayerPlacement::Standard => {
                    task_place(bpf_intf::layer_task_place_PLACEMENT_STD)
                }
                LayerPlacement::Sticky => {
                    task_place(bpf_intf::layer_task_place_PLACEMENT_STICK)
                }
                LayerPlacement::Floating => {
                    task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT)
                }
            };
        }

        // Open layers are never marked protected.
        layer.is_protected.write(match spec.kind {
            LayerKind::Open { .. } => false,
            LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
                protected
            }
        });

        // Only grouped layers carry the idle_confined setting.
        layer.idle_confined.write(match spec.kind {
            LayerKind::Grouped { idle_confined, .. } => idle_confined,
            _ => false,
        });

        match &spec.cpuset {
            Some(mask) => {
                Self::update_cpumask(mask, &mut layer.cpuset);
                layer.has_cpuset.write(true);
            }
            None => {
                // No cpuset configured: set every bit of the mask.
                for i in 0..layer.cpuset.len() {
                    layer.cpuset[i] = u8::MAX;
                }
                layer.has_cpuset.write(false);
            }
        };

        perf_set |= layer.perf > 0;
    }

    // Layer indices ordered by ascending weight (stable for equal weights).
    layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
    for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
        skel.maps
            .rodata_data
            .as_mut()
            .unwrap()
            .layer_iteration_order[idx] = *layer_idx as u32;
    }

    // Only a warning: the perf settings stay written as configured.
    if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
        warn!("cpufreq support not available, ignoring perf configurations");
    }

    Ok(cgroup_regexes)
}
2234
2235 fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
2236 skel.maps.rodata_data.as_mut().unwrap().nr_nodes = topo.nodes.len() as u32;
2237 skel.maps.rodata_data.as_mut().unwrap().nr_llcs = 0;
2238
2239 for (&node_id, node) in &topo.nodes {
2240 debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
2241 skel.maps.rodata_data.as_mut().unwrap().nr_llcs += node.llcs.len() as u32;
2242 let raw_numa_slice = node.span.as_raw_slice();
2243 let node_cpumask_slice =
2244 &mut skel.maps.rodata_data.as_mut().unwrap().numa_cpumasks[node_id];
2245 let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
2246 left.clone_from_slice(raw_numa_slice);
2247 debug!(
2248 "node {} mask: {:?}",
2249 node_id,
2250 skel.maps.rodata_data.as_ref().unwrap().numa_cpumasks[node_id]
2251 );
2252
2253 for llc in node.llcs.values() {
2254 debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
2255 skel.maps.rodata_data.as_mut().unwrap().llc_numa_id_map[llc.id] = node_id as u32;
2256 }
2257 }
2258
2259 for cpu in topo.all_cpus.values() {
2260 skel.maps.rodata_data.as_mut().unwrap().cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
2261 }
2262 }
2263
2264 fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
2265 let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
2266 vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
2267 vec
2268 };
2269 let radiate_cpu =
2270 |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
2271 vec.sort_by_key(|&id| {
2272 (
2273 (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
2274 (center_cpu as i32 - id as i32).abs(),
2275 )
2276 });
2277 vec
2278 };
2279
2280 for (&cpu_id, cpu) in &topo.all_cpus {
2281 let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
2283 let llc_span = &topo.all_llcs[&cpu.llc_id].span;
2284 let node_span = &topo.nodes[&cpu.node_id].span;
2285 let sys_span = &topo.span;
2286
2287 let sys_span = sys_span.and(&node_span.not());
2289 let node_span = node_span.and(&llc_span.not());
2290 let llc_span = llc_span.and(&core_span.not());
2291 core_span.clear_cpu(cpu_id).unwrap();
2292
2293 let mut sys_order: Vec<usize> = sys_span.iter().collect();
2295 let mut node_order: Vec<usize> = node_span.iter().collect();
2296 let mut llc_order: Vec<usize> = llc_span.iter().collect();
2297 let mut core_order: Vec<usize> = core_span.iter().collect();
2298
2299 sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
2304 node_order = radiate(node_order, cpu.node_id);
2305 llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
2306 core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);
2307
2308 let mut order: Vec<usize> = vec![];
2310 let mut idx: usize = 0;
2311
2312 idx += 1;
2313 order.push(cpu_id);
2314
2315 idx += core_order.len();
2316 order.append(&mut core_order);
2317 let core_end = idx;
2318
2319 idx += llc_order.len();
2320 order.append(&mut llc_order);
2321 let llc_end = idx;
2322
2323 idx += node_order.len();
2324 order.append(&mut node_order);
2325 let node_end = idx;
2326
2327 idx += sys_order.len();
2328 order.append(&mut sys_order);
2329 let sys_end = idx;
2330
2331 debug!(
2332 "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
2333 cpu_id, core_end, llc_end, node_end, sys_end, &order
2334 );
2335
2336 let pmap = &mut cpu_ctxs[cpu_id].prox_map;
2338 for (i, &cpu) in order.iter().enumerate() {
2339 pmap.cpus[i] = cpu as u16;
2340 }
2341 pmap.core_end = core_end as u32;
2342 pmap.llc_end = llc_end as u32;
2343 pmap.node_end = node_end as u32;
2344 pmap.sys_end = sys_end as u32;
2345 }
2346 }
2347
2348 fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
2349 cpu_ctxs
2350 .iter()
2351 .map(|cpu_ctx| unsafe { plain::as_bytes(cpu_ctx) }.to_vec())
2352 .collect()
2353 }
2354
2355 fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
2356 let key = (0_u32).to_ne_bytes();
2357 let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
2358 let cpu_ctxs_vec = skel
2359 .maps
2360 .cpu_ctxs
2361 .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
2362 .context("Failed to lookup cpu_ctx")?
2363 .unwrap();
2364
2365 let op_layers: Vec<u32> = layer_specs
2366 .iter()
2367 .enumerate()
2368 .filter(|(_idx, spec)| match &spec.kind {
2369 LayerKind::Open { .. } => spec.kind.common().preempt,
2370 _ => false,
2371 })
2372 .map(|(idx, _)| idx as u32)
2373 .collect();
2374 let on_layers: Vec<u32> = layer_specs
2375 .iter()
2376 .enumerate()
2377 .filter(|(_idx, spec)| match &spec.kind {
2378 LayerKind::Open { .. } => !spec.kind.common().preempt,
2379 _ => false,
2380 })
2381 .map(|(idx, _)| idx as u32)
2382 .collect();
2383 let gp_layers: Vec<u32> = layer_specs
2384 .iter()
2385 .enumerate()
2386 .filter(|(_idx, spec)| match &spec.kind {
2387 LayerKind::Grouped { .. } => spec.kind.common().preempt,
2388 _ => false,
2389 })
2390 .map(|(idx, _)| idx as u32)
2391 .collect();
2392 let gn_layers: Vec<u32> = layer_specs
2393 .iter()
2394 .enumerate()
2395 .filter(|(_idx, spec)| match &spec.kind {
2396 LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2397 _ => false,
2398 })
2399 .map(|(idx, _)| idx as u32)
2400 .collect();
2401
2402 for cpu in 0..*NR_CPUS_POSSIBLE {
2404 cpu_ctxs.push(
2405 *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
2406 .expect("cpu_ctx: short or misaligned buffer"),
2407 );
2408
2409 let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
2410 let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
2411 cpu_ctxs[cpu].cpu = cpu as i32;
2412 cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
2413 cpu_ctxs[cpu].is_big = is_big;
2414
2415 fastrand::seed(cpu as u64);
2416
2417 let mut ogp_order = op_layers.clone();
2418 ogp_order.append(&mut gp_layers.clone());
2419 fastrand::shuffle(&mut ogp_order);
2420
2421 let mut ogn_order = on_layers.clone();
2422 ogn_order.append(&mut gn_layers.clone());
2423 fastrand::shuffle(&mut ogn_order);
2424
2425 let mut op_order = op_layers.clone();
2426 fastrand::shuffle(&mut op_order);
2427
2428 let mut on_order = on_layers.clone();
2429 fastrand::shuffle(&mut on_order);
2430
2431 let mut gp_order = gp_layers.clone();
2432 fastrand::shuffle(&mut gp_order);
2433
2434 let mut gn_order = gn_layers.clone();
2435 fastrand::shuffle(&mut gn_order);
2436
2437 for i in 0..MAX_LAYERS {
2438 cpu_ctxs[cpu].ogp_layer_order[i] =
2439 ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2440 cpu_ctxs[cpu].ogn_layer_order[i] =
2441 ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2442
2443 cpu_ctxs[cpu].op_layer_order[i] =
2444 op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2445 cpu_ctxs[cpu].on_layer_order[i] =
2446 on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2447 cpu_ctxs[cpu].gp_layer_order[i] =
2448 gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2449 cpu_ctxs[cpu].gn_layer_order[i] =
2450 gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2451 }
2452 }
2453
2454 Self::init_cpu_prox_map(topo, &mut cpu_ctxs);
2455
2456 skel.maps
2457 .cpu_ctxs
2458 .update_percpu(
2459 &key,
2460 &Self::convert_cpu_ctxs(cpu_ctxs),
2461 libbpf_rs::MapFlags::ANY,
2462 )
2463 .context("Failed to update cpu_ctx")?;
2464
2465 Ok(())
2466 }
2467
2468 fn init_single_prox_map_per_llc(
2469 skel: &mut BpfSkel,
2470 topo: &Topology,
2471 prox_map_idx: &usize,
2472 ) -> Result<()> {
2473 for (&llc_id, llc) in &topo.all_llcs {
2474 let mut node_order: Vec<usize> =
2476 topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
2477 let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();
2478
2479 sys_order.retain(|id| !node_order.contains(id));
2481 node_order.retain(|&id| id != llc_id);
2482
2483 fastrand::seed((*prox_map_idx as u64) << 32 | llc_id as u64);
2486 fastrand::shuffle(&mut sys_order);
2487 fastrand::shuffle(&mut node_order);
2488
2489 let mut order: Vec<usize> = vec![];
2491 let mut idx: usize = 0;
2492
2493 idx += 1;
2494 order.push(llc_id);
2495
2496 idx += node_order.len();
2497 order.append(&mut node_order);
2498 let node_end = idx;
2499
2500 idx += sys_order.len();
2501 order.append(&mut sys_order);
2502 let sys_end = idx;
2503
2504 debug!(
2505 "LLC[{}] proximity map {}[{}/{}]: {:?}",
2506 llc_id, prox_map_idx, node_end, sys_end, &order
2507 );
2508
2509 let key = (llc_id as u32).to_ne_bytes();
2514 let v = skel
2515 .maps
2516 .llc_data
2517 .lookup(&key, libbpf_rs::MapFlags::ANY)
2518 .unwrap()
2519 .unwrap();
2520 let mut llcc: bpf_intf::llc_ctx =
2521 *plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
2522
2523 let pmap = &mut llcc.prox_maps[*prox_map_idx];
2524 for (i, &llc_id) in order.iter().enumerate() {
2525 pmap.llcs[i] = llc_id as u16;
2526 }
2527 pmap.node_end = node_end as u32;
2528 pmap.sys_end = sys_end as u32;
2529
2530 skel.maps.llc_data.update(
2531 &key,
2532 unsafe { plain::as_bytes(&llcc) },
2533 libbpf_rs::MapFlags::ANY,
2534 )?
2535 }
2536
2537 Ok(())
2538 }
2539
2540 fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2541 let num_proximity_maps = bpf_intf::consts_NUM_PROXIMITY_MAPS as usize;
2542 for prox_map_idx in 0..num_proximity_maps {
2543 Self::init_single_prox_map_per_llc(skel, topo, &prox_map_idx)?;
2544 }
2545
2546 Ok(())
2547 }
2548
2549 fn init_node_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2550 for (&node_id, node) in &topo.nodes {
2551 let mut order: Vec<(usize, usize)> = node
2552 .distance
2553 .iter()
2554 .enumerate()
2555 .filter(|&(nid, _)| nid != node_id)
2556 .map(|(nid, &dist)| (nid, dist))
2557 .collect();
2558 order.sort_by_key(|&(_, dist)| dist);
2559
2560 let key = (node_id as u32).to_ne_bytes();
2561
2562 let v = skel.maps.node_data.lookup(&key, libbpf_rs::MapFlags::ANY);
2564 let mut nodec: bpf_intf::node_ctx = match v {
2565 Ok(Some(v)) => {
2566 *plain::from_bytes(v.as_slice()).expect("node_ctx: short or misaligned buffer")
2567 }
2568 _ => unsafe { MaybeUninit::zeroed().assume_init() },
2569 };
2570
2571 let pmap = &mut nodec.prox_map;
2572 for (i, &(nid, _)) in order.iter().enumerate() {
2573 pmap.nodes[i] = nid as u16;
2574 }
2575 pmap.sys_end = order.len() as u32;
2576
2577 debug!(
2578 "NODE[{}] prox_map[{}]: {:?}",
2579 node_id,
2580 pmap.sys_end,
2581 &order.iter().map(|(n, d)| (*n, *d)).collect::<Vec<_>>()
2582 );
2583
2584 skel.maps.node_data.update(
2585 &key,
2586 unsafe { plain::as_bytes(&nodec) },
2587 libbpf_rs::MapFlags::ANY,
2588 )?;
2589 }
2590 Ok(())
2591 }
2592
2593 fn init_node_ctx(skel: &mut BpfSkel, topo: &Topology, nr_layers: usize) -> Result<()> {
2594 let all_layers: Vec<u32> = (0..nr_layers as u32).collect();
2595 let node_empty_layers: Vec<Vec<u32>> =
2596 (0..topo.nodes.len()).map(|_| all_layers.clone()).collect();
2597 Self::refresh_node_ctx(skel, topo, &node_empty_layers, true);
2598 Ok(())
2599 }
2600
/// Builds a fully initialized, attached scheduler instance.
///
/// In order, this:
/// 1. discovers the topology and rejects sparse node/LLC/CPU IDs,
/// 2. optionally reads network devices for IRQ balancing,
/// 3. validates the layer specs against the topology,
/// 4. opens the BPF skeleton, fills in `rodata` from `opts`, initializes
///    layers/nodes and loads the skeleton,
/// 5. populates BPF maps (hints, CPUs, proximity maps, node contexts),
/// 6. attaches the struct_ops (and any loaded GPU kprobes), launches the
///    stats server and assembles `Self`.
///
/// # Parameters
/// * `opts` - parsed command-line options.
/// * `layer_specs` - layer specifications; a (possibly modified) copy is
///   stored in the returned scheduler.
/// * `open_object` - storage handed to `scx_ops_open!` for the opened object.
/// * `hint_to_layer_map` - hint value to layer info, written to the
///   `hint_to_layer_id_map` BPF map after load.
/// * `membw_tracking` - whether the PMU programs are autoloaded and the
///   memory-bandwidth tracking helpers are invoked.
///
/// # Errors
/// Returns an error on topology discovery failure, ID holes, invalid layer
/// specs, or any failing BPF open/load/attach/map-update step.
///
/// # Panics
/// Panics if `rodata_data` is absent, if an `Open` layer lacks
/// `disallow_open_after_us` / `disallow_preempt_after_us`, if setting the
/// hint-map pin path fails, or if the hint-map path contains a NUL byte.
fn init(
    opts: &'a Opts,
    layer_specs: &[LayerSpec],
    open_object: &'a mut MaybeUninit<OpenObject>,
    hint_to_layer_map: &HashMap<u64, HintLayerInfo>,
    membw_tracking: bool,
) -> Result<Self> {
    let nr_layers = layer_specs.len();
    // `None` and `Some(false)` both start out as "topology enabled".
    let mut disable_topology = opts.disable_topology.unwrap_or(false);

    // Pick the topology constructor: flattened when topology awareness is
    // explicitly disabled, argument-driven when a virtual LLC is requested,
    // plain discovery otherwise.
    let topo = Arc::new(if disable_topology {
        Topology::with_flattened_llc_node()?
    } else if opts.topology.virt_llc.is_some() {
        Topology::with_args(&opts.topology)?
    } else {
        Topology::new()?
    });

    // Each ID must equal its position in key order, i.e. IDs are dense and
    // start at 0. Other code in this file indexes per-node/per-LLC vectors
    // directly by ID (e.g. `node_empty_layers[nid]`).
    if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
        bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
    }
    if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
        bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
    }
    if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
        bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
    }

    // Network devices are only read when IRQ balancing was requested;
    // otherwise the scheduler carries an empty map.
    let netdevs = if opts.netdev_irq_balance {
        let devs = read_netdevs()?;
        let total_irqs: usize = devs.values().map(|d| d.irqs.len()).sum();
        let breakdown = devs
            .iter()
            .map(|(iface, d)| format!("{iface}={}", d.irqs.len()))
            .collect::<Vec<_>>()
            .join(", ");
        info!(
            "Netdev IRQ balancing enabled: overriding {total_irqs} IRQ{} \
             across {} interface{} [{breakdown}]",
            if total_irqs == 1 { "" } else { "s" },
            devs.len(),
            if devs.len() == 1 { "" } else { "s" },
        );
        devs
    } else {
        BTreeMap::new()
    };

    // Auto-select: a single node with a single LLC turns topology awareness
    // off. NOTE(review): this branch also runs when `opts.disable_topology`
    // was explicitly `Some(false)`, although the message says "not
    // specified"; `topo` above was already built and is not rebuilt here.
    if !disable_topology {
        if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
            disable_topology = true;
        };
        info!(
            "Topology awareness not specified, selecting {} based on hardware",
            if disable_topology {
                "disabled"
            } else {
                "enabled"
            }
        );
    };

    let cpu_pool = CpuPool::new(topo.clone(), opts.allow_partial_core)?;

    // Work on an owned copy of the specs. With topology awareness disabled,
    // each spec's `nodes` and `llcs` lists are emptied.
    let layer_specs: Vec<_> = if disable_topology {
        info!("Disabling topology awareness");
        layer_specs
            .iter()
            .cloned()
            .map(|mut s| {
                s.kind.common_mut().nodes.clear();
                s.kind.common_mut().llcs.clear();
                s
            })
            .collect()
    } else {
        layer_specs.to_vec()
    };

    // Validate that every node/LLC a spec references exists in the topology
    // and is listed at most once.
    for spec in layer_specs.iter() {
        let mut seen = BTreeSet::new();
        for &node_id in spec.nodes().iter() {
            if !topo.nodes.contains_key(&node_id) {
                bail!(
                    "layer {:?}: nodes references node {} which does not \
                     exist in the topology (available: {:?})",
                    spec.name,
                    node_id,
                    topo.nodes.keys().collect::<Vec<_>>()
                );
            }
            if !seen.insert(node_id) {
                bail!(
                    "layer {:?}: nodes contains duplicate node {}",
                    spec.name,
                    node_id
                );
            }
        }

        // Reuse the same set for the LLC pass.
        seen.clear();
        for &llc_id in spec.llcs().iter() {
            if !topo.all_llcs.contains_key(&llc_id) {
                bail!(
                    "layer {:?}: llcs references LLC {} which does not \
                     exist in the topology (available: {:?})",
                    spec.name,
                    llc_id,
                    topo.all_llcs.keys().collect::<Vec<_>>()
                );
            }
            if !seen.insert(llc_id) {
                bail!(
                    "layer {:?}: llcs contains duplicate LLC {}",
                    spec.name,
                    llc_id
                );
            }
        }
    }

    // Reject specs that combine a `NumaNode` matcher with one of the
    // `NodeSpread*` growth algorithms (rationale is in the error text).
    for spec in layer_specs.iter() {
        let has_numa_node_match = spec
            .matches
            .iter()
            .flatten()
            .any(|m| matches!(m, LayerMatch::NumaNode(_)));
        let has_node_spread_algo = matches!(
            spec.kind.common().growth_algo,
            LayerGrowthAlgo::NodeSpread
                | LayerGrowthAlgo::NodeSpreadReverse
                | LayerGrowthAlgo::NodeSpreadRandom
        );
        if has_numa_node_match && has_node_spread_algo {
            bail!(
                "layer {:?}: NumaNode matcher cannot be combined with {:?} \
                 growth algorithm. NodeSpread* allocates CPUs equally across \
                 ALL NUMA nodes, but NumaNode restricts tasks to one node's \
                 CPUs — CPUs on other nodes are wasted and utilization \
                 will never exceed 1/numa_nodes. Use a non-spread algorithm \
                 (e.g. Linear, Topo) instead.",
                spec.name,
                spec.kind.common().growth_algo
            );
        }
    }

    init_libbpf_logging(None);
    // Probed once here and later exported to BPF through
    // `rodata.kfuncs_supported_in_syscall`.
    let kfuncs_in_syscall = scx_bpf_compat::kfuncs_supported_in_syscall()?;
    if !kfuncs_in_syscall {
        warn!("Using slow path: kfuncs not supported in syscall programs (a8e03b6bbb2c ∉ ker)");
    }

    // Map the textual log level to a numeric one: 2 = trace, 1 = debug,
    // 0 = anything else. Only level 2 enables object-builder debug output.
    let debug_level = if opts.log_level.contains("trace") {
        2
    } else if opts.log_level.contains("debug") {
        1
    } else {
        0
    };
    let mut skel_builder = BpfSkelBuilder::default();
    skel_builder.obj_builder.debug(debug_level > 1);

    info!(
        "Running scx_layered (build ID: {})",
        build_id::full_version(env!("CARGO_PKG_VERSION"))
    );
    let open_opts = opts.libbpf.clone().into_bpf_open_opts();
    let mut skel = scx_ops_open!(skel_builder, open_object, layered, open_opts)?;

    // The two PMU programs are autoloaded only with memory-bandwidth
    // tracking enabled.
    skel.progs.scx_pmu_switch_tc.set_autoload(membw_tracking);
    skel.progs.scx_pmu_tick_tc.set_autoload(membw_tracking);

    // Names of the GPU kprobes set up below; consulted again after attach.
    let mut loaded_kprobes = HashSet::new();

    // Kprobe levels are cumulative: level N enables every probe up to N.
    if opts.enable_gpu_support {
        if opts.gpu_kprobe_level >= 1 {
            compat::cond_kprobe_load("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
            loaded_kprobes.insert("nvidia_open");
        }
        if opts.gpu_kprobe_level >= 2 {
            compat::cond_kprobe_load("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
            loaded_kprobes.insert("nvidia_mmap");
        }
        if opts.gpu_kprobe_level >= 3 {
            compat::cond_kprobe_load("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
            loaded_kprobes.insert("nvidia_poll");
        }
    }

    let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
    let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");

    // `event` is passed to `create_perf_fds` after load; 0 when membw
    // tracking is off.
    let event = if membw_tracking {
        setup_membw_tracking(&mut skel)?
    } else {
        0
    };

    let rodata = skel.maps.rodata_data.as_mut().unwrap();

    // Both addresses are required; if either lookup failed, neither field is
    // written and only a warning is emitted.
    if let (Ok(ext_addr), Ok(idle_addr)) = (ext_sched_class_addr, idle_sched_class_addr) {
        rodata.ext_sched_class_addr = ext_addr;
        rodata.idle_sched_class_addr = idle_addr;
    } else {
        warn!(
            "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
        );
    }

    // NOTE(review): `slice_ns` and `max_exec_ns` are unconditionally
    // reassigned from `opts` further down, so these two stores are
    // overwritten before load.
    rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
    rodata.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;

    skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;

    // OR in the queued-wakeup ops flag unless disabled; a flag value of 0
    // is treated as "kernel does not support it".
    if !opts.disable_queued_wakeup {
        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization"),
            v => skel.struct_ops.layered_mut().flags |= v,
        }
    }

    // Scalar configuration exported to BPF. Microsecond options are
    // multiplied by 1000 for the `_ns` fields.
    rodata.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
    rodata.percpu_kthread_preempt_all =
        !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
    rodata.debug = debug_level as u32;
    rodata.slice_ns = opts.slice_us * 1000;
    // Explicit max exec time if given, else 20 slices.
    rodata.max_exec_ns = if opts.max_exec_us > 0 {
        opts.max_exec_us * 1000
    } else {
        opts.slice_us * 1000 * 20
    };
    rodata.nr_cpu_ids = *NR_CPU_IDS as u32;
    rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
    rodata.smt_enabled = topo.smt_enabled;
    rodata.has_little_cores = topo.has_little_cores();
    rodata.antistall_sec = opts.antistall_sec;
    rodata.monitor_disable = opts.monitor_disable;
    rodata.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
    // Fraction scaled to parts-per-1024 and clamped to 1..=1024.
    rodata.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
    rodata.enable_antistall = !opts.disable_antistall;
    rodata.enable_match_debug = opts.enable_match_debug;
    rodata.enable_gpu_support = opts.enable_gpu_support;
    rodata.kfuncs_supported_in_syscall = kfuncs_in_syscall;

    for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
        rodata.__sibling_cpu[cpu] = *sib;
    }
    // Bitmap of all CPUs: bit `cpu % 8` of byte `cpu / 8`.
    for cpu in topo.all_cpus.keys() {
        rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
    }

    // Layer counts by (kind, preempt) combination: Open+preempt,
    // Open+non-preempt, Grouped+preempt, Grouped+non-preempt.
    rodata.nr_op_layers = layer_specs
        .iter()
        .filter(|spec| match &spec.kind {
            LayerKind::Open { .. } => spec.kind.common().preempt,
            _ => false,
        })
        .count() as u32;
    rodata.nr_on_layers = layer_specs
        .iter()
        .filter(|spec| match &spec.kind {
            LayerKind::Open { .. } => !spec.kind.common().preempt,
            _ => false,
        })
        .count() as u32;
    rodata.nr_gp_layers = layer_specs
        .iter()
        .filter(|spec| match &spec.kind {
            LayerKind::Grouped { .. } => spec.kind.common().preempt,
            _ => false,
        })
        .count() as u32;
    rodata.nr_gn_layers = layer_specs
        .iter()
        .filter(|spec| match &spec.kind {
            LayerKind::Grouped { .. } => !spec.kind.common().preempt,
            _ => false,
        })
        .count() as u32;
    // Exclusive layers are counted regardless of kind.
    rodata.nr_excl_layers = layer_specs
        .iter()
        .filter(|spec| spec.kind.common().exclusive)
        .count() as u32;

    // Smallest `disallow_*_after_us` across Open layers; `u64::MAX` means
    // "no Open layer seen" and selects the default below.
    let mut min_open = u64::MAX;
    let mut min_preempt = u64::MAX;

    for spec in layer_specs.iter() {
        if let LayerKind::Open { common, .. } = &spec.kind {
            min_open = min_open.min(common.disallow_open_after_us.unwrap());
            min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
        }
    }

    // NOTE(review): the destination fields are named `_ns`, while the values
    // come from `*_after_us` options / `DFL_*_US` defaults with no unit
    // conversion visible here - confirm the units are handled elsewhere.
    rodata.min_open_layer_disallow_open_after_ns = match min_open {
        u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
        v => v,
    };
    rodata.min_open_layer_disallow_preempt_after_ns = match min_preempt {
        u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
        v => v,
    };

    // A non-empty path pins the task hint map there and enables it in BPF.
    let layered_task_hint_map_path = &opts.task_hint_map;
    let hint_map = &mut skel.maps.scx_layered_task_hint_map;
    if !layered_task_hint_map_path.is_empty() {
        hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
        rodata.task_hint_map_enabled = true;
    }

    if !opts.hi_fb_thread_name.is_empty() {
        let bpf_hi_fb_thread_name = &mut rodata.hi_fb_thread_name;
        copy_into_cstr(bpf_hi_fb_thread_name, opts.hi_fb_thread_name.as_str());
        rodata.enable_hi_fb_thread_name_match = true;
    }

    // `rodata` is not used past this point; `rodata_data` is re-borrowed
    // through `skel` for the one remaining store.
    let cgroup_regexes = Self::init_layers(&mut skel, &layer_specs, &topo)?;
    skel.maps.rodata_data.as_mut().unwrap().nr_cgroup_regexes = cgroup_regexes.len() as u32;
    Self::init_nodes(&mut skel, opts, &topo);

    let mut skel = scx_ops_load!(skel, layered, uei)?;

    // Populate the hint -> layer map after load.
    if !hint_to_layer_map.is_empty() {
        for (k, v) in hint_to_layer_map.iter() {
            // NOTE(review): the u64 hint is truncated to a u32 map key.
            let key: u32 = *k as u32;

            // Zero-filled byte buffer, written through a raw struct pointer.
            let mut info_bytes = vec![0u8; std::mem::size_of::<bpf_intf::hint_layer_info>()];
            let info_ptr = info_bytes.as_mut_ptr() as *mut bpf_intf::hint_layer_info;
            unsafe {
                (*info_ptr).layer_id = v.layer_id as u32;
                // Thresholds are scaled by 10000; `u64::MAX` encodes "unset".
                (*info_ptr).system_cpu_util_below = match v.system_cpu_util_below {
                    Some(threshold) => (threshold * 10000.0) as u64,
                    None => u64::MAX,
                };
                (*info_ptr).dsq_insert_below = match v.dsq_insert_below {
                    Some(threshold) => (threshold * 10000.0) as u64,
                    None => u64::MAX,
                };
            }

            skel.maps.hint_to_layer_id_map.update(
                &key.to_ne_bytes(),
                &info_bytes,
                libbpf_rs::MapFlags::ANY,
            )?;
        }
    }

    if membw_tracking {
        create_perf_fds(&mut skel, event)?;
    }

    // Build the runtime `Layer` objects, one per spec, each with its
    // precomputed growth order.
    let mut layers = vec![];
    let layer_growth_orders =
        LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
    for (idx, spec) in layer_specs.iter().enumerate() {
        let growth_order = layer_growth_orders
            .get(&idx)
            .with_context(|| "layer has no growth order".to_string())?;
        layers.push(Layer::new(spec, &topo, growth_order)?);
    }

    // Idle QoS is on if any layer sets a positive `idle_resume_us`, unless
    // the support probe fails.
    let mut idle_qos_enabled = layers
        .iter()
        .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
    if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
        warn!("idle_resume_us not supported, ignoring");
        idle_qos_enabled = false;
    }

    Self::init_cpus(&skel, &layer_specs, &topo)?;
    Self::init_llc_prox_map(&mut skel, &topo)?;
    Self::init_node_prox_map(&mut skel, &topo)?;
    Self::init_node_ctx(&mut skel, &topo, nr_layers)?;

    let proc_reader = fb_procfs::ProcReader::new();

    // Run `initialize_pid_namespace` once with default input; its result is
    // deliberately discarded.
    let input = ProgramInput {
        ..Default::default()
    };
    let prog = &mut skel.progs.initialize_pid_namespace;

    let _ = prog.test_run(input);

    // Best-effort chmod of the pinned hint map to 0o666; a failure is only
    // traced.
    if !layered_task_hint_map_path.is_empty() {
        let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
        let mode: libc::mode_t = 0o666;
        unsafe {
            if libc::chmod(path.as_ptr(), mode) != 0 {
                trace!("'chmod' to 666 of task hint map failed, continuing...");
            }
        }
    }

    let struct_ops = scx_ops_attach!(skel, layered)?;

    // Attach exactly the GPU kprobes recorded in `loaded_kprobes` above.
    if opts.enable_gpu_support {
        if loaded_kprobes.contains("nvidia_open") {
            compat::cond_kprobe_attach("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
        }
        if loaded_kprobes.contains("nvidia_mmap") {
            compat::cond_kprobe_attach("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
        }
        if loaded_kprobes.contains("nvidia_poll") {
            compat::cond_kprobe_attach("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
        }
    }

    let stats_server = StatsServer::new(stats::server_data()).launch()?;
    let mut gpu_task_handler =
        GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
    gpu_task_handler.init(topo.clone());

    let sched = Self {
        struct_ops: Some(struct_ops),
        layer_specs,

        sched_intv: Duration::from_secs_f64(opts.interval),
        layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),

        cpu_pool,
        layers,
        idle_qos_enabled,

        // `Stats::new` borrows `skel` and `proc_reader` before both are
        // moved into the struct below.
        sched_stats: Stats::new(
            &mut skel,
            &proc_reader,
            topo.clone(),
            &gpu_task_handler,
            opts.util_compensation,
        )?,

        cgroup_regexes: Some(cgroup_regexes),
        // Per-layer bookkeeping, sized by layer count (and node count).
        nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
        xnuma_mig_src: vec![vec![false; topo.nodes.len()]; nr_layers],
        growth_denied: vec![vec![false; topo.nodes.len()]; nr_layers],
        processing_dur: Default::default(),

        proc_reader,
        skel,

        topo,
        netdevs,
        stats_server,
        gpu_task_handler,
    };

    info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");

    Ok(sched)
}
3091
3092 fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
3093 for cpu in 0..mask.len() {
3094 if mask.test_cpu(cpu) {
3095 bpfmask[cpu / 8] |= 1 << (cpu % 8);
3096 } else {
3097 bpfmask[cpu / 8] &= !(1 << (cpu % 8));
3098 }
3099 }
3100 }
3101
3102 fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
3103 trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
3104 Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);
3105
3106 bpf_layer.nr_cpus = layer.nr_cpus as u32;
3107 for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
3108 bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
3109 }
3110 for (node_id, &nr_node_cpus) in layer.nr_node_cpus.iter().enumerate() {
3111 bpf_layer.node[node_id].nr_cpus = nr_node_cpus as u32;
3112 }
3113
3114 bpf_layer.refresh_cpus = 1;
3115 }
3116
3117 fn update_netdev_cpumasks(&mut self) -> Result<()> {
3118 let available_cpus = self.cpu_pool.available_cpus();
3119 if available_cpus.is_empty() {
3120 return Ok(());
3121 }
3122
3123 for (iface, netdev) in self.netdevs.iter_mut() {
3124 let node = self
3125 .topo
3126 .nodes
3127 .values()
3128 .find(|n| n.id == netdev.node())
3129 .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
3130 let node_cpus = node.span.clone();
3131 for (irq, irqmask) in netdev.irqs.iter_mut() {
3132 irqmask.clear_all();
3133 for cpu in available_cpus.iter() {
3134 if !node_cpus.test_cpu(cpu) {
3135 continue;
3136 }
3137 let _ = irqmask.set_cpu(cpu);
3138 }
3139 if irqmask.weight() == 0 {
3141 for cpu in node_cpus.iter() {
3142 let _ = irqmask.set_cpu(cpu);
3143 }
3144 }
3145 trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
3146 }
3147 netdev.apply_cpumasks()?;
3148 debug!(
3149 "{iface}: applied affinity override to {} IRQ{}",
3150 netdev.irqs.len(),
3151 if netdev.irqs.len() == 1 { "" } else { "s" },
3152 );
3153 }
3154
3155 Ok(())
3156 }
3157
3158 fn clamp_target_by_membw(
3159 &self,
3160 layer: &Layer,
3161 membw_limit: f64,
3162 membw: f64,
3163 curtarget: u64,
3164 ) -> usize {
3165 let ncpu: u64 = layer.cpus.weight() as u64;
3166 let membw = (membw * 1024_f64.powf(3.0)).round() as u64;
3167 let membw_limit = (membw_limit * 1024_f64.powf(3.0)).round() as u64;
3168 let last_membw_percpu = if ncpu > 0 { membw / ncpu } else { 0 };
3169
3170 if membw_limit == 0 || last_membw_percpu == 0 {
3173 return curtarget as usize;
3174 }
3175
3176 (membw_limit / last_membw_percpu) as usize
3177 }
3178
3179 fn calc_raw_demands(&self, targets: &[(usize, usize)]) -> Vec<LayerDemand> {
3183 let au = self.cpu_pool.alloc_unit();
3184 let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
3185 let nr_nodes = self.topo.nodes.len();
3186
3187 targets
3188 .iter()
3189 .enumerate()
3190 .map(|(idx, &(target, _min))| {
3191 let layer = &self.layers[idx];
3192 let weight = layer.kind.common().weight as usize;
3193
3194 if matches!(layer.kind, LayerKind::Open { .. }) {
3196 return LayerDemand {
3197 raw_pinned: vec![0; nr_nodes],
3198 raw_unpinned: 0,
3199 weight,
3200 spread: false,
3201 };
3202 }
3203
3204 let spread = matches!(
3209 layer.growth_algo,
3210 LayerGrowthAlgo::NodeSpread
3211 | LayerGrowthAlgo::NodeSpreadReverse
3212 | LayerGrowthAlgo::NodeSpreadRandom
3213 );
3214
3215 let util_high = match &layer.kind {
3216 LayerKind::Confined { util_range, .. }
3217 | LayerKind::Grouped { util_range, .. } => util_range.1,
3218 _ => 1.0,
3219 };
3220
3221 let mut raw_pinned = vec![0usize; nr_nodes];
3223 for n in 0..nr_nodes {
3224 let pu = pinned_utils[idx][n];
3225 if pu < 0.01 {
3226 continue;
3227 }
3228 let node_span = &self.topo.nodes[&n].span;
3230 if layer.allowed_cpus.and(node_span).is_empty() {
3231 continue;
3232 }
3233 let cpus = (pu / util_high).ceil() as usize;
3234 let units = cpus.div_ceil(au);
3236 raw_pinned[n] = units;
3237 }
3238
3239 let target_units = target.div_ceil(au);
3241 let pinned_units: usize = raw_pinned.iter().sum();
3242 let raw_unpinned = target_units.saturating_sub(pinned_units);
3243
3244 LayerDemand {
3245 raw_pinned,
3246 raw_unpinned,
3247 weight,
3248 spread,
3249 }
3250 })
3251 .collect()
3252 }
3253
3254 fn calc_target_nr_cpus(&self) -> Vec<(usize, usize)> {
3262 let nr_cpus = self.cpu_pool.topo.all_cpus.len();
3263 let utils = if self.sched_stats.util_compensation {
3264 &self.sched_stats.layer_utils_compensated
3265 } else {
3266 &self.sched_stats.layer_utils
3267 };
3268 let membws = &self.sched_stats.layer_membws;
3269
3270 let mut records: Vec<(u64, u64, u64, usize, usize, usize)> = vec![];
3271 let mut targets: Vec<(usize, usize)> = vec![];
3272
3273 for (idx, layer) in self.layers.iter().enumerate() {
3274 targets.push(match &layer.kind {
3275 LayerKind::Confined {
3276 util_range,
3277 cpus_range,
3278 cpus_range_frac,
3279 membw_gb,
3280 ..
3281 }
3282 | LayerKind::Grouped {
3283 util_range,
3284 cpus_range,
3285 cpus_range_frac,
3286 membw_gb,
3287 ..
3288 } => {
3289 let cpus_range =
3290 resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();
3291
3292 let owned = utils[idx][LAYER_USAGE_OWNED];
3297 let open = utils[idx][LAYER_USAGE_OPEN];
3298
3299 let membw_owned = membws[idx][LAYER_USAGE_OWNED];
3300 let membw_open = membws[idx][LAYER_USAGE_OPEN];
3301
3302 let mut util = owned;
3303 let mut membw = membw_owned;
3304 if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
3305 util += open;
3306 membw += membw_open;
3307 }
3308
3309 let util = if util < 0.01 { 0.0 } else { util };
3310
3311 let low = (util / util_range.1).ceil() as usize;
3312 let high = ((util / util_range.0).floor() as usize).max(low);
3313
3314 let membw_limit = match membw_gb {
3315 Some(membw_limit) => *membw_limit,
3316 None => 0.0,
3317 };
3318
3319 trace!(
3320 "layer {0} (membw, membw_limit): ({membw} gi_b, {membw_limit} gi_b)",
3321 layer.name
3322 );
3323
3324 let target = layer.cpus.weight().clamp(low, high);
3325
3326 records.push((
3327 (owned * 100.0) as u64,
3328 (open * 100.0) as u64,
3329 (util * 100.0) as u64,
3330 low,
3331 high,
3332 target,
3333 ));
3334
3335 let target = target.clamp(cpus_range.0, cpus_range.1);
3336 let membw_target =
3337 self.clamp_target_by_membw(layer, membw_limit, membw, target as u64);
3338
3339 trace!("CPU target pre- and post-membw adjustment: {target} -> {membw_target}");
3340
3341 if membw_target < cpus_range.0 {
3344 warn!("cannot satisfy memory bw limit for layer {}", layer.name);
3345 warn!("membw_target {membw_target} low {}", cpus_range.0);
3346 };
3347
3348 let target = membw_target.clamp(cpus_range.0, target);
3351
3352 (target, cpus_range.0)
3353 }
3354 LayerKind::Open { .. } => (0, 0),
3355 });
3356 }
3357
3358 trace!("(owned, open, util, low, high, target): {:?}", &records);
3359 targets
3360 }
3361
3362 fn compute_target_llcs(target: usize, topo: &Topology) -> (usize, usize) {
3366 let cores_per_llc = topo.all_cores.len() / topo.all_llcs.len();
3368 let cpus_per_core = topo.all_cores.first_key_value().unwrap().1.cpus.len();
3370 let cpus_per_llc = cores_per_llc * cpus_per_core;
3371
3372 let full = target / cpus_per_llc;
3373 let extra = target % cpus_per_llc;
3374
3375 (full, extra.div_ceil(cpus_per_core))
3376 }
3377
/// Re-derives LLC ownership, core order and CPU assignment for every
/// `StickyDynamic` layer; layers with any other growth algorithm are skipped
/// in every pass.
///
/// The work is done in separate passes over `layer_targets`:
/// 1. free whole LLCs a layer holds beyond its per-node target,
/// 2. take whole LLCs from the pool for layers below their per-node target,
/// 3. rebuild `core_order` with the "extra" cores taken from free LLCs,
/// 4. reset the per-LLC usage counters used by pass 3,
/// 5. append the cores of all fully assigned LLCs to `core_order`,
/// 6. release CPUs no longer covered by `core_order`,
/// 7. claim newly covered CPUs that the pool still has available.
///
/// # Parameters
/// * `layer_targets` - `(layer index, value)` pairs; only the index is used
///   here, the slice order decides processing order (passes 1-2 walk it in
///   reverse). The caller visible in this file passes pairs sorted ascending
///   by CPU target.
/// * `layer_allocs` - per-layer allocation, indexed by layer index;
///   `node_target(n)` is multiplied by `au` to obtain a CPU count.
/// * `au` - allocation unit multiplier.
///
/// # Returns
/// `Ok(true)` if any CPU was freed or allocated, `Ok(false)` otherwise.
///
/// # Errors
/// Propagates failures of `cpu_pool.free` and `cpu_pool.mark_allocated`.
fn recompute_layer_core_order(
    &mut self,
    layer_targets: &[(usize, usize)],
    layer_allocs: &[LayerAlloc],
    au: usize,
) -> Result<bool> {
    let nr_nodes = self.topo.nodes.len();

    // Pass 1: return surplus whole LLCs to the pool.
    debug!(
        " free: before pass: free_llcs={:?}",
        self.cpu_pool.free_llcs
    );
    for &(idx, _) in layer_targets.iter().rev() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        let alloc = &layer_allocs[idx];

        for n in 0..nr_nodes {
            let assigned_on_n = layer.assigned_llcs[n].len();
            // `.0` of `compute_target_llcs` is the number of full LLCs.
            let target_full_n =
                Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
            let mut to_free = assigned_on_n.saturating_sub(target_full_n);

            debug!(
                " free: layer={} node={} assigned={} target_full={} to_free={}",
                layer.name, n, assigned_on_n, target_full_n, to_free,
            );

            // LLCs are released from the back of the node's list.
            while to_free > 0 {
                if let Some(llc) = layer.assigned_llcs[n].pop() {
                    self.cpu_pool.return_llc(llc);
                    to_free -= 1;
                    debug!(" layer={} freed_llc={} from node={}", layer.name, llc, n);
                } else {
                    break;
                }
            }
        }
    }
    debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);

    // Pass 2: take whole LLCs from the pool, node by node, until the target
    // is met or the node's pool runs dry.
    for &(idx, _) in layer_targets.iter().rev() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        let alloc = &layer_allocs[idx];

        for n in 0..nr_nodes {
            let cur_on_n = layer.assigned_llcs[n].len();
            let target_full_n =
                Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
            let mut to_alloc = target_full_n.saturating_sub(cur_on_n);

            debug!(
                " alloc: layer={} node={} cur={} target_full={} to_alloc={} free={}",
                layer.name,
                n,
                cur_on_n,
                target_full_n,
                to_alloc,
                self.cpu_pool.free_llcs.get(&n).map_or(0, |v| v.len()),
            );

            while to_alloc > 0 {
                if let Some(llc) = self.cpu_pool.take_llc_from_node(n) {
                    layer.assigned_llcs[n].push(llc);
                    to_alloc -= 1;
                    debug!(" layer={} alloc_llc={} on node={}", layer.name, llc, n);
                } else {
                    break;
                }
            }
        }

        debug!(
            " alloc: layer={} assigned_llcs={:?}",
            layer.name, layer.assigned_llcs
        );
    }

    // Same LLC capacity derivation as `compute_target_llcs`.
    let cores_per_llc = self.topo.all_cores.len() / self.topo.all_llcs.len();
    let cpus_per_core = self.topo.all_cores.first_key_value().unwrap().1.cpus.len();
    let cpus_per_llc = cores_per_llc * cpus_per_core;

    // Pass 3: start each layer's `core_order` from scratch and fill it with
    // the "extra" cores (`.1` of `compute_target_llcs`) drawn from LLCs that
    // are still free on the node.
    for &(idx, _) in layer_targets.iter() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        layer.core_order = vec![Vec::new(); nr_nodes];
        let alloc = &layer_allocs[idx];

        for n in 0..nr_nodes {
            let mut extra = Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).1;

            if let Some(node_llcs) = self.cpu_pool.free_llcs.get_mut(&n) {
                // Each entry is (llc id, amount already handed out from that
                // LLC during this pass).
                for entry in node_llcs.iter_mut() {
                    if extra == 0 {
                        break;
                    }
                    // NOTE(review): `avail` is derived from a CPU count
                    // (`cpus_per_llc`) while `extra`/`used` count cores;
                    // confirm the mixed units are intended.
                    let avail = cpus_per_llc - entry.1;
                    let mut used = extra.min(avail);
                    let cores_to_add = used;

                    // Skip the cores earlier layers already took from this
                    // LLC, then record our own usage.
                    let shift = entry.1;
                    entry.1 += used;

                    let llc_id = entry.0;
                    let llc = self.topo.all_llcs.get(&llc_id).unwrap();

                    // Pushes at most `used` cores; fewer if the LLC runs out
                    // of cores after `shift`.
                    for core in llc.cores.iter().skip(shift) {
                        if used == 0 {
                            break;
                        }
                        layer.core_order[n].push(core.1.id);
                        used -= 1;
                    }

                    // The full requested amount is deducted even if fewer
                    // cores were actually pushed above.
                    extra -= cores_to_add;
                }
            }
        }

        // First reversal; pass 5 reverses these lists once more.
        for node_cores in &mut layer.core_order {
            node_cores.reverse();
        }
    }

    // Pass 4: the usage counters were scratch state for pass 3 only.
    for node_llcs in self.cpu_pool.free_llcs.values_mut() {
        for entry in node_llcs.iter_mut() {
            entry.1 = 0;
        }
    }

    // Pass 5: append every core belonging to a fully assigned LLC.
    for &(idx, _) in layer_targets.iter() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        let all_assigned: HashSet<usize> =
            layer.assigned_llcs.iter().flatten().copied().collect();

        for core in self.topo.all_cores.iter() {
            let llc_id = core.1.llc_id;
            if all_assigned.contains(&llc_id) {
                let nid = core.1.node_id;
                layer.core_order[nid].push(core.1.id);
            }
        }
        // Second reversal: assigned-LLC cores (in reverse of `all_cores`
        // iteration order) now come first, followed by the pass-3 extra
        // cores back in their original push order.
        for node_cores in &mut layer.core_order {
            node_cores.reverse();
        }

        debug!(
            " alloc: layer={} core_order={:?}",
            layer.name, layer.core_order
        );
    }

    let mut updated = false;

    // Pass 6: release CPUs the layer holds on each node that are outside
    // the span of its new core order (restricted to allowed CPUs).
    for &(idx, _) in layer_targets.iter() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        for n in 0..nr_nodes {
            // Union of the spans of all cores in this node's core order.
            let mut node_target = Cpumask::new();
            for &core_id in &layer.core_order[n] {
                if let Some(core) = self.topo.all_cores.get(&core_id) {
                    node_target |= &core.span;
                }
            }
            node_target &= &layer.allowed_cpus;

            let node_span = &self.topo.nodes[&n].span;
            let node_cur = layer.cpus.and(node_span);
            // Currently held on this node, but not wanted any more.
            let cpus_to_free = node_cur.and(&node_target.not());

            if cpus_to_free.weight() > 0 {
                debug!(
                    " apply: layer={} freeing CPUs on node {}: {}",
                    layer.name, n, cpus_to_free
                );
                layer.cpus &= &cpus_to_free.not();
                layer.nr_cpus -= cpus_to_free.weight();
                for cpu in cpus_to_free.iter() {
                    layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                    layer.nr_node_cpus[n] -= 1;
                }
                self.cpu_pool.free(&cpus_to_free)?;
                updated = true;
            }
        }
    }

    // Pass 7: claim CPUs in the new target that the layer lacks, limited to
    // what the pool reports as available. Running this after every layer
    // has freed (pass 6) lets CPUs move between layers in a single call.
    for &(idx, _) in layer_targets.iter() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        for n in 0..nr_nodes {
            let mut node_target = Cpumask::new();
            for &core_id in &layer.core_order[n] {
                if let Some(core) = self.topo.all_cores.get(&core_id) {
                    node_target |= &core.span;
                }
            }
            node_target &= &layer.allowed_cpus;

            // Re-queried per node because earlier iterations may have
            // marked CPUs as allocated.
            let available_cpus = self.cpu_pool.available_cpus();
            let desired_to_alloc = node_target.and(&layer.cpus.clone().not());
            let cpus_to_alloc = desired_to_alloc.clone().and(&available_cpus);

            // A shortfall is only logged; the layer takes what it can get.
            if desired_to_alloc.weight() > cpus_to_alloc.weight() {
                debug!(
                    " apply: layer={} node {} wanted to alloc {} CPUs but only {} available",
                    layer.name,
                    n,
                    desired_to_alloc.weight(),
                    cpus_to_alloc.weight()
                );
            }

            if cpus_to_alloc.weight() > 0 {
                debug!(
                    " apply: layer={} allocating CPUs on node {}: {}",
                    layer.name, n, cpus_to_alloc
                );
                layer.cpus |= &cpus_to_alloc;
                layer.nr_cpus += cpus_to_alloc.weight();
                for cpu in cpus_to_alloc.iter() {
                    layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                    layer.nr_node_cpus[n] += 1;
                }
                self.cpu_pool.mark_allocated(&cpus_to_alloc)?;
                updated = true;
            }
        }

        debug!(
            " apply: layer={} final cpus.weight()={} nr_cpus={}",
            layer.name,
            layer.cpus.weight(),
            layer.nr_cpus
        );
    }

    Ok(updated)
}
3656
3657 fn refresh_node_ctx(
3658 skel: &mut BpfSkel,
3659 topo: &Topology,
3660 node_empty_layers: &[Vec<u32>],
3661 init: bool,
3662 ) {
3663 for &nid in topo.nodes.keys() {
3664 let mut arg: bpf_intf::refresh_node_ctx_arg =
3665 unsafe { MaybeUninit::zeroed().assume_init() };
3666 arg.node_id = nid as u32;
3667 arg.init = init as u32;
3668
3669 let empty = &node_empty_layers[nid];
3670 arg.nr_empty_layer_ids = empty.len() as u32;
3671 for (i, &lid) in empty.iter().enumerate() {
3672 arg.empty_layer_ids[i] = lid;
3673 }
3674 for i in empty.len()..MAX_LAYERS {
3675 arg.empty_layer_ids[i] = MAX_LAYERS as u32;
3676 }
3677
3678 if init {
3679 let node = &topo.nodes[&nid];
3681 let llcs: Vec<u32> = node.llcs.keys().map(|&id| id as u32).collect();
3682 arg.nr_llcs = llcs.len() as u32;
3683 for (i, &llc_id) in llcs.iter().enumerate() {
3684 arg.llcs[i] = llc_id;
3685 }
3686 }
3687
3688 let input = ProgramInput {
3689 context_in: Some(unsafe { plain::as_mut_bytes(&mut arg) }),
3690 ..Default::default()
3691 };
3692 let _ = skel.progs.refresh_node_ctx.test_run(input);
3693 }
3694 }
3695
/// Recomputes every layer's CPU allocation and pushes the result to BPF.
///
/// The steps run in this order:
/// 1. Dampen shrinking targets and compute per-layer, per-node allocations
///    with `unified_alloc`.
/// 2. When the machine has more than one LLC, call
///    `recompute_layer_core_order` and re-publish StickyDynamic layers.
/// 3. Free surplus CPUs from the remaining non-open layers, then allocate
///    missing ones.
/// 4. Publish `fully_allocated`, recompute `growth_denied` and log changes.
/// 5. If anything changed, give the leftover CPUs to open layers and refresh
///    the BPF-side cpumasks, fallback CPUs and node contexts.
///
/// # Errors
///
/// Propagates errors from `recompute_layer_core_order`, `next_to_free` and
/// `free`. A failure of `update_netdev_cpumasks` is only logged.
fn refresh_cpumasks(&mut self) -> Result<()> {
    let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });

    let mut updated = false;
    let raw_targets = self.calc_target_nr_cpus();
    let au = self.cpu_pool.alloc_unit();
    let total_cpus = self.cpu_pool.topo.all_cpus.len();

    // Dampen shrinking: when the target is below the current size, only give
    // up half of the difference (rounded up) this round, and never go below
    // the layer's `min`. Growth targets are passed through unchanged.
    let targets: Vec<(usize, usize)> = raw_targets
        .iter()
        .enumerate()
        .map(|(idx, &(target, min))| {
            let cur = self.layers[idx].nr_cpus;
            if target < cur {
                let dampened = cur - (cur - target).div_ceil(2);
                (dampened.max(min), min)
            } else {
                (target, min)
            }
        })
        .collect();

    let demands = self.calc_raw_demands(&targets);
    let nr_nodes = self.topo.nodes.len();
    // Per-node capacity expressed in allocation units rather than CPUs.
    let node_caps: Vec<usize> = self
        .topo
        .nodes
        .values()
        .map(|n| n.span.weight() / au)
        .collect();
    let all_layer_nodes: Vec<&[usize]> = self
        .layer_specs
        .iter()
        .map(|s| s.nodes().as_slice())
        .collect();
    // Per-layer node visiting order, used by the grow pass below.
    let norders: Vec<Vec<usize>> = (0..self.layers.len())
        .map(|idx| {
            layer_core_growth::node_order(
                self.layer_specs[idx].nodes(),
                &self.topo,
                idx,
                &all_layer_nodes,
            )
        })
        .collect();
    let node_groups: Vec<Vec<Vec<usize>>> = (0..self.layers.len())
        .map(|idx| {
            layer_core_growth::node_groups(
                self.layer_specs[idx].nodes(),
                &self.topo,
                idx,
                &all_layer_nodes,
                &self.layer_specs[idx].kind.common().growth_algo,
            )
        })
        .collect();
    // The allocator works in allocation units; results are scaled back to
    // CPUs with `* au` wherever they are consumed.
    let layer_allocs = unified_alloc(total_cpus / au, &node_caps, &demands, &node_groups);

    let cpu_targets: Vec<usize> = layer_allocs.iter().map(|a| a.total() * au).collect();

    // Snapshot of the totals before any change, used only for logging.
    let prev_nr_cpus: Vec<usize> = self.layers.iter().map(|l| l.nr_cpus).collect();

    // (layer index, target CPUs) sorted by ascending target.
    let mut ascending: Vec<(usize, usize)> = cpu_targets.iter().copied().enumerate().collect();
    ascending.sort_by(|a, b| a.1.cmp(&b.1));

    // Per-node snapshot, used for `growth_denied` and for logging.
    let prev_node_cpus: Vec<Vec<usize>> =
        self.layers.iter().map(|l| l.nr_node_cpus.clone()).collect();

    // With more than one LLC, StickyDynamic layers are handled by
    // `recompute_layer_core_order` and skipped by the generic passes below.
    let use_sd_alloc = self.topo.all_llcs.len() > 1;
    let sticky_dynamic_updated = if use_sd_alloc {
        self.recompute_layer_core_order(&ascending, &layer_allocs, au)?
    } else {
        false
    };
    updated |= sticky_dynamic_updated;

    if sticky_dynamic_updated {
        // Publish the new cpumasks of StickyDynamic layers to BPF.
        for (idx, layer) in self.layers.iter().enumerate() {
            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
                Self::update_bpf_layer_cpumask(
                    layer,
                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
                );
            }
        }
    }

    // Shrink pass: visit layers from the largest target to the smallest and
    // return CPUs above the per-node target to the pool. This runs before
    // the grow pass so freed CPUs are available for allocation.
    for &(idx, _target) in ascending.iter().rev() {
        let layer = &mut self.layers[idx];
        if layer_is_open(layer) {
            continue;
        }

        if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
            continue;
        }

        let alloc = &layer_allocs[idx];
        let mut freed = false;

        for n in 0..nr_nodes {
            let desired = alloc.node_target(n) * au;
            let mut to_free = layer.nr_node_cpus[n].saturating_sub(desired);
            let node_span = &self.topo.nodes[&n].span;

            while to_free > 0 {
                // Candidates are this layer's CPUs on node `n`; the core
                // order is walked in reverse when picking what to free.
                let node_cands = layer.cpus.and(node_span);
                let cpus_to_free = match self
                    .cpu_pool
                    .next_to_free(&node_cands, layer.core_order[n].iter().rev())?
                {
                    Some(ret) => ret,
                    None => break,
                };
                let nr = cpus_to_free.weight();
                trace!(
                    "[{}] freeing CPUs on node {}: {}",
                    layer.name,
                    n,
                    &cpus_to_free
                );
                layer.cpus &= &cpus_to_free.not();
                layer.nr_cpus -= nr;
                for cpu in cpus_to_free.iter() {
                    let node_id = self.cpu_pool.topo.all_cpus[&cpu].node_id;
                    layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                    layer.nr_node_cpus[node_id] -= 1;
                    // The pinned count can never exceed what the layer
                    // still holds on the node.
                    layer.nr_pinned_cpus[node_id] =
                        layer.nr_pinned_cpus[node_id].min(layer.nr_node_cpus[node_id]);
                }
                self.cpu_pool.free(&cpus_to_free)?;
                // A single free may release more CPUs than still requested.
                to_free = to_free.saturating_sub(nr);
                freed = true;
            }
        }

        if freed {
            Self::update_bpf_layer_cpumask(
                layer,
                &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
            );
            updated = true;
        }
    }

    // Grow pass: visit layers from the smallest target to the largest and
    // allocate up to the per-node target, following the layer's node order.
    for &(idx, _target) in &ascending {
        let layer = &mut self.layers[idx];

        if layer_is_open(layer) {
            continue;
        }

        if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
            continue;
        }

        let alloc = &layer_allocs[idx];
        let norder = &norders[idx];
        let mut alloced = false;

        for &node_id in norder.iter() {
            let node_target = alloc.node_target(node_id) * au;
            let cur_node = layer.nr_node_cpus[node_id];
            if node_target <= cur_node {
                continue;
            }
            let pinned_target = alloc.pinned[node_id] * au;
            let mut nr_to_alloc = node_target - cur_node;
            let node_span = &self.topo.nodes[&node_id].span;
            let node_allowed = layer.allowed_cpus.and(node_span);

            while nr_to_alloc > 0 {
                let nr_alloced = match self.cpu_pool.alloc_cpus(
                    &node_allowed,
                    &layer.core_order[node_id],
                    nr_to_alloc,
                ) {
                    Some(new_cpus) => {
                        let nr = new_cpus.weight();
                        layer.cpus |= &new_cpus;
                        layer.nr_cpus += nr;
                        for cpu in new_cpus.iter() {
                            layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                            let nid = self.cpu_pool.topo.all_cpus[&cpu].node_id;
                            layer.nr_node_cpus[nid] += 1;
                            // New CPUs count as pinned until the pinned
                            // target of the node being grown is reached.
                            if layer.nr_pinned_cpus[nid] < pinned_target {
                                layer.nr_pinned_cpus[nid] += 1;
                            }
                        }
                        nr
                    }
                    None => 0,
                };
                // Nothing could be allocated on this node; move on.
                if nr_alloced == 0 {
                    break;
                }
                alloced = true;
                // `min` guards the subtraction in case more CPUs were handed
                // out than requested.
                nr_to_alloc -= nr_alloced.min(nr_to_alloc);
            }
        }

        if alloced {
            Self::update_bpf_layer_cpumask(
                layer,
                &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
            );
            updated = true;
        }
    }

    // Tell BPF whether the layers together hold at least as many CPUs as the
    // topology has. Only non-open layers get the flag written.
    let total_allocated: usize = self.layers.iter().map(|l| l.nr_cpus).sum();
    let fully_allocated = total_allocated >= total_cpus;
    for (idx, layer) in self.layers.iter().enumerate() {
        if !layer_is_open(layer) {
            self.skel.maps.bss_data.as_mut().unwrap().layers[idx]
                .fully_allocated
                .write(fully_allocated);
        }
    }

    // Recompute `growth_denied`: a (layer, node) pair is marked when the
    // unpinned utilization needs more unpinned CPUs than the layer has on
    // that node, yet the node count did not grow in this round. Layers
    // without a utilization range are reset and otherwise skipped.
    let node_utils = &self.sched_stats.layer_node_utils;
    let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
    for (idx, layer) in self.layers.iter().enumerate() {
        self.growth_denied[idx].fill(false);
        let util_high = match layer.kind.util_range() {
            Some((_, high)) => high,
            None => continue,
        };
        for n in 0..nr_nodes {
            let unpinned_util = (node_utils[idx][n] - pinned_utils[idx][n]).max(0.0);
            let unpinned_cpus_needed = unpinned_util / util_high;
            let unpinned_cpus_have =
                layer.nr_node_cpus[n].saturating_sub(layer.nr_pinned_cpus[n]) as f64;
            let wanted = unpinned_cpus_needed > unpinned_cpus_have;
            let got = layer.nr_node_cpus[n] > prev_node_cpus[idx][n];
            if wanted && !got {
                self.growth_denied[idx][n] = true;
            }
        }
    }

    // Debug log: total CPU count changes per non-open layer.
    if updated {
        for (idx, layer) in self.layers.iter().enumerate() {
            if layer_is_open(layer) {
                continue;
            }
            let prev = prev_nr_cpus[idx];
            let cur = layer.nr_cpus;
            if prev != cur {
                debug!(
                    "ALLOC {} algo={:?} cpus:{}→{} mask={:x}",
                    layer.name, layer.growth_algo, prev, cur, layer.cpus,
                );
            }
        }
        debug!(
            "ALLOC pool_available={}",
            self.cpu_pool.available_cpus().weight()
        );
    }

    // Debug log: per-node CPU count changes and targets per non-open layer.
    // NOTE(review): this block repeats the `pool_available` line already
    // emitted by the block above.
    if updated {
        let nr_nodes = self.topo.nodes.len();
        for (idx, layer) in self.layers.iter().enumerate() {
            if layer_is_open(layer) {
                continue;
            }
            let prev = &prev_node_cpus[idx];
            let cur = &layer.nr_node_cpus;
            if prev == cur {
                continue;
            }
            let per_node: String = (0..nr_nodes)
                .map(|n| format!("n{}:{}→{}", n, prev[n], cur[n]))
                .collect::<Vec<_>>()
                .join(" ");
            let prev_total: usize = prev.iter().sum();
            let cur_total: usize = cur[..nr_nodes].iter().sum();
            let target: String = (0..nr_nodes)
                .map(|n| format!("n{}:{}", n, layer_allocs[idx].node_target(n) * au))
                .collect::<Vec<_>>()
                .join(" ");
            debug!(
                "ALLOC {} algo={:?} {} total:{}→{} target:[{}] mask={:x}",
                layer.name,
                layer.growth_algo,
                per_node,
                prev_total,
                cur_total,
                target,
                layer.cpus,
            );
        }
        debug!(
            "ALLOC pool_available={}",
            self.cpu_pool.available_cpus().weight()
        );
    }

    if updated {
        // Open layers take every CPU still available in the pool that they
        // are allowed to use; their per-LLC and per-node counters are
        // recomputed from the new mask and their pinned counts reset.
        for (idx, layer) in self.layers.iter_mut().enumerate() {
            if !layer_is_open(layer) {
                continue;
            }

            let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx];
            let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
            let nr_available_cpus = available_cpus.weight();

            layer.cpus = available_cpus;
            layer.nr_cpus = nr_available_cpus;
            for llc in self.cpu_pool.topo.all_llcs.values() {
                layer.nr_llc_cpus[llc.id] = layer.cpus.and(&llc.span).weight();
            }
            for node in self.cpu_pool.topo.nodes.values() {
                layer.nr_node_cpus[node.id] = layer.cpus.and(&node.span).weight();
                layer.nr_pinned_cpus[node.id] = 0;
            }
            Self::update_bpf_layer_cpumask(layer, bpf_layer);
        }

        // Mirror the pool's per-node fallback CPUs into BPF.
        for (&node_id, &cpu) in &self.cpu_pool.fallback_cpus {
            self.skel.maps.bss_data.as_mut().unwrap().fallback_cpus[node_id] = cpu as u32;
        }

        // Widen the running (min, max) CPU-count range of every layer.
        for (lidx, layer) in self.layers.iter().enumerate() {
            self.nr_layer_cpus_ranges[lidx] = (
                self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
                self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
            );
        }

        // Run the `refresh_layer_cpumasks` BPF program; its result is
        // deliberately ignored.
        let input = ProgramInput {
            ..Default::default()
        };
        let prog = &mut self.skel.progs.refresh_layer_cpumasks;
        let _ = prog.test_run(input);

        // For every node, list the layers that hold no CPU on it and hand
        // that to `refresh_node_ctx`.
        let nr_nodes = self.topo.nodes.len();
        let node_empty_layers: Vec<Vec<u32>> = (0..nr_nodes)
            .map(|nid| {
                self.layers
                    .iter()
                    .enumerate()
                    .filter(|(_lidx, layer)| layer.nr_node_cpus[nid] == 0)
                    .map(|(lidx, _)| lidx as u32)
                    .collect()
            })
            .collect();
        Self::refresh_node_ctx(&mut self.skel, &self.topo, &node_empty_layers, false);
    }

    // Runs on every call, changed or not; failures are not fatal.
    if let Err(e) = self.update_netdev_cpumasks() {
        warn!("Failed to update netdev IRQ cpumasks: {:#}", e);
    }
    Ok(())
}
4086
4087 fn refresh_idle_qos(&mut self) -> Result<()> {
4088 if !self.idle_qos_enabled {
4089 return Ok(());
4090 }
4091
4092 let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
4093 for layer in self.layers.iter() {
4094 let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
4095 for cpu in layer.cpus.iter() {
4096 cpu_idle_qos[cpu] = idle_resume_us;
4097 }
4098 }
4099
4100 for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
4101 update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
4102 }
4103
4104 Ok(())
4105 }
4106
4107 fn refresh_xnuma(&mut self) {
4108 let nr_nodes = self.topo.nodes.len();
4109 if nr_nodes <= 1 {
4110 return;
4111 }
4112
4113 let duty_sums = &self.sched_stats.layer_node_duty_sums;
4114
4115 for (layer_idx, spec) in self.layer_specs.iter().enumerate() {
4116 let common = spec.kind.common();
4117 let threshold = common.xnuma_threshold;
4118 let threshold_delta = common.xnuma_threshold_delta;
4119 let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[layer_idx];
4120
4121 if threshold.0 <= 0.0 && threshold.1 <= 0.0 {
4122 for src in 0..nr_nodes {
4124 bpf_layer.node[src].xnuma_is_mig_src.write(true);
4125 for dst in 0..nr_nodes {
4126 bpf_layer.node[src].xnuma[dst].rate = u64::MAX;
4127 }
4128 }
4129 self.xnuma_mig_src[layer_idx].fill(false);
4130 continue;
4131 }
4132
4133 let layer = &self.layers[layer_idx];
4134 let is_mig_src = xnuma_check_active(
4135 &duty_sums[layer_idx],
4136 &layer.nr_node_cpus,
4137 threshold,
4138 threshold_delta,
4139 &self.growth_denied[layer_idx],
4140 &self.xnuma_mig_src[layer_idx],
4141 );
4142
4143 self.xnuma_mig_src[layer_idx] = is_mig_src.clone();
4144
4145 let result = xnuma_compute_rates(&duty_sums[layer_idx], &layer.nr_node_cpus);
4146
4147 for src in 0..nr_nodes {
4150 for dst in 0..nr_nodes {
4151 bpf_layer.node[src].xnuma[dst].rate = result.rates[src][dst];
4152 }
4153 }
4154 for (nid, is_src) in is_mig_src.iter().enumerate().take(nr_nodes) {
4155 bpf_layer.node[nid].xnuma_is_mig_src.write(*is_src);
4156 }
4157 }
4158 }
4159
4160 fn step(&mut self) -> Result<()> {
4161 let started_at = Instant::now();
4162 self.sched_stats.refresh(
4163 &mut self.skel,
4164 &self.proc_reader,
4165 started_at,
4166 self.processing_dur,
4167 &self.gpu_task_handler,
4168 )?;
4169
4170 self.skel
4172 .maps
4173 .bss_data
4174 .as_mut()
4175 .unwrap()
4176 .system_cpu_util_ewma = (self.sched_stats.system_cpu_util_ewma * 10000.0) as u64;
4177
4178 for layer_id in 0..self.sched_stats.nr_layers {
4179 self.skel
4180 .maps
4181 .bss_data
4182 .as_mut()
4183 .unwrap()
4184 .layer_dsq_insert_ewma[layer_id] =
4185 (self.sched_stats.layer_dsq_insert_ewma[layer_id] * 10000.0) as u64;
4186 }
4187
4188 self.refresh_cpumasks()?;
4189 self.refresh_xnuma();
4190 self.refresh_idle_qos()?;
4191 self.gpu_task_handler.maybe_affinitize();
4192 self.processing_dur += Instant::now().duration_since(started_at);
4193 Ok(())
4194 }
4195
4196 fn generate_sys_stats(
4197 &mut self,
4198 stats: &Stats,
4199 cpus_ranges: &mut [(usize, usize)],
4200 ) -> Result<SysStats> {
4201 let bstats = &stats.bpf_stats;
4202 let mut sys_stats = SysStats::new(stats, bstats, &self.cpu_pool.fallback_cpus)?;
4203
4204 for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
4205 let layer_stats = LayerStats::new(
4206 lidx,
4207 layer,
4208 stats,
4209 bstats,
4210 cpus_ranges[lidx],
4211 self.xnuma_mig_src[lidx].iter().any(|&a| a),
4212 );
4213 sys_stats.layers.insert(spec.name.to_string(), layer_stats);
4214 cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
4215 }
4216
4217 Ok(sys_stats)
4218 }
4219
4220 fn process_cgroup_creation(
4222 path: &Path,
4223 cgroup_regexes: &HashMap<u32, Regex>,
4224 cgroup_path_to_id: &mut HashMap<String, u64>,
4225 sender: &crossbeam::channel::Sender<CgroupEvent>,
4226 ) {
4227 let path_str = path.to_string_lossy().to_string();
4228
4229 let cgroup_id = std::fs::metadata(path)
4231 .map(|metadata| {
4232 use std::os::unix::fs::MetadataExt;
4233 metadata.ino()
4234 })
4235 .unwrap_or(0);
4236
4237 let mut match_bitmap = 0u64;
4239 for (rule_id, regex) in cgroup_regexes {
4240 if regex.is_match(&path_str) {
4241 match_bitmap |= 1u64 << rule_id;
4242 }
4243 }
4244
4245 cgroup_path_to_id.insert(path_str.clone(), cgroup_id);
4247
4248 if let Err(e) = sender.send(CgroupEvent::Created {
4250 path: path_str,
4251 cgroup_id,
4252 match_bitmap,
4253 }) {
4254 error!("Failed to send cgroup creation event: {}", e);
4255 }
4256 }
4257
/// Starts a background thread that watches `/sys/fs/cgroup` with inotify
/// and reports cgroup creation and removal on the returned channel.
///
/// Watches for CREATE and DELETE are installed on the root and on every
/// existing sub-directory before the thread is spawned. The thread first
/// reports every directory found by walking the tree, then processes
/// inotify events until `shutdown` is set or reading events fails.
///
/// # Errors
///
/// Fails when inotify cannot be initialized or the root watch cannot be
/// added. Failures to watch individual sub-directories are only logged.
fn start_cgroup_watcher(
    shutdown: Arc<AtomicBool>,
    cgroup_regexes: HashMap<u32, Regex>,
) -> Result<Receiver<CgroupEvent>> {
    let mut inotify = Inotify::init().context("Failed to initialize inotify")?;
    // Maps a watch descriptor back to the directory it watches, so an
    // event's file name can be turned into a full path.
    let mut wd_to_path = HashMap::new();

    // Bounded queue: `send` blocks the watcher thread when it is full.
    let (sender, receiver) = crossbeam::channel::bounded::<CgroupEvent>(1024);

    let root_wd = inotify
        .watches()
        .add("/sys/fs/cgroup", WatchMask::CREATE | WatchMask::DELETE)
        .context("Failed to add watch for /sys/fs/cgroup")?;
    wd_to_path.insert(root_wd, PathBuf::from("/sys/fs/cgroup"));

    Self::add_recursive_watches(&mut inotify, &mut wd_to_path, Path::new("/sys/fs/cgroup"))?;

    std::thread::spawn(move || {
        let mut buffer = [0; 4096];
        let inotify_fd = inotify.as_raw_fd();
        // Remembers the id reported for each path so the same id can be
        // reported again when the directory is removed.
        let mut cgroup_path_to_id = HashMap::<String, u64>::new();

        // Report every directory that already exists, root included.
        for entry in WalkDir::new("/sys/fs/cgroup")
            .into_iter()
            .filter_map(|e| e.ok())
            .filter(|e| e.file_type().is_dir())
        {
            let path = entry.path();
            Self::process_cgroup_creation(
                path,
                &cgroup_regexes,
                &mut cgroup_path_to_id,
                &sender,
            );
        }

        while !shutdown.load(Ordering::Relaxed) {
            // Wait up to 100ms for the inotify fd to become readable so the
            // shutdown flag is re-checked regularly.
            let ready = unsafe {
                let mut read_fds: libc::fd_set = std::mem::zeroed();
                libc::FD_ZERO(&mut read_fds);
                libc::FD_SET(inotify_fd, &mut read_fds);

                let mut timeout = libc::timeval {
                    tv_sec: 0,
                    tv_usec: 100_000,
                };

                libc::select(
                    inotify_fd + 1,
                    &mut read_fds,
                    std::ptr::null_mut(),
                    std::ptr::null_mut(),
                    &mut timeout,
                )
            };

            // 0 is a timeout, negative is an error; both just retry.
            if ready <= 0 {
                continue;
            }

            let events = match inotify.read_events(&mut buffer) {
                Ok(events) => events,
                Err(e) => {
                    // A read failure ends the watcher thread.
                    error!("Error reading inotify events: {}", e);
                    break;
                }
            };

            for event in events {
                if !event.mask.contains(inotify::EventMask::CREATE)
                    && !event.mask.contains(inotify::EventMask::DELETE)
                {
                    continue;
                }

                let name = match event.name {
                    Some(name) => name,
                    None => continue,
                };

                let parent_path = match wd_to_path.get(&event.wd) {
                    Some(parent) => parent,
                    None => {
                        warn!("Unknown watch descriptor: {:?}", event.wd);
                        continue;
                    }
                };

                let path = parent_path.join(name.to_string_lossy().as_ref());

                if event.mask.contains(inotify::EventMask::CREATE) {
                    // Only directories are cgroups; skip regular files.
                    if !path.is_dir() {
                        continue;
                    }

                    Self::process_cgroup_creation(
                        &path,
                        &cgroup_regexes,
                        &mut cgroup_path_to_id,
                        &sender,
                    );

                    // Watch the new directory so its own children are seen.
                    match inotify
                        .watches()
                        .add(&path, WatchMask::CREATE | WatchMask::DELETE)
                    {
                        Ok(wd) => {
                            wd_to_path.insert(wd, path.clone());
                        }
                        Err(e) => {
                            warn!(
                                "Failed to add watch for new cgroup {}: {}",
                                path.display(),
                                e
                            );
                        }
                    }
                } else if event.mask.contains(inotify::EventMask::DELETE) {
                    let path_str = path.to_string_lossy().to_string();

                    // Paths never recorded are reported with id 0.
                    let cgroup_id = cgroup_path_to_id.remove(&path_str).unwrap_or(0);

                    if let Err(e) = sender.send(CgroupEvent::Removed {
                        path: path_str,
                        cgroup_id,
                    }) {
                        error!("Failed to send cgroup removal event: {}", e);
                    }

                    // Forget the watch descriptor of the removed directory.
                    // This is a linear scan over all watched paths.
                    let wd_to_remove = wd_to_path.iter().find_map(|(wd, watched_path)| {
                        if watched_path == &path {
                            Some(wd.clone())
                        } else {
                            None
                        }
                    });
                    if let Some(wd) = wd_to_remove {
                        wd_to_path.remove(&wd);
                    }
                }
            }
        }
    });

    Ok(receiver)
}
4417
4418 fn add_recursive_watches(
4419 inotify: &mut Inotify,
4420 wd_to_path: &mut HashMap<inotify::WatchDescriptor, PathBuf>,
4421 path: &Path,
4422 ) -> Result<()> {
4423 for entry in WalkDir::new(path)
4424 .into_iter()
4425 .filter_map(|e| e.ok())
4426 .filter(|e| e.file_type().is_dir())
4427 .skip(1)
4428 {
4429 let entry_path = entry.path();
4430 match inotify
4432 .watches()
4433 .add(entry_path, WatchMask::CREATE | WatchMask::DELETE)
4434 {
4435 Ok(wd) => {
4436 wd_to_path.insert(wd, entry_path.to_path_buf());
4437 }
4438 Err(e) => {
4439 debug!("Failed to add watch for {}: {}", entry_path.display(), e);
4440 }
4441 }
4442 }
4443 Ok(())
4444 }
4445
/// Main loop of the scheduler's userspace side.
///
/// Until `shutdown` is set or the BPF scheduler reports an exit, the loop
/// runs `step` at every scheduling interval, bumps
/// `layer_refresh_seq_avgruntime` at every layer refresh interval (when
/// that interval is non-zero), and serves stats requests and cgroup events
/// while waiting. On exit the struct_ops link is dropped and the user exit
/// info is returned.
///
/// # Errors
///
/// Propagates failures from `step`, from starting the cgroup watcher, from
/// the stats channels and from inserting a cgroup into the BPF map.
///
/// # Panics
///
/// Panics if `cgroup_regexes` has already been taken, and if a stats
/// refresh arrives from a thread that has not sent a hello first.
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
    let (res_ch, req_ch) = self.stats_server.channels();
    let mut next_sched_at = Instant::now() + self.sched_intv;
    let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
    let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
    // Per stats client (keyed by thread id): the (min, max) CPU count of
    // every layer since that client's last refresh.
    let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();

    // The watcher thread is only started when there is a regex to match.
    let cgroup_regexes = self.cgroup_regexes.take().unwrap();
    let cgroup_event_rx = if !cgroup_regexes.is_empty() {
        Some(Self::start_cgroup_watcher(
            shutdown.clone(),
            cgroup_regexes,
        )?)
    } else {
        None
    };

    while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
        let now = Instant::now();

        if now >= next_sched_at {
            self.step()?;
            // Skip over missed intervals instead of running them back to back.
            while next_sched_at < now {
                next_sched_at += self.sched_intv;
            }
        }

        if enable_layer_refresh && now >= next_layer_refresh_at {
            self.skel
                .maps
                .bss_data
                .as_mut()
                .unwrap()
                .layer_refresh_seq_avgruntime += 1;
            while next_layer_refresh_at < now {
                next_layer_refresh_at += self.layer_refresh_intv;
            }
        }

        // Sleep at most until the next scheduling step is due.
        let timeout_duration = next_sched_at.saturating_duration_since(Instant::now());
        // Stand-in receiver that never yields, used when no watcher runs.
        let never_rx = crossbeam::channel::never();
        let cgroup_rx = cgroup_event_rx.as_ref().unwrap_or(&never_rx);

        select! {
            recv(req_ch) -> msg => match msg {
                // New stats client: start its ranges at the current CPU
                // counts and reply with a fresh `Stats`.
                Ok(StatsReq::Hello(tid)) => {
                    cpus_ranges.insert(
                        tid,
                        self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
                    );
                    let stats =
                        Stats::new(&mut self.skel, &self.proc_reader, self.topo.clone(), &self.gpu_task_handler, self.sched_stats.util_compensation)?;
                    res_ch.send(StatsRes::Hello(Box::new(stats)))?;
                }
                Ok(StatsReq::Refresh(tid, mut stats)) => {
                    // Fold the ranges accumulated by `refresh_cpumasks` into
                    // every client's ranges, then restart the accumulation
                    // from the current CPU counts.
                    for i in 0..self.nr_layer_cpus_ranges.len() {
                        for (_, ranges) in cpus_ranges.iter_mut() {
                            ranges[i] = (
                                ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
                                ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
                            );
                        }
                        self.nr_layer_cpus_ranges[i] =
                            (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
                    }

                    stats.refresh(
                        &mut self.skel,
                        &self.proc_reader,
                        now,
                        self.processing_dur,
                        &self.gpu_task_handler,
                    )?;
                    let sys_stats =
                        self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
                    res_ch.send(StatsRes::Refreshed(Box::new((*stats, sys_stats))))?;
                }
                Ok(StatsReq::Bye(tid)) => {
                    cpus_ranges.remove(&tid);
                    res_ch.send(StatsRes::Bye)?;
                }
                // A broken request channel ends the loop with an error.
                Err(e) => Err(e)?,
            },

            recv(cgroup_rx) -> event => match event {
                // Publish the match bitmap of a new cgroup to BPF. A failed
                // insert is fatal.
                Ok(CgroupEvent::Created { path, cgroup_id, match_bitmap }) => {
                    self.skel.maps.cgroup_match_bitmap.update(
                        &cgroup_id.to_ne_bytes(),
                        &match_bitmap.to_ne_bytes(),
                        libbpf_rs::MapFlags::ANY,
                    ).with_context(|| format!(
                        "Failed to insert cgroup {}({}) into BPF map. Cgroup map may be full \
                         (max 16384 entries). Aborting.",
                        cgroup_id, path
                    ))?;

                    debug!("Added cgroup {} to BPF map with bitmap 0x{:x}", cgroup_id, match_bitmap);
                }
                // Removal is best effort: a failed delete is only logged.
                Ok(CgroupEvent::Removed { path, cgroup_id }) => {
                    if let Err(e) = self.skel.maps.cgroup_match_bitmap.delete(&cgroup_id.to_ne_bytes()) {
                        warn!("Failed to delete cgroup {} from BPF map: {}", cgroup_id, e);
                    } else {
                        debug!("Removed cgroup {}({}) from BPF map", cgroup_id, path);
                    }
                }
                Err(e) => {
                    error!("Error receiving cgroup event: {}", e);
                }
            },

            // Timeout: nothing to do, the loop head runs the next step.
            recv(crossbeam::channel::after(timeout_duration)) -> _ => {
            }
        }
    }

    // Drop the struct_ops link before reporting the exit info.
    let _ = self.struct_ops.take();
    uei_report!(&self.skel, uei)
}
4570}
4571
4572impl Drop for Scheduler<'_> {
4573 fn drop(&mut self) {
4574 info!("Unregister {SCHEDULER_NAME} scheduler");
4575
4576 if !self.netdevs.is_empty() {
4577 for (iface, netdev) in &self.netdevs {
4578 if let Err(e) = netdev.restore_cpumasks() {
4579 warn!("Failed to restore {iface} IRQ affinity: {e}");
4580 }
4581 }
4582 info!("Restored original netdev IRQ affinity");
4583 }
4584
4585 if let Some(struct_ops) = self.struct_ops.take() {
4586 drop(struct_ops);
4587 }
4588 }
4589}
4590
4591fn write_example_file(path: &str) -> Result<()> {
4592 let mut f = fs::OpenOptions::new()
4593 .create_new(true)
4594 .write(true)
4595 .open(path)?;
4596 Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
4597}
4598
/// Per-hint information produced by `verify_layer_specs`.
struct HintLayerInfo {
    /// Index of the layer spec that the hint value maps to.
    layer_id: usize,
    /// Threshold of the `SystemCpuUtilBelow` matcher in the same AND block
    /// as the hint, if there is one.
    system_cpu_util_below: Option<f64>,
    /// Threshold of the `DsqInsertBelow` matcher in the same AND block as
    /// the hint, if there is one.
    dsq_insert_below: Option<f64>,
}
4604
/// Validates the layer specs and collects the hint → layer mapping.
///
/// Checks, in order of evaluation:
/// - there is at least one spec and no more than `MAX_LAYERS`;
/// - every spec except the last has at least one match block, and the last
///   has exactly one empty match block;
/// - the number of OR blocks and of conditions per block is within limits;
/// - string matchers fit in `MAX_PATH` / `MAX_COMM`, thresholds lie in
///   `[0.0, 1.0]` and hint values are at most 1024;
/// - `HintEquals` and the high-frequency matchers (`SystemCpuUtilBelow`,
///   `DsqInsertBelow`) are combined only in the permitted ways;
/// - a hint value is not claimed by two different specs;
/// - `cpus_range` and `util_range` of Confined and Grouped layers are
///   well-formed.
///
/// Returns a map from hint value to the owning layer and the thresholds
/// found in the same AND block. The first violation is returned as an error.
fn verify_layer_specs(specs: &[LayerSpec]) -> Result<HashMap<u64, HintLayerInfo>> {
    // hint value -> (layer index, layer name, SystemCpuUtilBelow threshold,
    // DsqInsertBelow threshold). The name is kept for error messages only.
    let mut hint_to_layer_map = HashMap::<u64, (usize, String, Option<f64>, Option<f64>)>::new();

    let nr_specs = specs.len();
    if nr_specs == 0 {
        bail!("No layer spec");
    }
    if nr_specs > MAX_LAYERS {
        bail!("Too many layer specs");
    }

    for (idx, spec) in specs.iter().enumerate() {
        // The last spec is the catch-all and must match everything; every
        // other spec must have something to match on.
        if idx < nr_specs - 1 {
            if spec.matches.is_empty() {
                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
            }
        } else if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
            bail!("Terminal spec {:?} must have an empty match", spec.name);
        }

        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
            bail!(
                "Spec {:?} has too many ({}) OR match blocks",
                spec.name,
                spec.matches.len()
            );
        }

        for (ands_idx, ands) in spec.matches.iter().enumerate() {
            if ands.len() > NR_LAYER_MATCH_KINDS {
                bail!(
                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
                    spec.name,
                    ands_idx,
                    ands.len()
                );
            }
            // Counters and last-seen values for the matchers that have
            // combination rules, collected over this AND block.
            let mut hint_equals_cnt = 0;
            let mut system_cpu_util_below_cnt = 0;
            let mut dsq_insert_below_cnt = 0;
            let mut hint_value: Option<u64> = None;
            let mut system_cpu_util_threshold: Option<f64> = None;
            let mut dsq_insert_threshold: Option<f64> = None;
            for one in ands.iter() {
                match one {
                    LayerMatch::CgroupPrefix(prefix) => {
                        if prefix.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
                        }
                    }
                    LayerMatch::CgroupSuffix(suffix) => {
                        if suffix.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
                        }
                    }
                    LayerMatch::CgroupContains(substr) => {
                        if substr.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
                        }
                    }
                    LayerMatch::CommPrefix(prefix) => {
                        if prefix.len() > MAX_COMM {
                            bail!("Spec {:?} has too long a comm prefix", spec.name);
                        }
                    }
                    LayerMatch::PcommPrefix(prefix) => {
                        if prefix.len() > MAX_COMM {
                            bail!("Spec {:?} has too long a process name prefix", spec.name);
                        }
                    }
                    LayerMatch::SystemCpuUtilBelow(threshold) => {
                        if *threshold < 0.0 || *threshold > 1.0 {
                            bail!(
                                "Spec {:?} has SystemCpuUtilBelow threshold outside the range [0.0, 1.0]",
                                spec.name
                            );
                        }
                        system_cpu_util_threshold = Some(*threshold);
                        system_cpu_util_below_cnt += 1;
                    }
                    LayerMatch::DsqInsertBelow(threshold) => {
                        if *threshold < 0.0 || *threshold > 1.0 {
                            bail!(
                                "Spec {:?} has DsqInsertBelow threshold outside the range [0.0, 1.0]",
                                spec.name
                            );
                        }
                        dsq_insert_threshold = Some(*threshold);
                        dsq_insert_below_cnt += 1;
                    }
                    LayerMatch::HintEquals(hint) => {
                        if *hint > 1024 {
                            bail!(
                                "Spec {:?} has hint value outside the range [0, 1024]",
                                spec.name
                            );
                        }
                        hint_value = Some(*hint);
                        hint_equals_cnt += 1;
                    }
                    // All other matchers have nothing to validate here.
                    _ => {}
                }
            }
            if hint_equals_cnt > 1 {
                bail!("Only 1 HintEquals match permitted per AND block");
            }
            let high_freq_matcher_cnt = system_cpu_util_below_cnt + dsq_insert_below_cnt;
            if high_freq_matcher_cnt > 0 {
                // High-frequency matchers need exactly one HintEquals, at
                // most one of each kind, and nothing else in the block.
                if hint_equals_cnt != 1 {
                    bail!("High-frequency matchers (SystemCpuUtilBelow, DsqInsertBelow) must be used with one HintEquals");
                }
                if system_cpu_util_below_cnt > 1 {
                    bail!("Only 1 SystemCpuUtilBelow match permitted per AND block");
                }
                if dsq_insert_below_cnt > 1 {
                    bail!("Only 1 DsqInsertBelow match permitted per AND block");
                }
                if ands.len() != hint_equals_cnt + system_cpu_util_below_cnt + dsq_insert_below_cnt
                {
                    bail!("High-frequency matchers must be used only with HintEquals (no other matchers)");
                }
            } else if hint_equals_cnt == 1 && ands.len() != 1 {
                // Without high-frequency matchers, HintEquals stands alone.
                bail!("HintEquals match cannot be in conjunction with other matches");
            }

            if let Some(hint) = hint_value {
                // A hint value may recur within one spec but not across
                // specs. Only the first occurrence is recorded, so the
                // thresholds of later blocks with the same hint are not
                // stored.
                if let Some((layer_id, name, _, _)) = hint_to_layer_map.get(&hint) {
                    if *layer_id != idx {
                        bail!(
                            "Spec {:?} has hint value ({}) that is already mapped to Spec {:?}",
                            spec.name,
                            hint,
                            name
                        );
                    }
                } else {
                    hint_to_layer_map.insert(
                        hint,
                        (
                            idx,
                            spec.name.clone(),
                            system_cpu_util_threshold,
                            dsq_insert_threshold,
                        ),
                    );
                }
            }
        }

        match spec.kind {
            LayerKind::Confined {
                cpus_range,
                util_range,
                ..
            }
            | LayerKind::Grouped {
                cpus_range,
                util_range,
                ..
            } => {
                // An equal min and max is allowed for cpus_range but not
                // for util_range.
                if let Some((cpus_min, cpus_max)) = cpus_range {
                    if cpus_min > cpus_max {
                        bail!(
                            "Spec {:?} has invalid cpus_range({}, {})",
                            spec.name,
                            cpus_min,
                            cpus_max
                        );
                    }
                }
                if util_range.0 >= util_range.1 {
                    bail!(
                        "Spec {:?} has invalid util_range ({}, {})",
                        spec.name,
                        util_range.0,
                        util_range.1
                    );
                }
            }
            _ => {}
        }
    }

    // Drop the layer name, which was only needed for error messages.
    Ok(hint_to_layer_map
        .into_iter()
        .map(|(k, v)| {
            (
                k,
                HintLayerInfo {
                    layer_id: v.0,
                    system_cpu_util_below: v.2,
                    dsq_insert_below: v.3,
                },
            )
        })
        .collect())
}
4803
/// Returns the last `len` characters of `cgroup`, or all of it when it is
/// shorter than that.
///
/// Counting is done in `char`s, so multi-byte characters are never split.
fn name_suffix(cgroup: &str, len: usize) -> String {
    let nr_chars = cgroup.chars().count();
    let nr_skipped = nr_chars.saturating_sub(len);

    cgroup.chars().skip(nr_skipped).collect()
}
4810
/// Recursively collects every directory below `dir`.
///
/// Sub-directories are listed before their parent (post-order); `dir`
/// itself is not part of the result.
///
/// # Errors
///
/// Returns an error when `dir` or any directory below it cannot be read,
/// including when the path is missing or is not a directory. Directories
/// can disappear between being listed and being read, so this is reported
/// to the caller rather than treated as a bug that aborts the process.
fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
    let mut paths = vec![];

    // `read_dir` already fails for missing paths and non-directories; only
    // the path is added so the error says which directory was affected.
    let direntries = fs::read_dir(dir).map_err(|e| {
        std::io::Error::new(
            e.kind(),
            format!("failed to read directory {:?}: {}", dir, e),
        )
    })?;

    for entry in direntries {
        let path = entry?.path();
        if path.is_dir() {
            paths.append(&mut traverse_sysfs(&path)?);
            paths.push(path);
        }
    }

    Ok(paths)
}
4830
4831fn find_cpumask(cgroup: &str) -> Cpumask {
4832 let mut path = String::from(cgroup);
4833 path.push_str("/cpuset.cpus.effective");
4834
4835 let description = fs::read_to_string(&mut path).unwrap();
4836
4837 Cpumask::from_cpulist(&description).unwrap()
4838}
4839
4840fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
4841 match rule {
4842 LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4843 .into_iter()
4844 .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4845 .filter(|cgroup| cgroup.ends_with(suffix))
4846 .map(|cgroup| {
4847 (
4848 {
4849 let mut slashterminated = cgroup.clone();
4850 slashterminated.push('/');
4851 LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4852 },
4853 find_cpumask(&cgroup),
4854 )
4855 })
4856 .collect()),
4857 LayerMatch::CgroupRegex(expr) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4858 .into_iter()
4859 .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4860 .filter(|cgroup| {
4861 let re = Regex::new(expr).unwrap();
4862 re.is_match(cgroup)
4863 })
4864 .map(|cgroup| {
4865 (
4866 {
4870 let mut slashterminated = cgroup.clone();
4871 slashterminated.push('/');
4872 LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4873 },
4874 find_cpumask(&cgroup),
4875 )
4876 })
4877 .collect()),
4878 _ => panic!("Unimplemented template enum {:?}", rule),
4879 }
4880}
4881
4882fn create_perf_fds(skel: &mut BpfSkel, event: u64) -> Result<()> {
4883 let mut attr = perf::bindings::perf_event_attr {
4884 size: std::mem::size_of::<perf::bindings::perf_event_attr>() as u32,
4885 type_: perf::bindings::PERF_TYPE_RAW,
4886 config: event,
4887 sample_type: 0u64,
4888 ..Default::default()
4889 };
4890 attr.__bindgen_anon_1.sample_period = 0u64;
4891 attr.set_disabled(0);
4892
4893 let perf_events_map = &skel.maps.scx_pmu_map;
4894 let map_fd = unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
4895
4896 let mut failures = 0u64;
4897
4898 for cpu in 0..*NR_CPUS_POSSIBLE {
4899 let fd = unsafe { perf::perf_event_open(&mut attr as *mut _, -1, cpu as i32, -1, 0) };
4900 if fd < 0 {
4901 failures += 1;
4902 trace!(
4903 "perf_event_open failed cpu={cpu} errno={}",
4904 std::io::Error::last_os_error()
4905 );
4906 continue;
4907 }
4908
4909 let key = cpu as u32;
4910 let val = fd as u32;
4911 let ret = unsafe {
4912 libbpf_sys::bpf_map_update_elem(
4913 map_fd,
4914 &key as *const _ as *const _,
4915 &val as *const _ as *const _,
4916 0,
4917 )
4918 };
4919 if ret != 0 {
4920 trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
4921 } else {
4922 trace!("mapped cpu={cpu} -> fd={fd}");
4923 }
4924 }
4925
4926 if failures > 0 {
4927 println!("membw tracking: failed to install {failures} counters");
4928 }
4930
4931 Ok(())
4932}
4933
4934fn setup_membw_tracking(skel: &mut OpenBpfSkel) -> Result<u64> {
4936 let pmumanager = PMUManager::new()?;
4937 let codename = &pmumanager.codename as &str;
4938
4939 let pmuspec = match codename {
4940 "amdzen1" | "amdzen2" | "amdzen3" => {
4941 trace!("found AMD codename {codename}");
4942 pmumanager.pmus.get("ls_any_fills_from_sys.mem_io_local")
4943 }
4944 "amdzen4" | "amdzen5" => {
4945 trace!("found AMD codename {codename}");
4946 pmumanager.pmus.get("ls_any_fills_from_sys.dram_io_all")
4947 }
4948
4949 "haswell" | "broadwell" | "broadwellde" | "broadwellx" | "skylake" | "skylakex"
4950 | "cascadelakex" | "arrowlake" | "meteorlake" | "sapphirerapids" | "emeraldrapids"
4951 | "graniterapids" => {
4952 trace!("found Intel codename {codename}");
4953 pmumanager.pmus.get("LONGEST_LAT_CACHE.MISS")
4954 }
4955
4956 _ => {
4957 trace!("found unknown codename {codename}");
4958 None
4959 }
4960 };
4961
4962 let spec = pmuspec.ok_or("not_found").unwrap();
4963 let config = (spec.umask << 8) | spec.event[0];
4964
4965 skel.maps.rodata_data.as_mut().unwrap().membw_event = config;
4967
4968 Ok(config)
4969}
4970
/// Program entry point.
///
/// Handles the informational flags (`version`, `help_stats`), initialises
/// logging, installs the Ctrl-C handler, optionally starts the stats monitor
/// thread, builds and normalises the layer configuration, and finally runs the
/// scheduler in a loop until it no longer asks to be restarted.
#[clap_main::clap_main]
fn main(opts: Opts) -> Result<()> {
    // Informational flags: print and exit before any other setup.
    if opts.version {
        println!(
            "scx_layered {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    // Filter precedence: environment-provided filter, then `opts.log_level`,
    // then a hard-coded "info" if both are unusable.
    let env_filter = EnvFilter::try_from_default_env()
        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
            Ok(filter) => Ok(filter),
            Err(e) => {
                eprintln!(
                    "invalid log envvar: {}, using info, err is: {}",
                    opts.log_level, e
                );
                EnvFilter::try_new("info")
            }
        })
        .unwrap_or_else(|_| EnvFilter::new("info"));

    // A logger that fails to initialise is reported on stderr but is not fatal.
    match tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(true)
        .with_thread_ids(true)
        .with_file(true)
        .with_line_number(true)
        .try_init()
    {
        Ok(()) => {}
        Err(e) => eprintln!("failed to init logger: {}", e),
    }

    // Deprecated options are still accepted; they only produce a warning.
    if opts.verbose > 0 {
        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
    }

    if opts.no_load_frac_limit {
        warn!("--no-load-frac-limit is deprecated and noop");
    }
    if opts.layer_preempt_weight_disable != 0.0 {
        warn!("--layer-preempt-weight-disable is deprecated and noop");
    }
    if opts.layer_growth_weight_disable != 0.0 {
        warn!("--layer-growth-weight-disable is deprecated and noop");
    }
    if opts.local_llc_iteration {
        warn!("--local_llc_iteration is deprecated and noop");
    }

    debug!("opts={:?}", &opts);

    if let Some(run_id) = opts.run_id {
        info!("scx_layered run_id: {}", run_id);
    }

    // Shared shutdown flag: set by the Ctrl-C handler, handed to the monitor
    // thread and to `Scheduler::run`.
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // Start the stats monitor thread when either `monitor` or `stats` supplies
    // an interval (`monitor` takes precedence).
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let stats_columns = opts.stats_columns;
        let stats_no_llc = opts.stats_no_llc;
        let jh = std::thread::spawn(move || {
            match stats::monitor(
                Duration::from_secs_f64(intv),
                shutdown_copy,
                stats_columns,
                stats_no_llc,
            ) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        // Monitor-only mode: wait for the monitor thread and exit without
        // starting a scheduler.
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    if let Some(path) = &opts.example {
        write_example_file(path)?;
        return Ok(());
    }

    // Start from the built-in example config or from an empty one, then append
    // every spec parsed from the command line.
    let mut layer_config = match opts.run_example {
        true => EXAMPLE_CONFIG.clone(),
        false => LayerConfig { specs: vec![] },
    };

    for (idx, input) in opts.specs.iter().enumerate() {
        let specs = LayerSpec::parse(input)
            .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;

        for spec in specs {
            match spec.template {
                Some(ref rule) => {
                    let matches = expand_template(rule)?;
                    if matches.is_empty() {
                        // The template expanded to nothing: keep the spec
                        // unchanged.
                        layer_config.specs.push(spec);
                    } else {
                        // One generated spec per template match.
                        for (mt, mask) in matches {
                            let mut genspec = spec.clone();

                            genspec.cpuset = Some(mask);

                            // Append the expanded match to every entry of
                            // `genspec.matches`.
                            for orterm in &mut genspec.matches {
                                orterm.push(mt.clone());
                            }

                            // The generated name is the template name suffixed
                            // with the cgroup; any other match kind is an error.
                            match &mt {
                                LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
                                _ => bail!("Template match has unexpected type"),
                            }

                            layer_config.specs.push(genspec);
                        }
                    }
                }

                None => {
                    layer_config.specs.push(spec);
                }
            }
        }
    }

    // Fill in defaults and normalise the common per-layer settings.
    for spec in layer_config.specs.iter_mut() {
        let common = spec.kind.common_mut();

        // A zero slice means "use the global slice option".
        if common.slice_us == 0 {
            common.slice_us = opts.slice_us;
        }

        // A zero weight means "use the default"; the result is then clamped to
        // the allowed range.
        if common.weight == 0 {
            common.weight = DEFAULT_LAYER_WEIGHT;
        }
        common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);

        if common.preempt {
            // For preempt layers both limits are forced to u64::MAX;
            // user-supplied values are overwritten after a warning.
            if common.disallow_open_after_us.is_some() {
                warn!(
                    "Preempt layer {} has non-null disallow_open_after_us, ignored",
                    &spec.name
                );
            }
            if common.disallow_preempt_after_us.is_some() {
                warn!(
                    "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
                    &spec.name
                );
            }
            common.disallow_open_after_us = Some(u64::MAX);
            common.disallow_preempt_after_us = Some(u64::MAX);
        } else {
            // Non-preempt layers only get the defaults when nothing was set.
            if common.disallow_open_after_us.is_none() {
                common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
            }

            if common.disallow_preempt_after_us.is_none() {
                common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
            }
        }

        if common.idle_smt.is_some() {
            warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
        }

        if common.allow_node_aligned.is_some() {
            warn!("Layer {} has deprecated flag \"allow_node_aligned\", node-aligned tasks are now always dispatched on layer DSQs", &spec.name);
        }
    }

    // True when any Confined or Grouped layer sets `membw_gb`; Open layers
    // never require it.
    let membw_required = layer_config.specs.iter().any(|spec| match spec.kind {
        LayerKind::Confined { membw_gb, .. } | LayerKind::Grouped { membw_gb, .. } => {
            membw_gb.is_some()
        }
        LayerKind::Open { .. } => false,
    });

    // Dump the fully expanded config and exit. This happens before the specs
    // are verified.
    if opts.print_and_exit {
        println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
        return Ok(());
    }

    debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
    let hint_to_layer_map = verify_layer_specs(&layer_config.specs)?;

    // Re-initialise and re-run the scheduler for as long as `run` reports that
    // a restart is wanted. The same `open_object` storage is passed to every
    // `Scheduler::init` call.
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(
            &opts,
            &layer_config.specs,
            &mut open_object,
            &hint_to_layer_map,
            membw_required,
        )?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}
5197
/// Unit tests for `xnuma_check_active` and `xnuma_compute_rates`.
///
/// Unless stated otherwise every node has an allocation of 96, so a duty of
/// `d` corresponds to a utilisation of `d / 96`.
#[cfg(test)]
mod xnuma_tests {
    use super::*;

    /// Threshold pair passed to `xnuma_check_active`.
    /// NOTE(review): presumably (low, high) hysteresis bounds on utilisation;
    /// confirm against `xnuma_check_active`.
    const THRESH: (f64, f64) = (0.6, 0.7);
    /// Delta pair passed to `xnuma_check_active`.
    /// NOTE(review): presumably (low, high) hysteresis bounds on the imbalance;
    /// confirm against `xnuma_check_active`.
    const DELTA: (f64, f64) = (0.2, 0.3);

    /// Both nodes at 40/96 with growth denied: neither becomes active.
    #[test]
    fn test_activation_below_threshold() {
        let duty = vec![40.0, 40.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    /// Node 0 at 90/96 with growth denied and node 1 at 20/96: only node 0
    /// becomes active.
    #[test]
    fn test_activation_above_all_thresholds() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
    }

    /// Same load as the activation case, but growth is not denied: node 0
    /// stays inactive.
    #[test]
    fn test_activation_requires_growth_denied() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![false, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    /// Both nodes equally loaded at 80/96: no node becomes active even though
    /// growth is denied on both.
    #[test]
    fn test_symmetric_high_load_stays_closed() {
        let duty = vec![80.0, 80.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    /// Node 0 at 75/96 and already active: it remains active.
    #[test]
    fn test_hysteresis_stays_active() {
        let duty = vec![75.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
    }

    /// Same load as `test_hysteresis_stays_active`, but node 0 starts
    /// inactive: it remains inactive.
    #[test]
    fn test_hysteresis_stays_inactive() {
        let duty = vec![75.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    /// An active node 0 whose load drops to a balanced 50/96 is deactivated.
    #[test]
    fn test_deactivation_load_drops() {
        let duty = vec![50.0, 50.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    /// An active node 0 is deactivated once growth is no longer denied, even
    /// under high load.
    #[test]
    fn test_deactivation_growth_succeeds() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![false, false];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    /// A node with zero allocation, non-zero duty and growth denied becomes
    /// active.
    #[test]
    fn test_zero_alloc_with_duty_and_growth_denied() {
        let duty = vec![50.0, 0.0];
        let allocs = vec![0, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
    }

    /// No allocation and no duty anywhere: currently active nodes are
    /// deactivated.
    #[test]
    fn test_all_zero_alloc() {
        let duty = vec![0.0, 0.0];
        let allocs = vec![0, 0];
        let gd = vec![true, true];
        let cur = vec![true, true];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    /// Three nodes at 90, 40 and 10 out of 96: only the heavily loaded node 0
    /// becomes active.
    #[test]
    fn test_three_nodes_mixed() {
        let duty = vec![90.0, 40.0, 10.0];
        let allocs = vec![96, 96, 96];
        let gd = vec![true, true, false];
        let cur = vec![false, false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
        assert!(!result[2]);
    }

    /// Both nodes start active with growth denied; node 0 (90/96) stays active
    /// while node 1 (50/96) is deactivated.
    #[test]
    fn test_per_node_independence() {
        let duty = vec![90.0, 50.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![true, true];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
    }

    /// Perfectly balanced load produces an all-zero rate matrix.
    #[test]
    fn test_rates_balanced_load() {
        let duty = vec![48.0, 48.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    /// Duty 80 vs 40 with equal allocations: the equal share is 60 each, so
    /// node 0 has a surplus of 20 that flows to node 1 only.
    #[test]
    fn test_rates_one_overloaded() {
        let duty = vec![80.0, 40.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][1] > 0);
        assert_eq!(result.rates[1][0], 0);
        assert_eq!(result.rates[0][0], 0);
        assert_eq!(result.rates[1][1], 0);

        let expected_rate = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
    }

    /// Equal duty (48 each) with allocations 48 and 144: the proportional
    /// shares are 24 and 72, so node 0 has a surplus of 24 towards node 1.
    #[test]
    fn test_rates_asymmetric_allocation() {
        let duty = vec![48.0, 48.0];
        let allocs = vec![48, 144];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected_rate = (24.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
        assert_eq!(result.rates[1][0], 0);
    }

    /// Duty 120/30/30: the equal share is 60, so node 0's surplus of 60 is
    /// split evenly between two sinks with a deficit of 30 each.
    #[test]
    fn test_rates_three_nodes_one_source() {
        let duty = vec![120.0, 30.0, 30.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_01 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_02 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], rate_01);
        assert_eq!(result.rates[0][2], rate_02);

        assert_eq!(result.rates[1][0], 0);
        assert_eq!(result.rates[2][0], 0);
        assert_eq!(result.rates[1][2], 0);
        assert_eq!(result.rates[2][1], 0);
    }

    /// Duty 120/50/10: the equal share is 60, so node 0's surplus of 60 is
    /// split according to the sinks' deficits of 10 and 50.
    #[test]
    fn test_rates_three_nodes_unequal_deficit() {
        let duty = vec![120.0, 50.0, 10.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_01 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_02 = (50.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], rate_01);
        assert_eq!(result.rates[0][2], rate_02);
    }

    /// Duty 80/70/30: the equal share is 60, so surpluses of 20 and 10 both
    /// flow to node 2 and nothing flows between the two sources.
    #[test]
    fn test_rates_two_sources_one_sink() {
        let duty = vec![80.0, 70.0, 30.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_02 = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_12 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][2], rate_02);
        assert_eq!(result.rates[1][2], rate_12);

        assert_eq!(result.rates[0][1], 0);
        assert_eq!(result.rates[1][0], 0);
    }

    /// For every source, the sum of its outbound rates equals its scaled
    /// surplus over the proportional share (zero for non-surplus nodes).
    #[test]
    fn test_conservation_per_source_outbound() {
        let duty = vec![100.0, 30.0, 50.0, 20.0];
        let allocs = vec![96, 96, 96, 96];
        let nr = 4;
        let result = xnuma_compute_rates(&duty, &allocs);

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        for src in 0..nr {
            let expected = eq_ratio * allocs[src] as f64;
            let surplus = (duty[src] - expected).max(0.0);
            let total_outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
            let expected_rate = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
            assert_eq!(
                total_outbound, expected_rate,
                "node {} outbound mismatch",
                src
            );
        }
    }

    /// Pure arithmetic check of the model the rate tests rely on: measured
    /// against the proportional share, total surplus equals total deficit.
    #[test]
    fn test_conservation_surplus_equals_deficit() {
        let duty = vec![100.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96];

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        let mut total_surplus = 0.0f64;
        let mut total_deficit = 0.0f64;
        for i in 0..3 {
            let expected = eq_ratio * allocs[i] as f64;
            let delta = duty[i] - expected;
            if delta > 0.0 {
                total_surplus += delta;
            } else {
                total_deficit += -delta;
            }
        }
        assert!((total_surplus - total_deficit).abs() < 1e-10);
    }

    /// The diagonal of the rate matrix is always zero.
    #[test]
    fn test_self_rates_always_zero() {
        let duty = vec![100.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for nid in 0..3 {
            assert_eq!(result.rates[nid][nid], 0);
        }
    }

    /// Nodes whose duty is below their proportional share must not send load
    /// anywhere.
    ///
    /// Deficit nodes are identified from the inputs (duty versus equilibrium
    /// share) rather than from the output, so the assertion cannot be
    /// trivially satisfied.
    #[test]
    fn test_deficit_nodes_have_zero_outbound() {
        let duty = vec![100.0, 20.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let nr = duty.len();
        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        let mut nr_deficit = 0;
        for nid in 0..nr {
            // Only nodes strictly below their share are deficit nodes.
            if duty[nid] >= eq_ratio * allocs[nid] as f64 {
                continue;
            }
            nr_deficit += 1;
            for dst in 0..nr {
                assert_eq!(
                    result.rates[nid][dst], 0,
                    "deficit node {} has non-zero rate to {}",
                    nid, dst
                );
            }
        }

        // Nodes 1 (20) and 2 (30) are clearly below the share of 50; guard
        // against the loop silently checking nothing.
        assert!(nr_deficit >= 2, "expected at least two deficit nodes");
    }

    /// No duty anywhere produces an all-zero rate matrix.
    #[test]
    fn test_rates_zero_duty_everywhere() {
        let duty = vec![0.0, 0.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    /// Zero total allocation produces an all-zero rate matrix even with
    /// non-zero duty.
    #[test]
    fn test_rates_zero_alloc() {
        let duty = vec![50.0, 50.0];
        let allocs = vec![0, 0];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    /// All load (96) on node 0: the equal share is 48, so the surplus of 48
    /// flows to node 1.
    #[test]
    fn test_rates_all_load_one_node() {
        let duty = vec![96.0, 0.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected_rate = (48.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
    }

    /// A single node has nowhere to send load.
    #[test]
    fn test_rates_single_node() {
        let duty = vec![96.0];
        let allocs = vec![96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(result.rates[0][0], 0);
    }

    /// Node 1 has zero allocation and zero duty, so node 0's share is its
    /// whole duty: no surplus and no rates in either direction.
    #[test]
    fn test_rates_one_node_zero_alloc() {
        let duty = vec![80.0, 0.0];
        let allocs = vec![96, 0];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(result.rates[0][1], 0);
        assert_eq!(result.rates[1][0], 0);
    }

    /// Pins the absolute scale: a surplus of 20 maps to a rate of
    /// `10 * 2^20`, i.e. `XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE == 2^19`.
    #[test]
    fn test_rate_scaling() {
        let duty = vec![80.0, 40.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected);
        assert_eq!(expected, 10 * (1 << 20));
    }

    /// A surplus of 0.001 still yields a non-zero rate, but one far below a
    /// full unit of `1 << 20`.
    #[test]
    fn test_rates_tiny_imbalance() {
        let duty = vec![48.001, 47.999];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][1] > 0);
        assert!(result.rates[0][1] < (1 << 20));
    }

    /// Eight nodes with two sources (300 and 100 against a share of 87.5):
    /// both send to node 2, and each source's total outbound matches its
    /// scaled surplus within a tolerance of one unit per node.
    #[test]
    fn test_rates_large_values() {
        let duty = vec![300.0, 100.0, 50.0, 50.0, 50.0, 50.0, 50.0, 50.0];
        let allocs = vec![96, 96, 96, 96, 96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][2] > 0);
        assert!(result.rates[1][2] > 0);

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;
        let nr = 8;
        for src in 0..nr {
            let surplus = (duty[src] - eq_ratio * allocs[src] as f64).max(0.0);
            let outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
            let expected = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
            // Each per-destination rate is an integer, so allow up to one unit
            // of error per node.
            let tolerance = nr as u64;
            assert!(
                outbound.abs_diff(expected) <= tolerance,
                "node {} outbound {} vs expected {}, diff {}",
                src,
                outbound,
                expected,
                outbound.abs_diff(expected)
            );
        }
    }

    /// Walks node 0 through a full cycle: idle, activated, kept active at a
    /// reduced load, then deactivated once the load is balanced.
    #[test]
    fn test_hysteresis_cycle() {
        let allocs = vec![96, 96];
        let gd = vec![true, false];

        // Low load: stays inactive.
        let active =
            xnuma_check_active(&[40.0, 40.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
        assert!(!active[0]);

        // High, imbalanced load: activates.
        let active =
            xnuma_check_active(&[90.0, 20.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
        assert!(active[0]);

        // Load eases to 75 while active: remains active.
        let active = xnuma_check_active(&[75.0, 20.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
        assert!(active[0]);

        // Load becomes balanced: deactivates.
        let active = xnuma_check_active(&[50.0, 50.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
        assert!(!active[0]);
    }

    /// With the load held constant, node 0 activates while growth is denied
    /// and deactivates as soon as growth is allowed again.
    #[test]
    fn test_hysteresis_growth_toggle() {
        let allocs = vec![96, 96];
        let gd_denied = vec![true, false];
        let gd_ok = vec![false, false];

        let active = xnuma_check_active(
            &[90.0, 20.0],
            &allocs,
            THRESH,
            DELTA,
            &gd_denied,
            &[false, false],
        );
        assert!(active[0]);

        let active = xnuma_check_active(
            &[90.0, 20.0],
            &allocs,
            THRESH,
            DELTA,
            &gd_ok,
            &[true, false],
        );
        assert!(!active[0]);
    }

    /// Duty 180/20/40/0: the equal share is 60, so node 0's surplus of 120 is
    /// distributed according to the sinks' deficits of 40, 20 and 60.
    #[test]
    fn test_proportional_sink_distribution() {
        let duty = vec![180.0, 20.0, 40.0, 0.0];
        let allocs = vec![96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(
            result.rates[0][1],
            (40.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
        assert_eq!(
            result.rates[0][2],
            (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
        assert_eq!(
            result.rates[0][3],
            (60.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );

        let total_from_n0: u64 = (0..4).map(|dst| result.rates[0][dst]).sum();
        assert_eq!(
            total_from_n0,
            (120.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
    }
}
5826
/// Self-contained tests for the utilisation-compensation arithmetic.
///
/// The two helpers below are local to this module; the tests pin their
/// numerical behaviour on hand-computed inputs.
#[cfg(test)]
mod util_compensation_tests {
    /// Compensation factor for one CPU: `delta_total / (delta_total - overhead)`,
    /// clamped to `[1.0, 20.0]`. Returns `1.0` when nothing is left after
    /// subtracting the overhead (the subtraction saturates at zero).
    fn compute_scale(delta_total: u64, overhead: u64) -> f64 {
        match delta_total.saturating_sub(overhead) {
            0 => 1.0,
            usable => (delta_total as f64 / usable as f64).clamp(1.0, 20.0),
        }
    }

    /// Per-layer utilisation: for each layer, sum `deltas[cpu][layer]` weighted
    /// by `scales[cpu]` over all CPUs, then divide by 1e9 and by `elapsed`.
    fn scaled_aggregate(
        deltas: &[Vec<u64>],
        scales: &[f64],
        nr_layers: usize,
        elapsed: f64,
    ) -> Vec<f64> {
        let mut per_layer = Vec::with_capacity(nr_layers);
        for layer in 0..nr_layers {
            let weighted = deltas
                .iter()
                .enumerate()
                .fold(0.0f64, |acc, (cpu, cpu_deltas)| {
                    acc + cpu_deltas[layer] as f64 * scales[cpu]
                });
            per_layer.push(weighted / 1_000_000_000.0 / elapsed);
        }
        per_layer
    }

    /// True when `actual` is within 0.01 of `expected`.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < 0.01
    }

    /// No overhead: factor is 1.
    #[test]
    fn test_scale_no_overhead() {
        assert!(close(compute_scale(1000, 0), 1.0));
    }

    /// Half of the time is overhead: factor is 2.
    #[test]
    fn test_scale_half_overhead() {
        assert!(close(compute_scale(1000, 500), 2.0));
    }

    /// 90% overhead: factor is 10.
    #[test]
    fn test_scale_high_overhead() {
        assert!(close(compute_scale(1000, 900), 10.0));
    }

    /// 95% overhead: factor is exactly at the upper bound of 20.
    #[test]
    fn test_scale_very_high_overhead() {
        assert!(close(compute_scale(1000, 950), 20.0));
    }

    /// 98% overhead would give 50; the result is clamped to 20.
    #[test]
    fn test_scale_clamped_at_max() {
        assert!(close(compute_scale(1000, 980), 20.0));
    }

    /// Overhead consumes everything: falls back to 1.
    #[test]
    fn test_scale_all_overhead() {
        assert!(close(compute_scale(1000, 1000), 1.0));
    }

    /// Nothing ran at all: falls back to 1.
    #[test]
    fn test_scale_idle_cpu() {
        assert!(close(compute_scale(0, 0), 1.0));
    }

    /// 1% overhead: factor is roughly 1.01.
    #[test]
    fn test_scale_small_overhead_mostly_idle() {
        let scale = compute_scale(10000, 100);
        assert!(close(scale, 1.01), "expected ~1.01, got {}", scale);
    }

    /// With every scale at 1.0 the aggregate is the plain sum: four CPUs at
    /// one second each give 4.0 per layer.
    #[test]
    fn test_uniform_scale_matches_unscaled() {
        let deltas = vec![vec![1_000_000_000u64; 2]; 4];
        let scales = vec![1.0; 4];
        let util = scaled_aggregate(&deltas, &scales, 2, 1.0);
        assert!(close(util[0], 4.0));
        assert!(close(util[1], 4.0));
    }

    /// The scaled CPU carries most of the usage: 0.9 * 2 + 0.1 * 1 = 1.9.
    #[test]
    fn test_hot_cpu_weighted_more() {
        let deltas = vec![vec![900_000_000, 0], vec![100_000_000, 0]];
        let scales = vec![2.0, 1.0];
        let util = scaled_aggregate(&deltas, &scales, 2, 1.0);
        assert!(close(util[0], 1.9), "expected 1.9, got {}", util[0]);
    }

    /// The scaled CPU carries little of the usage: 0.1 * 2 + 0.9 * 1 = 1.1.
    #[test]
    fn test_cold_cpu_weighted_less() {
        let deltas = vec![vec![100_000_000, 0], vec![900_000_000, 0]];
        let scales = vec![2.0, 1.0];
        let util = scaled_aggregate(&deltas, &scales, 2, 1.0);
        assert!(close(util[0], 1.1), "expected 1.1, got {}", util[0]);
    }

    /// Zero usage stays exactly zero whatever the scale.
    #[test]
    fn test_no_usage_no_compensation() {
        let deltas = vec![vec![0u64; 2]; 4];
        let scales = vec![5.0; 4];
        let util = scaled_aggregate(&deltas, &scales, 2, 1.0);
        assert_eq!(util[0], 0.0);
        assert_eq!(util[1], 0.0);
    }

    /// Each layer is weighted by the same per-CPU scales:
    /// layer 0 = 0.8 * 3 + 0.2 = 2.6, layer 1 = 0.2 * 3 + 0.8 = 1.4.
    #[test]
    fn test_multilayer_independent_scaling() {
        let deltas = vec![
            vec![800_000_000, 200_000_000],
            vec![200_000_000, 800_000_000],
        ];
        let scales = vec![3.0, 1.0];
        let util = scaled_aggregate(&deltas, &scales, 2, 1.0);
        assert!(close(util[0], 2.6));
        assert!(close(util[1], 1.4));
    }

    /// Half a second of usage over a two-second window is 0.25.
    #[test]
    fn test_elapsed_time_normalization() {
        let deltas = vec![vec![500_000_000u64; 1]; 1];
        let scales = vec![1.0];
        let util = scaled_aggregate(&deltas, &scales, 1, 2.0);
        assert!(close(util[0], 0.25));
    }

    /// Eight CPUs alternating between 2.5 and 1.0: 4 * 2.5 + 4 * 1.0 = 14.
    #[test]
    fn test_many_cpus_mixed_scales() {
        let deltas = vec![vec![1_000_000_000u64; 1]; 8];
        let scales = vec![2.5, 1.0, 2.5, 1.0, 2.5, 1.0, 2.5, 1.0];
        let util = scaled_aggregate(&deltas, &scales, 1, 1.0);
        assert!(close(util[0], 14.0));
    }

    /// With all scales >= 1 the compensated value is never below the raw one.
    #[test]
    fn test_compensated_ge_raw() {
        let deltas = vec![
            vec![500_000_000u64; 3],
            vec![300_000_000; 3],
            vec![200_000_000; 3],
        ];
        let unit_scales = vec![1.0; 3];
        let comp_scales = vec![1.5, 2.0, 1.0];
        let raw = scaled_aggregate(&deltas, &unit_scales, 3, 1.0);
        let comp = scaled_aggregate(&deltas, &comp_scales, 3, 1.0);
        for (i, (c, r)) in comp.iter().zip(raw.iter()).enumerate() {
            assert!(*c >= *r - 0.001, "layer {}: comp {} < raw {}", i, c, r);
        }
    }
}