1mod bpf_skel;
6mod stats;
7
8use std::collections::BTreeMap;
9use std::collections::BTreeSet;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::ffi::CString;
13use std::fs;
14use std::io::Write;
15use std::mem::MaybeUninit;
16use std::ops::Sub;
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::thread::ThreadId;
23use std::time::Duration;
24use std::time::Instant;
25
26use inotify::{Inotify, WatchMask};
27use std::os::unix::io::AsRawFd;
28
29use anyhow::anyhow;
30use anyhow::bail;
31use anyhow::Context;
32use anyhow::Result;
33pub use bpf_skel::*;
34use clap::Parser;
35use crossbeam::channel::Receiver;
36use crossbeam::select;
37use lazy_static::lazy_static;
38use libbpf_rs::libbpf_sys;
39use libbpf_rs::AsRawLibbpf;
40use libbpf_rs::MapCore as _;
41use libbpf_rs::OpenObject;
42use libbpf_rs::ProgramInput;
43use nix::sched::CpuSet;
44use nvml_wrapper::error::NvmlError;
45use nvml_wrapper::Nvml;
46use once_cell::sync::OnceCell;
47use regex::Regex;
48use scx_layered::alloc::{unified_alloc, LayerAlloc, LayerDemand};
49use scx_layered::*;
50use scx_raw_pmu::PMUManager;
51use scx_stats::prelude::*;
52use scx_utils::build_id;
53use scx_utils::compat;
54use scx_utils::init_libbpf_logging;
55use scx_utils::libbpf_clap_opts::LibbpfOpts;
56use scx_utils::perf;
57use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
58use scx_utils::read_netdevs;
59use scx_utils::scx_enums;
60use scx_utils::scx_ops_attach;
61use scx_utils::scx_ops_load;
62use scx_utils::scx_ops_open;
63use scx_utils::uei_exited;
64use scx_utils::uei_report;
65use scx_utils::CoreType;
66use scx_utils::Cpumask;
67use scx_utils::Llc;
68use scx_utils::NetDev;
69use scx_utils::Topology;
70use scx_utils::TopologyArgs;
71use scx_utils::UserExitInfo;
72use scx_utils::NR_CPUS_POSSIBLE;
73use scx_utils::NR_CPU_IDS;
74use stats::LayerStats;
75use stats::StatsReq;
76use stats::StatsRes;
77use stats::SysStats;
78use std::collections::VecDeque;
79use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
80use tracing::{debug, error, info, trace, warn};
81use tracing_subscriber::filter::EnvFilter;
82use walkdir::WalkDir;
83
// Name of this scheduler.
const SCHEDULER_NAME: &str = "scx_layered";

// Limits and defaults re-exported from `bpf_intf` as native Rust integer types so the rest
// of this file can use them for sizing and indexing without repeating the casts.
// NOTE(review): `bpf_intf` is presumably the generated binding for the BPF-side header —
// confirm before changing any of these by hand.
const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
// `USAGE_HALF_LIFE` scaled down by 1e9 (presumably nanoseconds -> seconds — TODO confirm).
const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;

// Indices into the per-layer usage arrays. Usage kinds `0..=LAYER_USAGE_SUM_UPTO` are the
// ones summed when a layer's total usage/utilization is computed in this file.
const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
const LAYER_USAGE_PROTECTED_PREEMPT: usize =
    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;

// Number of global, per-layer and per-(layer, LLC) stat slots; used to size the vectors
// held by `BpfStats`.
const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;

// Number of layer match kinds known to `bpf_intf`.
const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;
109
/// Process-wide NVML handle, populated on the first successful call to [`nvml`].
static NVML: OnceCell<Nvml> = OnceCell::new();

/// Returns the shared NVML handle, calling `Nvml::init` if the cell is still empty.
///
/// # Errors
/// Propagates the `NvmlError` from `Nvml::init`. `get_or_try_init` leaves the cell empty
/// on failure, so a later call attempts initialization again.
fn nvml() -> Result<&'static Nvml, NvmlError> {
    NVML.get_or_try_init(Nvml::init)
}
115
lazy_static! {
    // Base of the exponential decay applied to usage averages: callers raise it to the
    // elapsed time in seconds (`decay_rate.powf(elapsed)`), so the weight of the old value
    // halves every `USAGE_HALF_LIFE_F64` time units.
    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
    // Two and four times the default slice, divided by 1000
    // (presumably nanoseconds -> microseconds, going by the `_US` suffix — TODO confirm).
    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
    // Built-in example layer configuration, parsed from the embedded JSON on first access.
    // The `unwrap()` panics if the embedded JSON stops matching `LayerConfig`, so the JSON
    // must be kept in sync with the spec types.
    static ref EXAMPLE_CONFIG: LayerConfig = serde_json::from_str(
        r#"[
          {
            "name": "batch",
            "comment": "tasks under system.slice or tasks with nice value > 0",
            "matches": [[{"CgroupPrefix": "system.slice/"}], [{"NiceAbove": 0}]],
            "kind": {"Confined": {
              "util_range": [0.8, 0.9], "cpus_range": [0, 16],
              "min_exec_us": 1000, "slice_us": 20000, "weight": 100,
              "xllc_mig_min_us": 1000.0, "perf": 1024
            }}
          },
          {
            "name": "immediate",
            "comment": "tasks under workload.slice with nice value < 0",
            "matches": [[{"CgroupPrefix": "workload.slice/"}, {"NiceBelow": 0}]],
            "kind": {"Open": {
              "min_exec_us": 100, "yield_ignore": 0.25, "slice_us": 20000,
              "preempt": true, "exclusive": true,
              "prev_over_idle_core": true,
              "weight": 100, "perf": 1024
            }}
          },
          {
            "name": "stress-ng",
            "comment": "stress-ng test layer",
            "matches": [[{"CommPrefix": "stress-ng"}], [{"PcommPrefix": "stress-ng"}]],
            "kind": {"Confined": {
              "util_range": [0.2, 0.8],
              "min_exec_us": 800, "preempt": true, "slice_us": 800,
              "weight": 100, "growth_algo": "Topo", "perf": 1024
            }}
          },
          {
            "name": "normal",
            "comment": "the rest",
            "matches": [[]],
            "kind": {"Grouped": {
              "util_range": [0.5, 0.6], "util_includes_open_cputime": true,
              "min_exec_us": 200, "slice_us": 20000, "weight": 100,
              "xllc_mig_min_us": 100.0, "growth_algo": "Linear", "perf": 1024
            }}
          }
        ]"#,
    )
    .unwrap();
}
167
// Command-line options, parsed by clap's derive API.
//
// NOTE: the comments in this struct are deliberately plain `//` comments. With
// `#[derive(Parser)]` (and `verbatim_doc_comment` in particular) `///` doc comments become
// the user-visible `--help` text, so adding or editing them changes program output.
//
// Field meanings are not visible from this struct alone; the notes below only restate what
// the attributes establish. Consult the code that consumes each option for semantics.
#[derive(Debug, Parser)]
#[command(verbatim_doc_comment)]
struct Opts {
    // Counted flag: each occurrence of `-v` increments the value.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    #[clap(short = 'M', long, default_value = "0")]
    max_exec_us: u64,

    #[clap(short = 'i', long, default_value = "0.1")]
    interval: f64,

    #[clap(short = 'n', long, default_value = "false")]
    no_load_frac_limit: bool,

    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    #[clap(long, default_value = "info")]
    log_level: String,

    // Tri-state: absent -> `None`; bare `-t`/`--disable-topology` -> `Some(true)` (via
    // `default_missing_value`); an explicit value must be attached with `=`
    // (`require_equals`).
    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
    disable_topology: Option<bool>,

    #[clap(long)]
    monitor_disable: bool,

    #[clap(short = 'e', long)]
    example: Option<String>,

    #[clap(long, default_value = "0.0")]
    layer_preempt_weight_disable: f64,

    #[clap(long, default_value = "0.0")]
    layer_growth_weight_disable: f64,

    #[clap(long)]
    stats: Option<f64>,

    #[clap(long)]
    monitor: Option<f64>,

    #[clap(long, default_value = "95")]
    stats_columns: usize,

    #[clap(long)]
    stats_no_llc: bool,

    #[clap(long)]
    run_example: bool,

    #[clap(long)]
    allow_partial_core: bool,

    #[clap(long, default_value = "false")]
    local_llc_iteration: bool,

    #[clap(long, default_value = "10000")]
    lo_fb_wait_us: u64,

    #[clap(long, default_value = ".05")]
    lo_fb_share: f64,

    #[clap(long, default_value = "false")]
    disable_antistall: bool,

    #[clap(long, default_value = "false")]
    enable_gpu_affinitize: bool,

    #[clap(long, default_value = "900")]
    gpu_affinitize_secs: u64,

    #[clap(long, default_value = "false")]
    enable_match_debug: bool,

    #[clap(long, default_value = "3")]
    antistall_sec: u64,

    #[clap(long, default_value = "false")]
    enable_gpu_support: bool,

    #[clap(long, default_value = "3")]
    gpu_kprobe_level: u64,

    #[clap(long, default_value = "false")]
    util_compensation: bool,

    #[clap(long, default_value = "false")]
    netdev_irq_balance: bool,

    #[clap(long, default_value = "false")]
    disable_queued_wakeup: bool,

    #[clap(long, default_value = "false")]
    disable_percpu_kthread_preempt: bool,

    #[clap(long, default_value = "false")]
    percpu_kthread_preempt_all: bool,

    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    #[clap(long)]
    run_id: Option<u64>,

    #[clap(long)]
    help_stats: bool,

    // No `long`/`short` attribute: clap treats this as the positional arguments.
    specs: Vec<String>,

    #[clap(long, default_value = "2000")]
    layer_refresh_ms_avgruntime: u64,

    #[clap(long, default_value = "")]
    task_hint_map: String,

    #[clap(long, default_value = "false")]
    print_and_exit: bool,

    #[clap(long, default_value = "")]
    hi_fb_thread_name: String,

    // Options of the flattened structs are listed under their own help headings.
    #[clap(flatten, next_help_heading = "Topology Options")]
    topology: TopologyArgs,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
673
/// A cgroup creation or removal notification.
///
/// NOTE(review): the producer and consumer are outside this part of the file; the field
/// notes below are inferred from names only and should be confirmed against them.
#[derive(Debug, Clone)]
enum CgroupEvent {
    /// A cgroup appeared.
    Created {
        /// Path of the cgroup.
        path: String,
        /// Numeric id of the cgroup.
        cgroup_id: u64,
        /// Presumably one bit per matching rule — TODO confirm bit layout with the producer.
        match_bitmap: u64,
    },
    /// A cgroup went away.
    Removed {
        /// Path of the cgroup.
        path: String,
        /// Numeric id of the cgroup.
        cgroup_id: u64,
    },
}
687
688fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
689 reader
690 .read_stat()
691 .context("Failed to read procfs")?
692 .total_cpu
693 .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
694}
695
696fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
697 match (curr, prev) {
698 (
699 fb_procfs::CpuStat {
700 user_usec: Some(curr_user),
701 nice_usec: Some(curr_nice),
702 system_usec: Some(curr_system),
703 idle_usec: Some(curr_idle),
704 iowait_usec: Some(curr_iowait),
705 irq_usec: Some(curr_irq),
706 softirq_usec: Some(curr_softirq),
707 stolen_usec: Some(curr_stolen),
708 ..
709 },
710 fb_procfs::CpuStat {
711 user_usec: Some(prev_user),
712 nice_usec: Some(prev_nice),
713 system_usec: Some(prev_system),
714 idle_usec: Some(prev_idle),
715 iowait_usec: Some(prev_iowait),
716 irq_usec: Some(prev_irq),
717 softirq_usec: Some(prev_softirq),
718 stolen_usec: Some(prev_stolen),
719 ..
720 },
721 ) => {
722 let idle_usec = curr_idle.saturating_sub(*prev_idle);
723 let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
724 let user_usec = curr_user.saturating_sub(*prev_user);
725 let system_usec = curr_system.saturating_sub(*prev_system);
726 let nice_usec = curr_nice.saturating_sub(*prev_nice);
727 let irq_usec = curr_irq.saturating_sub(*prev_irq);
728 let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
729 let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);
730
731 let busy_usec =
732 user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
733 let total_usec = idle_usec + busy_usec + iowait_usec;
734 if total_usec > 0 {
735 Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
736 } else {
737 Ok(1.0)
738 }
739 }
740 _ => bail!("Missing stats in cpustat"),
741 }
742}
743
/// Writes `src` into the start of `dst` as a NUL-terminated C string.
///
/// Exactly `src.len() + 1` elements of `dst` are overwritten (the bytes of `src` followed by
/// the terminating NUL); anything after that is left untouched.
///
/// # Panics
/// - if `src` contains an interior NUL byte, since it cannot be represented as a C string;
/// - if `dst` is too short to hold `src` plus the terminator. Nothing is written in that
///   case, so a truncated, unterminated string can never be produced.
fn copy_into_cstr(dst: &mut [i8], src: &str) {
    let cstr = CString::new(src).expect("copy_into_cstr: source contains an interior NUL byte");
    let bytes = cstr.as_bytes_with_nul();

    assert!(
        bytes.len() <= dst.len(),
        "copy_into_cstr: {} bytes (including NUL) do not fit into a {}-element buffer",
        bytes.len(),
        dst.len()
    );

    for (slot, &byte) in dst.iter_mut().zip(bytes) {
        // Same bit pattern, reinterpreted as a signed C char; no `unsafe` required.
        *slot = byte as i8;
    }
}
749
750#[allow(clippy::needless_range_loop)]
751fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
752 let mut cpu_ctxs = vec![];
753 let cpu_ctxs_vec = skel
754 .maps
755 .cpu_ctxs
756 .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
757 .context("Failed to lookup cpu_ctx")?
758 .unwrap();
759 for cpu in 0..*NR_CPUS_POSSIBLE {
760 cpu_ctxs.push(
761 *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
762 .expect("cpu_ctx: short or misaligned buffer"),
763 );
764 }
765 Ok(cpu_ctxs)
766}
767
/// Counter values collected from the BPF side by [`BpfStats::read`].
///
/// Subtracting one snapshot from another (see the `Sub` impl) yields the per-interval
/// deltas.
#[derive(Clone, Debug)]
struct BpfStats {
    /// Global stats summed over all CPUs; `NR_GSTATS` entries indexed by stat id.
    gstats: Vec<u64>,
    /// Per-layer stats summed over all CPUs, indexed `[layer][stat]`.
    lstats: Vec<Vec<u64>>,
    /// `lstats` additionally summed over all layers, indexed `[stat]`.
    lstats_sums: Vec<u64>,
    /// Per-layer, per-LLC stats copied from the `llc_data` map, indexed `[layer][llc][stat]`.
    llc_lstats: Vec<Vec<Vec<u64>>>,
}
775
776impl BpfStats {
777 #[allow(clippy::needless_range_loop)]
778 fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
779 let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
780 let nr_llcs = skel.maps.rodata_data.as_ref().unwrap().nr_llcs as usize;
781 let mut gstats = vec![0u64; NR_GSTATS];
782 let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
783 let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];
784
785 for cpu in 0..*NR_CPUS_POSSIBLE {
786 for (stat, value) in gstats.iter_mut().enumerate() {
787 *value += cpu_ctxs[cpu].gstats[stat];
788 }
789 for (layer, layer_stats) in lstats.iter_mut().enumerate() {
790 for (stat, value) in layer_stats.iter_mut().enumerate() {
791 *value += cpu_ctxs[cpu].lstats[layer][stat];
792 }
793 }
794 }
795
796 let mut lstats_sums = vec![0u64; NR_LSTATS];
797 for layer_stats in lstats.iter() {
798 for (stat, value) in lstats_sums.iter_mut().enumerate() {
799 *value += layer_stats[stat];
800 }
801 }
802
803 for llc_id in 0..nr_llcs {
804 let v = skel
810 .maps
811 .llc_data
812 .lookup(&(llc_id as u32).to_ne_bytes(), libbpf_rs::MapFlags::ANY)
813 .unwrap()
814 .unwrap();
815 let llcc: &bpf_intf::llc_ctx =
816 plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
817
818 for (layer_id, layer_llc_stats) in llc_lstats.iter_mut().enumerate() {
819 for (stat_id, stat_value) in layer_llc_stats[llc_id].iter_mut().enumerate() {
820 *stat_value = llcc.lstats[layer_id][stat_id];
821 }
822 }
823 }
824
825 Self {
826 gstats,
827 lstats,
828 lstats_sums,
829 llc_lstats,
830 }
831 }
832}
833
834impl<'b> Sub<&'b BpfStats> for &BpfStats {
835 type Output = BpfStats;
836
837 fn sub(self, rhs: &'b BpfStats) -> BpfStats {
838 let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
839 BpfStats {
840 gstats: vec_sub(&self.gstats, &rhs.gstats),
841 lstats: self
842 .lstats
843 .iter()
844 .zip(rhs.lstats.iter())
845 .map(|(l, r)| vec_sub(l, r))
846 .collect(),
847 lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
848 llc_lstats: self
849 .llc_lstats
850 .iter()
851 .zip(rhs.llc_lstats.iter())
852 .map(|(l_layer, r_layer)| {
853 l_layer
854 .iter()
855 .zip(r_layer.iter())
856 .map(|(l_llc, r_llc)| {
857 let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
858 r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
860 vec_sub(&l_llc, &r_llc)
861 })
862 .collect()
863 })
864 .collect(),
865 }
866 }
867}
868
/// Rolling statistics snapshot.
///
/// `Stats::new` captures the baselines; every `Stats::refresh` replaces the whole value with
/// a new snapshot computed from the current readings and the previous snapshot. Fields
/// prefixed `prev_` hold raw cumulative readings kept only to form the next delta.
#[derive(Clone, Debug)]
struct Stats {
    /// When this snapshot was taken.
    at: Instant,
    /// Time between the previous snapshot and this one (default/zero right after `new`).
    elapsed: Duration,
    topo: Arc<Topology>,
    /// Number of layers, read from the skeleton's rodata in `new`.
    nr_layers: usize,
    /// `nr_tasks` of each layer, indexed `[layer]`.
    nr_layer_tasks: Vec<usize>,
    /// `nr_pinned_tasks` indexed `[layer][node]`.
    layer_nr_node_pinned_tasks: Vec<Vec<u64>>,

    /// Sum over all layers of `layer_utils` for usage kinds `0..=LAYER_USAGE_SUM_UPTO`.
    total_util: f64,
    /// Decayed average of per-interval usage rates, indexed `[layer][usage kind]`.
    layer_utils: Vec<Vec<f64>>,
    /// Decayed average of node-pinned usage rates, indexed `[layer][node]`.
    layer_node_pinned_utils: Vec<Vec<f64>>,
    prev_layer_node_pinned_usages: Vec<Vec<u64>>,
    /// Decayed average of usage rates (kinds `0..=LAYER_USAGE_SUM_UPTO`), `[layer][node]`.
    layer_node_utils: Vec<Vec<f64>>,
    prev_layer_node_usages: Vec<Vec<u64>>,
    /// Decayed average of the duty-sum rates, indexed `[layer][node]`.
    layer_node_duty_sums: Vec<Vec<f64>>,
    prev_layer_node_duty_raw: Vec<Vec<u64>>,
    /// Memory-bandwidth figure per `[layer][usage kind]` for the latest interval: the
    /// interval delta of `layer_membw_agg`, divided by 1024^3 and scaled by the
    /// resctrl/PMU ratio. No smoothing is applied (decay rate 0).
    layer_membws: Vec<Vec<f64>>,
    prev_layer_membw_agg: Vec<Vec<u64>>,
    /// Busy fraction of the whole system over the last interval (see `calc_util`).
    cpu_busy: f64,
    prev_total_cpu: fb_procfs::CpuStat,
    /// Previous `(PMU total, resctrl total)` readings used to derive the scaling ratio.
    prev_pmu_resctrl_membw: (u64, u64),
    /// When set, per-CPU scale factors are applied to form `layer_utils_compensated`.
    util_compensation: bool,
    /// Like `layer_utils`, but each CPU's usage delta is multiplied by that CPU's scale
    /// (total time / time not spent in irq, softirq or stolen, clamped to `[1, 20]`).
    /// Identical to `layer_utils` when `util_compensation` is off.
    layer_utils_compensated: Vec<Vec<f64>>,
    /// Flattened raw usage counters, index `cpu * nr_layers * NR_LAYER_USAGES
    /// + layer * NR_LAYER_USAGES + usage`.
    prev_cpu_layer_usages: Vec<u64>,
    /// Per-CPU procfs counters from the previous refresh, keyed by CPU id.
    prev_per_cpu_stats: BTreeMap<u32, fb_procfs::CpuStat>,

    /// EWMA of `cpu_busy`.
    system_cpu_util_ewma: f64,
    /// Per-layer EWMA of `enq_dsq / (sel_local + enq_local + enq_dsq)`.
    layer_dsq_insert_ewma: Vec<f64>,
    /// Counter deltas over the last interval (current minus `prev_bpf_stats`).
    bpf_stats: BpfStats,
    prev_bpf_stats: BpfStats,

    /// Increase of the caller-supplied cumulative processing duration over the interval.
    processing_dur: Duration,
    prev_processing_dur: Duration,

    /// Each layer's `slice_ns / 1000`.
    layer_slice_us: Vec<u64>,

    /// Copied from the `GpuTaskAffinitizer` at snapshot time.
    gpu_tasks_affinitized: u64,
    gpu_task_affinitization_ms: u64,
}
913
914impl Stats {
915 #[allow(clippy::needless_range_loop)]
916 fn read_layer_membw_agg(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
917 let mut layer_membw_agg = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
918
919 for cpu in 0..*NR_CPUS_POSSIBLE {
920 for (layer, layer_membw) in layer_membw_agg.iter_mut().enumerate() {
921 for (usage, value) in layer_membw.iter_mut().enumerate() {
922 *value += cpu_ctxs[cpu].layer_membw_agg[layer][usage];
923 }
924 }
925 }
926
927 layer_membw_agg
928 }
929
930 #[allow(clippy::needless_range_loop)]
931 fn read_layer_node_pinned_usages(
932 cpu_ctxs: &[bpf_intf::cpu_ctx],
933 topo: &Topology,
934 nr_layers: usize,
935 nr_nodes: usize,
936 ) -> Vec<Vec<u64>> {
937 let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
938
939 for cpu in 0..*NR_CPUS_POSSIBLE {
940 let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
941 for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
942 layer_usages[node] += cpu_ctxs[cpu].node_pinned_usage[layer];
943 }
944 }
945
946 usages
947 }
948
949 #[allow(clippy::needless_range_loop)]
950 fn read_layer_node_usages(
951 cpu_ctxs: &[bpf_intf::cpu_ctx],
952 topo: &Topology,
953 nr_layers: usize,
954 nr_nodes: usize,
955 ) -> Vec<Vec<u64>> {
956 let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
957
958 for cpu in 0..*NR_CPUS_POSSIBLE {
959 let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
960 for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
961 for usage in 0..=LAYER_USAGE_SUM_UPTO {
962 layer_usages[node] += cpu_ctxs[cpu].layer_usages[layer][usage];
963 }
964 }
965 }
966
967 usages
968 }
969
970 #[allow(clippy::needless_range_loop)]
971 fn read_layer_node_duty_raw(
972 cpu_ctxs: &[bpf_intf::cpu_ctx],
973 topo: &Topology,
974 nr_layers: usize,
975 nr_nodes: usize,
976 ) -> Vec<Vec<u64>> {
977 let mut sums = vec![vec![0u64; nr_nodes]; nr_layers];
978
979 for cpu in 0..*NR_CPUS_POSSIBLE {
980 let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
981 for (layer, layer_sums) in sums.iter_mut().enumerate().take(nr_layers) {
982 layer_sums[node] += cpu_ctxs[cpu].layer_duty_sum[layer];
983 }
984 }
985
986 sums
987 }
988
989 #[allow(clippy::needless_range_loop)]
990 fn read_per_cpu_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<u64> {
991 let stride = nr_layers * NR_LAYER_USAGES;
992 let mut flat = vec![0u64; *NR_CPUS_POSSIBLE * stride];
993
994 for cpu in 0..*NR_CPUS_POSSIBLE {
995 let base = cpu * stride;
996 for layer in 0..nr_layers {
997 for usage in 0..NR_LAYER_USAGES {
998 flat[base + layer * NR_LAYER_USAGES + usage] =
999 cpu_ctxs[cpu].layer_usages[layer][usage];
1000 }
1001 }
1002 }
1003
1004 flat
1005 }
1006
1007 fn resctrl_read_total_membw() -> Result<u64> {
1019 let mut total_membw = 0u64;
1020 for entry in WalkDir::new("/sys/fs/resctrl/mon_data")
1021 .min_depth(1)
1022 .into_iter()
1023 .filter_map(Result::ok)
1024 .filter(|x| x.path().is_dir())
1025 {
1026 let mut path = entry.path().to_path_buf();
1027 path.push("mbm_total_bytes");
1028 total_membw += fs::read_to_string(path)?.trim().parse::<u64>()?;
1029 }
1030
1031 Ok(total_membw)
1032 }
1033
1034 fn new(
1035 skel: &mut BpfSkel,
1036 proc_reader: &fb_procfs::ProcReader,
1037 topo: Arc<Topology>,
1038 gpu_task_affinitizer: &GpuTaskAffinitizer,
1039 util_compensation: bool,
1040 ) -> Result<Self> {
1041 let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
1042 let nr_nodes = topo.nodes.len();
1043 let cpu_ctxs = read_cpu_ctxs(skel)?;
1044 let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1045 let pmu_membw = Self::read_layer_membw_agg(&cpu_ctxs, nr_layers);
1046
1047 Ok(Self {
1048 at: Instant::now(),
1049 elapsed: Default::default(),
1050
1051 topo: topo.clone(),
1052 nr_layers,
1053 nr_layer_tasks: vec![0; nr_layers],
1054 layer_nr_node_pinned_tasks: vec![vec![0; nr_nodes]; nr_layers],
1055
1056 total_util: 0.0,
1057 layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1058 layer_node_pinned_utils: vec![vec![0.0; nr_nodes]; nr_layers],
1059 prev_layer_node_pinned_usages: Self::read_layer_node_pinned_usages(
1060 &cpu_ctxs, &topo, nr_layers, nr_nodes,
1061 ),
1062 layer_node_utils: vec![vec![0.0; nr_nodes]; nr_layers],
1063 prev_layer_node_usages: Self::read_layer_node_usages(
1064 &cpu_ctxs, &topo, nr_layers, nr_nodes,
1065 ),
1066 layer_node_duty_sums: vec![vec![0.0; nr_nodes]; nr_layers],
1067 prev_layer_node_duty_raw: Self::read_layer_node_duty_raw(
1068 &cpu_ctxs, &topo, nr_layers, nr_nodes,
1069 ),
1070 layer_membws: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1071 prev_layer_membw_agg: pmu_membw,
1075 prev_pmu_resctrl_membw: (0, 0),
1076
1077 cpu_busy: 0.0,
1078 prev_total_cpu: read_total_cpu(proc_reader)?,
1079 util_compensation,
1080 layer_utils_compensated: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1081 prev_cpu_layer_usages: Self::read_per_cpu_layer_usages(&cpu_ctxs, nr_layers),
1082 prev_per_cpu_stats: BTreeMap::new(),
1083 system_cpu_util_ewma: 0.0,
1084 layer_dsq_insert_ewma: vec![0.0; nr_layers],
1085
1086 bpf_stats: bpf_stats.clone(),
1087 prev_bpf_stats: bpf_stats,
1088
1089 processing_dur: Default::default(),
1090 prev_processing_dur: Default::default(),
1091
1092 layer_slice_us: vec![0; nr_layers],
1093 gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1094 gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1095 })
1096 }
1097
1098 fn refresh(
1099 &mut self,
1100 skel: &mut BpfSkel,
1101 proc_reader: &fb_procfs::ProcReader,
1102 now: Instant,
1103 cur_processing_dur: Duration,
1104 gpu_task_affinitizer: &GpuTaskAffinitizer,
1105 ) -> Result<()> {
1106 let elapsed = now.duration_since(self.at);
1107 let elapsed_f64 = elapsed.as_secs_f64();
1108 let cpu_ctxs = read_cpu_ctxs(skel)?;
1109
1110 let layers = &skel.maps.bss_data.as_ref().unwrap().layers;
1111 let nr_layer_tasks: Vec<usize> = layers
1112 .iter()
1113 .take(self.nr_layers)
1114 .map(|layer| layer.nr_tasks as usize)
1115 .collect();
1116 let layer_nr_node_pinned_tasks: Vec<Vec<u64>> = layers
1117 .iter()
1118 .take(self.nr_layers)
1119 .map(|layer| {
1120 layer.node[..self.topo.nodes.len()]
1121 .iter()
1122 .map(|n| n.nr_pinned_tasks)
1123 .collect()
1124 })
1125 .collect();
1126 let layer_slice_us: Vec<u64> = layers
1127 .iter()
1128 .take(self.nr_layers)
1129 .map(|layer| layer.slice_ns / 1000_u64)
1130 .collect();
1131
1132 let cur_layer_node_pinned_usages = Self::read_layer_node_pinned_usages(
1133 &cpu_ctxs,
1134 &self.topo,
1135 self.nr_layers,
1136 self.topo.nodes.len(),
1137 );
1138 let cur_layer_node_usages = Self::read_layer_node_usages(
1139 &cpu_ctxs,
1140 &self.topo,
1141 self.nr_layers,
1142 self.topo.nodes.len(),
1143 );
1144 let cur_layer_node_duty_raw = Self::read_layer_node_duty_raw(
1145 &cpu_ctxs,
1146 &self.topo,
1147 self.nr_layers,
1148 self.topo.nodes.len(),
1149 );
1150 let cur_layer_membw_agg = Self::read_layer_membw_agg(&cpu_ctxs, self.nr_layers);
1151
1152 let (pmu_prev, resctrl_prev) = self.prev_pmu_resctrl_membw;
1160 let pmu_cur: u64 = cur_layer_membw_agg
1161 .iter()
1162 .map(|membw_agg| membw_agg[LAYER_USAGE_OPEN] + membw_agg[LAYER_USAGE_OWNED])
1163 .sum();
1164 let resctrl_cur = Self::resctrl_read_total_membw()?;
1165 let factor = (resctrl_cur - resctrl_prev) as f64 / (pmu_cur - pmu_prev) as f64;
1166
1167 let compute_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1169 cur_agg
1170 .iter()
1171 .zip(prev_agg.iter())
1172 .map(|(cur, prev)| {
1173 cur.iter()
1174 .zip(prev.iter())
1175 .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
1176 .collect()
1177 })
1178 .collect()
1179 };
1180
1181 let compute_mem_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1184 cur_agg
1185 .iter()
1186 .zip(prev_agg.iter())
1187 .map(|(cur, prev)| {
1188 cur.iter()
1189 .zip(prev.iter())
1190 .map(|(c, p)| (*c as i64 - *p as i64) as f64 / 1024_f64.powf(3.0))
1191 .collect()
1192 })
1193 .collect()
1194 };
1195
1196 let cur_layer_membw: Vec<Vec<f64>> =
1198 compute_mem_diff(&cur_layer_membw_agg, &self.prev_layer_membw_agg);
1199
1200 let cur_layer_membw: Vec<Vec<f64>> = cur_layer_membw
1201 .iter()
1202 .map(|x| x.iter().map(|x| *x * factor).collect())
1203 .collect();
1204
1205 let metric_decay =
1206 |cur_metric: Vec<Vec<f64>>, prev_metric: &Vec<Vec<f64>>, decay_rate: f64| {
1207 cur_metric
1208 .iter()
1209 .zip(prev_metric.iter())
1210 .map(|(cur, prev)| {
1211 cur.iter()
1212 .zip(prev.iter())
1213 .map(|(c, p)| {
1214 let decay = decay_rate.powf(elapsed_f64);
1215 p * decay + c * (1.0 - decay)
1216 })
1217 .collect()
1218 })
1219 .collect()
1220 };
1221
1222 let cur_node_pinned_utils: Vec<Vec<f64>> = compute_diff(
1223 &cur_layer_node_pinned_usages,
1224 &self.prev_layer_node_pinned_usages,
1225 );
1226 let layer_node_pinned_utils: Vec<Vec<f64>> = metric_decay(
1227 cur_node_pinned_utils,
1228 &self.layer_node_pinned_utils,
1229 *USAGE_DECAY,
1230 );
1231 let cur_node_utils: Vec<Vec<f64>> =
1232 compute_diff(&cur_layer_node_usages, &self.prev_layer_node_usages);
1233 let layer_node_utils: Vec<Vec<f64>> =
1234 metric_decay(cur_node_utils, &self.layer_node_utils, *USAGE_DECAY);
1235 let cur_node_duty: Vec<Vec<f64>> =
1236 compute_diff(&cur_layer_node_duty_raw, &self.prev_layer_node_duty_raw);
1237 let layer_node_duty_sums: Vec<Vec<f64>> =
1238 metric_decay(cur_node_duty, &self.layer_node_duty_sums, *USAGE_DECAY);
1239
1240 let layer_membws: Vec<Vec<f64>> = metric_decay(cur_layer_membw, &self.layer_membws, 0.0);
1241
1242 let proc_stat = proc_reader
1243 .read_stat()
1244 .context("Failed to read /proc/stat")?;
1245 let cur_total_cpu = proc_stat
1246 .total_cpu
1247 .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;
1248 let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;
1249
1250 const SYS_CPU_UTIL_EWMA_SECS: f64 = 10.0;
1252 let elapsed_f64 = elapsed.as_secs_f64();
1253 let alpha = elapsed_f64 / SYS_CPU_UTIL_EWMA_SECS.max(elapsed_f64);
1254 let system_cpu_util_ewma = alpha * cpu_busy + (1.0 - alpha) * self.system_cpu_util_ewma;
1255
1256 let cur_per_cpu_stats = proc_stat.cpus_map.unwrap_or_default();
1259 let mut cpu_scales = vec![1.0f64; *NR_CPUS_POSSIBLE];
1260 if self.util_compensation {
1261 for (&cpu_id, cur_cpu_stat) in &cur_per_cpu_stats {
1262 let cpu = cpu_id as usize;
1263 if let Some(prev_cpu_stat) = self.prev_per_cpu_stats.get(&cpu_id) {
1264 if let (
1265 fb_procfs::CpuStat {
1266 user_usec: Some(cu),
1267 nice_usec: Some(cn),
1268 system_usec: Some(cs),
1269 idle_usec: Some(ci),
1270 iowait_usec: Some(cw),
1271 irq_usec: Some(cq),
1272 softirq_usec: Some(cf),
1273 stolen_usec: Some(ct),
1274 ..
1275 },
1276 fb_procfs::CpuStat {
1277 user_usec: Some(pu),
1278 nice_usec: Some(pn),
1279 system_usec: Some(ps),
1280 idle_usec: Some(pi),
1281 iowait_usec: Some(pw),
1282 irq_usec: Some(pq),
1283 softirq_usec: Some(pf),
1284 stolen_usec: Some(pt),
1285 ..
1286 },
1287 ) = (cur_cpu_stat, prev_cpu_stat)
1288 {
1289 let delta_total = cu.saturating_sub(*pu)
1290 + cn.saturating_sub(*pn)
1291 + cs.saturating_sub(*ps)
1292 + ci.saturating_sub(*pi)
1293 + cw.saturating_sub(*pw)
1294 + cq.saturating_sub(*pq)
1295 + cf.saturating_sub(*pf)
1296 + ct.saturating_sub(*pt);
1297 let overhead = cq.saturating_sub(*pq)
1298 + cf.saturating_sub(*pf)
1299 + ct.saturating_sub(*pt);
1300 let available = delta_total.saturating_sub(overhead);
1301 cpu_scales[cpu] = if available > 0 {
1302 (delta_total as f64 / available as f64).clamp(1.0, 20.0)
1303 } else {
1304 1.0
1305 };
1306 }
1307 }
1308 }
1309 }
1310
1311 let cur_cpu_layer_usages = Self::read_per_cpu_layer_usages(&cpu_ctxs, self.nr_layers);
1315 let stride = self.nr_layers * NR_LAYER_USAGES;
1316 let mut raw_sums = vec![vec![0.0f64; NR_LAYER_USAGES]; self.nr_layers];
1317 let mut scaled_sums = vec![vec![0.0f64; NR_LAYER_USAGES]; self.nr_layers];
1318 #[allow(clippy::needless_range_loop)]
1319 for cpu in 0..*NR_CPUS_POSSIBLE {
1320 let scale = cpu_scales[cpu];
1321 let base = cpu * stride;
1322 for layer in 0..self.nr_layers {
1323 for usage in 0..NR_LAYER_USAGES {
1324 let idx = base + layer * NR_LAYER_USAGES + usage;
1325 let delta =
1326 cur_cpu_layer_usages[idx].saturating_sub(self.prev_cpu_layer_usages[idx]);
1327 if delta > 0 {
1328 let delta_f = delta as f64;
1329 raw_sums[layer][usage] += delta_f;
1330 scaled_sums[layer][usage] += delta_f * scale;
1331 }
1332 }
1333 }
1334 }
1335 let normalize = |sums: Vec<Vec<f64>>| -> Vec<Vec<f64>> {
1336 sums.into_iter()
1337 .map(|layer_sums| {
1338 layer_sums
1339 .into_iter()
1340 .map(|s| s / 1_000_000_000.0 / elapsed_f64)
1341 .collect()
1342 })
1343 .collect()
1344 };
1345 let layer_utils: Vec<Vec<f64>> =
1346 metric_decay(normalize(raw_sums), &self.layer_utils, *USAGE_DECAY);
1347 let layer_utils_compensated: Vec<Vec<f64>> = metric_decay(
1348 normalize(scaled_sums),
1349 &self.layer_utils_compensated,
1350 *USAGE_DECAY,
1351 );
1352
1353 let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1354 let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;
1355
1356 const DSQ_INSERT_EWMA_SECS: f64 = 10.0;
1358 let dsq_alpha = elapsed_f64 / DSQ_INSERT_EWMA_SECS.max(elapsed_f64);
1359 let layer_dsq_insert_ewma: Vec<f64> = (0..self.nr_layers)
1360 .map(|layer_id| {
1361 let sel_local = bpf_stats.lstats[layer_id]
1362 [bpf_intf::layer_stat_id_LSTAT_SEL_LOCAL as usize]
1363 as f64;
1364 let enq_local = bpf_stats.lstats[layer_id]
1365 [bpf_intf::layer_stat_id_LSTAT_ENQ_LOCAL as usize]
1366 as f64;
1367 let enq_dsq = bpf_stats.lstats[layer_id]
1368 [bpf_intf::layer_stat_id_LSTAT_ENQ_DSQ as usize]
1369 as f64;
1370 let total_dispatches = sel_local + enq_local + enq_dsq;
1371
1372 let cur_ratio = if total_dispatches > 0.0 {
1373 enq_dsq / total_dispatches
1374 } else {
1375 0.0
1376 };
1377
1378 dsq_alpha * cur_ratio + (1.0 - dsq_alpha) * self.layer_dsq_insert_ewma[layer_id]
1379 })
1380 .collect();
1381
1382 let processing_dur = cur_processing_dur
1383 .checked_sub(self.prev_processing_dur)
1384 .unwrap();
1385
1386 *self = Self {
1387 at: now,
1388 elapsed,
1389 topo: self.topo.clone(),
1390 nr_layers: self.nr_layers,
1391 nr_layer_tasks,
1392 layer_nr_node_pinned_tasks,
1393
1394 total_util: layer_utils
1395 .iter()
1396 .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
1397 .sum(),
1398 layer_utils,
1399 layer_node_pinned_utils,
1400 prev_layer_node_pinned_usages: cur_layer_node_pinned_usages,
1401 layer_node_utils,
1402 prev_layer_node_usages: cur_layer_node_usages,
1403 layer_node_duty_sums,
1404 prev_layer_node_duty_raw: cur_layer_node_duty_raw,
1405
1406 layer_membws,
1407 prev_layer_membw_agg: cur_layer_membw_agg,
1408 prev_pmu_resctrl_membw: (pmu_cur, resctrl_cur),
1410
1411 cpu_busy,
1412 prev_total_cpu: cur_total_cpu,
1413 util_compensation: self.util_compensation,
1414 layer_utils_compensated,
1415 prev_cpu_layer_usages: cur_cpu_layer_usages,
1416 prev_per_cpu_stats: cur_per_cpu_stats,
1417 system_cpu_util_ewma,
1418 layer_dsq_insert_ewma,
1419
1420 bpf_stats,
1421 prev_bpf_stats: cur_bpf_stats,
1422
1423 processing_dur,
1424 prev_processing_dur: cur_processing_dur,
1425
1426 layer_slice_us,
1427 gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1428 gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1429 };
1430 Ok(())
1431 }
1432}
1433
/// User-space state kept for one layer.
///
/// NOTE(review): only the constructor's first lines are visible in this part of the file;
/// the field notes below are inferred from names and types and should be confirmed
/// against the code that maintains them.
#[derive(Debug)]
struct Layer {
    /// Layer name.
    name: String,
    /// Layer kind with its settings, cloned from the spec in `Layer::new`.
    kind: LayerKind,
    growth_algo: LayerGrowthAlgo,
    // Presumably one core ordering per node — TODO confirm the meaning of the outer index.
    core_order: Vec<Vec<usize>>,

    // Presumably LLC ids assigned to this layer, grouped by the same outer index.
    assigned_llcs: Vec<Vec<usize>>,

    // CPU counts: presumably overall, per LLC and per node respectively.
    nr_cpus: usize,
    nr_llc_cpus: Vec<usize>,
    nr_node_cpus: Vec<usize>,
    // Presumably the CPUs currently held by the layer.
    cpus: Cpumask,
    /// CPUs this layer may use; `Layer::new` starts from an empty mask and fills it in
    /// from the layer kind's node/LLC restrictions.
    allowed_cpus: Cpumask,

    nr_pinned_cpus: Vec<usize>,
}
1452
1453fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
1454 fs::read_to_string("/proc/kallsyms")?
1455 .lines()
1456 .find(|line| line.contains(sym_name))
1457 .and_then(|line| line.split_whitespace().next())
1458 .and_then(|addr| u64::from_str_radix(addr, 16).ok())
1459 .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
1460}
1461
1462fn resolve_cpus_pct_range(
1463 cpus_range: &Option<(usize, usize)>,
1464 cpus_range_frac: &Option<(f64, f64)>,
1465 max_cpus: usize,
1466) -> Result<(usize, usize)> {
1467 match (cpus_range, cpus_range_frac) {
1468 (Some(_x), Some(_y)) => {
1469 bail!("cpus_range cannot be used with cpus_pct.");
1470 }
1471 (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
1472 (None, Some((cpus_frac_min, cpus_frac_max))) => {
1473 if *cpus_frac_min < 0_f64
1474 || *cpus_frac_min > 1_f64
1475 || *cpus_frac_max < 0_f64
1476 || *cpus_frac_max > 1_f64
1477 {
1478 bail!("cpus_range_frac values must be between 0.0 and 1.0");
1479 }
1480 let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
1481 let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
1482 Ok((
1483 std::cmp::max(cpus_min_count, 1),
1484 std::cmp::min(std::cmp::max(cpus_max_count, 1), max_cpus),
1485 ))
1486 }
1487 (None, None) => Ok((0, max_cpus)),
1488 }
1489}
1490
/// Decays the previous peak utilization and merges in the current sample.
///
/// The old peak is halved once per `half_life` of `elapsed` time; the result
/// is the larger of that decayed peak and `cur_util`.
fn update_peak_util(prev_peak: f64, cur_util: f64, half_life: Duration, elapsed: Duration) -> f64 {
    let half_lives_elapsed = elapsed.as_secs_f64() / half_life.as_secs_f64();
    let decayed_peak = prev_peak * 0.5f64.powf(half_lives_elapsed);
    f64::max(decayed_peak, cur_util)
}
1495
/// Computes the `(low, high)` target range for a utilization sample.
///
/// `low` is `util` divided by the upper bound of `util_range`, rounded up.
/// `high` is the larger of `util` and `peak_util` divided by the lower bound
/// of `util_range`, rounded down, and is never allowed to fall below `low`.
fn calc_peak_aware_target_range(
    util: f64,
    peak_util: f64,
    util_range: (f64, f64),
) -> (usize, usize) {
    let (range_lo, range_hi) = util_range;
    let low = (util / range_hi).ceil() as usize;
    let peak_bound = (util.max(peak_util) / range_lo).floor() as usize;
    (low, std::cmp::max(peak_bound, low))
}
1505
1506impl Layer {
1507 fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<Vec<usize>>) -> Result<Self> {
1508 let name = &spec.name;
1509 let kind = spec.kind.clone();
1510 let mut allowed_cpus = Cpumask::new();
1511 match &kind {
1512 LayerKind::Confined {
1513 cpus_range,
1514 cpus_range_frac,
1515 common: LayerCommon { nodes, llcs, .. },
1516 ..
1517 } => {
1518 let cpus_range =
1519 resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
1520 if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
1521 bail!("invalid cpus_range {:?}", cpus_range);
1522 }
1523 if nodes.is_empty() && llcs.is_empty() {
1524 allowed_cpus.set_all();
1525 } else {
1526 for (node_id, node) in &topo.nodes {
1528 if nodes.contains(node_id) {
1530 for &id in node.all_cpus.keys() {
1531 allowed_cpus.set_cpu(id)?;
1532 }
1533 }
1534 for (llc_id, llc) in &node.llcs {
1536 if llcs.contains(llc_id) {
1537 for &id in llc.all_cpus.keys() {
1538 allowed_cpus.set_cpu(id)?;
1539 }
1540 }
1541 }
1542 }
1543 }
1544 }
1545 LayerKind::Grouped {
1546 common: LayerCommon { nodes, llcs, .. },
1547 ..
1548 }
1549 | LayerKind::Open {
1550 common: LayerCommon { nodes, llcs, .. },
1551 ..
1552 } => {
1553 if nodes.is_empty() && llcs.is_empty() {
1554 allowed_cpus.set_all();
1555 } else {
1556 for (node_id, node) in &topo.nodes {
1558 if nodes.contains(node_id) {
1560 for &id in node.all_cpus.keys() {
1561 allowed_cpus.set_cpu(id)?;
1562 }
1563 }
1564 for (llc_id, llc) in &node.llcs {
1566 if llcs.contains(llc_id) {
1567 for &id in llc.all_cpus.keys() {
1568 allowed_cpus.set_cpu(id)?;
1569 }
1570 }
1571 }
1572 }
1573 }
1574 }
1575 }
1576
1577 if let Some(util_range) = kind.util_range() {
1580 if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
1581 bail!("invalid util_range {:?}", util_range);
1582 }
1583 } else if kind.common().util_peak_half_life_ms != 0 {
1584 bail!("util_peak_half_life_ms requires util_range");
1585 }
1586
1587 let layer_growth_algo = kind.common().growth_algo.clone();
1588
1589 debug!(
1590 "layer: {} algo: {:?} core order: {:?}",
1591 name, &layer_growth_algo, core_order
1592 );
1593
1594 Ok(Self {
1595 name: name.into(),
1596 kind,
1597 growth_algo: layer_growth_algo,
1598 core_order: core_order.clone(),
1599
1600 assigned_llcs: vec![vec![]; topo.nodes.len()],
1601
1602 nr_cpus: 0,
1603 nr_llc_cpus: vec![0; topo.all_llcs.len()],
1604 nr_node_cpus: vec![0; topo.nodes.len()],
1605 cpus: Cpumask::new(),
1606 allowed_cpus,
1607
1608 nr_pinned_cpus: vec![0; topo.nodes.len()],
1609 })
1610 }
1611}
/// NUMA node information recorded for a GPU device.
#[derive(Debug, Clone)]
struct NodeInfo {
    /// Affinity mask containing every CPU of the node.
    node_mask: nix::sched::CpuSet,
    /// ID of the node the mask was built from.
    _node_id: usize,
}
1617
/// Periodically pins processes that use a GPU to the CPUs of the NUMA node
/// associated with that GPU.
#[derive(Debug)]
struct GpuTaskAffinitizer {
    /// NVML device index -> node information for that device.
    gpu_devs_to_node_info: HashMap<u32, NodeInfo>,
    /// PID of a process seen running on a GPU -> NVML device index.
    /// Entries are only ever inserted, never removed.
    gpu_pids_to_devs: HashMap<Pid, u32>,
    /// Start time of the last completed affinitization pass, if any.
    last_process_time: Option<Instant>,
    /// Process table snapshot used to discover children and threads.
    sys: System,
    /// Parent PID -> child PIDs, rebuilt on every process refresh.
    pid_map: HashMap<Pid, Vec<Pid>>,
    /// Minimum time between two affinitization passes.
    poll_interval: Duration,
    /// When false, the affinitizer does nothing.
    enable: bool,
    /// Running count of successful `sched_setaffinity` calls.
    tasks_affinitized: u64,
    /// Wall-clock duration of the most recent pass, in milliseconds.
    last_task_affinitization_ms: u64,
}
1632
1633impl GpuTaskAffinitizer {
1634 pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
1635 GpuTaskAffinitizer {
1636 gpu_devs_to_node_info: HashMap::new(),
1637 gpu_pids_to_devs: HashMap::new(),
1638 last_process_time: None,
1639 sys: System::default(),
1640 pid_map: HashMap::new(),
1641 poll_interval: Duration::from_secs(poll_interval),
1642 enable,
1643 tasks_affinitized: 0,
1644 last_task_affinitization_ms: 0,
1645 }
1646 }
1647
1648 fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
1649 for (chunk, &mask) in affinity.iter().enumerate() {
1650 let mut inner_offset: u64 = 1;
1651 for _ in 0..64 {
1652 if (mask & inner_offset) != 0 {
1653 return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
1654 }
1655 inner_offset <<= 1;
1656 }
1657 }
1658 anyhow::bail!("unable to get CPU from NVML bitmask");
1659 }
1660
1661 fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
1662 let mut cpuset = CpuSet::new();
1663 for cpu_id in node.all_cpus.keys() {
1664 cpuset.set(*cpu_id)?;
1665 }
1666 Ok(cpuset)
1667 }
1668
1669 fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
1670 let nvml = nvml()?;
1671 let device_count = nvml.device_count()?;
1672
1673 for idx in 0..device_count {
1674 let dev = nvml.device_by_index(idx)?;
1675 let cpu = dev.cpu_affinity(16)?;
1677 let ideal_cpu = self.find_one_cpu(cpu)?;
1678 if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
1679 self.gpu_devs_to_node_info.insert(
1680 idx,
1681 NodeInfo {
1682 node_mask: self.node_to_cpuset(
1683 topo.nodes.get(&cpu.node_id).expect("topo missing node"),
1684 )?,
1685 _node_id: cpu.node_id,
1686 },
1687 );
1688 }
1689 }
1690 Ok(())
1691 }
1692
1693 fn update_gpu_pids(&mut self) -> Result<()> {
1694 let nvml = nvml()?;
1695 for i in 0..nvml.device_count()? {
1696 let device = nvml.device_by_index(i)?;
1697 for proc in device
1698 .running_compute_processes()?
1699 .into_iter()
1700 .chain(device.running_graphics_processes()?.into_iter())
1701 {
1702 self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
1703 }
1704 }
1705 Ok(())
1706 }
1707
1708 fn update_process_info(&mut self) -> Result<()> {
1709 self.sys.refresh_processes_specifics(
1710 ProcessesToUpdate::All,
1711 true,
1712 ProcessRefreshKind::nothing(),
1713 );
1714 self.pid_map.clear();
1715 for (pid, proc_) in self.sys.processes() {
1716 if let Some(ppid) = proc_.parent() {
1717 self.pid_map.entry(ppid).or_default().push(*pid);
1718 }
1719 }
1720 Ok(())
1721 }
1722
1723 fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
1724 let mut work = VecDeque::from([root_pid]);
1725 let mut pids_and_tids: HashSet<Pid> = HashSet::new();
1726
1727 while let Some(pid) = work.pop_front() {
1728 if pids_and_tids.insert(pid) {
1729 if let Some(kids) = self.pid_map.get(&pid) {
1730 work.extend(kids);
1731 }
1732 if let Some(proc_) = self.sys.process(pid) {
1733 if let Some(tasks) = proc_.tasks() {
1734 pids_and_tids.extend(tasks.iter().copied());
1735 }
1736 }
1737 }
1738 }
1739 pids_and_tids
1740 }
1741
1742 fn affinitize_gpu_pids(&mut self) -> Result<()> {
1743 if !self.enable {
1744 return Ok(());
1745 }
1746 for (pid, dev) in &self.gpu_pids_to_devs {
1747 let node_info = self
1748 .gpu_devs_to_node_info
1749 .get(dev)
1750 .expect("Unable to get gpu pid node mask");
1751 for child in self.get_child_pids_and_tids(*pid) {
1752 match nix::sched::sched_setaffinity(
1753 nix::unistd::Pid::from_raw(child.as_u32() as i32),
1754 &node_info.node_mask,
1755 ) {
1756 Ok(_) => {
1757 self.tasks_affinitized += 1;
1759 }
1760 Err(_) => {
1761 debug!(
1762 "Error affinitizing gpu pid {} to node {:#?}",
1763 child.as_u32(),
1764 node_info
1765 );
1766 }
1767 };
1768 }
1769 }
1770 Ok(())
1771 }
1772
1773 pub fn maybe_affinitize(&mut self) {
1774 if !self.enable {
1775 return;
1776 }
1777 let now = Instant::now();
1778
1779 if let Some(last_process_time) = self.last_process_time {
1780 if (now - last_process_time) < self.poll_interval {
1781 return;
1782 }
1783 }
1784
1785 match self.update_gpu_pids() {
1786 Ok(_) => {}
1787 Err(e) => {
1788 error!("Error updating GPU PIDs: {}", e);
1789 }
1790 };
1791 match self.update_process_info() {
1792 Ok(_) => {}
1793 Err(e) => {
1794 error!("Error updating process info to affinitize GPU PIDs: {}", e);
1795 }
1796 };
1797 match self.affinitize_gpu_pids() {
1798 Ok(_) => {}
1799 Err(e) => {
1800 error!("Error updating GPU PIDs: {}", e);
1801 }
1802 };
1803 self.last_process_time = Some(now);
1804 self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
1805 }
1806
1807 pub fn init(&mut self, topo: Arc<Topology>) {
1808 if !self.enable || self.last_process_time.is_some() {
1809 return;
1810 }
1811
1812 match self.init_dev_node_map(topo) {
1813 Ok(_) => {}
1814 Err(e) => {
1815 error!("Error initializing gpu node dev map: {}", e);
1816 }
1817 };
1818 self.sys = System::new_all();
1819 }
1820}
1821
/// Top-level userspace state of the scheduler.
///
/// NOTE(review): most fields are populated by code outside this excerpt; the
/// per-field notes below describe only what the types and names establish and
/// should be confirmed against `Scheduler::init` and the run loop.
struct Scheduler<'a> {
    /// BPF skeleton.
    skel: BpfSkel<'a>,
    /// libbpf link, if one is held — presumably the attached struct_ops; TODO confirm.
    struct_ops: Option<libbpf_rs::Link>,
    /// Layer specifications.
    layer_specs: Vec<LayerSpec>,

    // Intervals; presumably the scheduling and layer-refresh periods — TODO confirm.
    sched_intv: Duration,
    layer_refresh_intv: Duration,

    cpu_pool: CpuPool,
    /// Per-layer userspace state.
    layers: Vec<Layer>,
    idle_qos_enabled: bool,

    proc_reader: fb_procfs::ProcReader,
    sched_stats: Stats,
    /// Presumably one peak utilization value per layer — TODO confirm.
    layer_peak_utils: Vec<f64>,

    /// Compiled cgroup regexes keyed by regex ID, when any are configured.
    cgroup_regexes: Option<HashMap<u32, Regex>>,

    nr_layer_cpus_ranges: Vec<(usize, usize)>,
    xnuma_mig_src: Vec<Vec<bool>>,
    growth_denied: Vec<Vec<bool>>,
    processing_dur: Duration,

    /// Shared system topology.
    topo: Arc<Topology>,
    /// Network devices keyed by name.
    netdevs: BTreeMap<String, NetDev>,
    stats_server: StatsServer<StatsReq, StatsRes>,
    /// GPU task affinitizer.
    gpu_task_handler: GpuTaskAffinitizer,
}
1850
/// Scale factor (2^20) used to convert a fractional migration amount into an
/// integer rate in `xnuma_compute_rates`.
const DUTY_CYCLE_SCALE: f64 = (1u64 << 20) as f64;
/// Factor applied to every computed cross-node migration amount, so only half
/// of the measured imbalance is turned into a rate.
const XNUMA_RATE_DAMPEN: f64 = 0.5;
1853
/// Cross-node migration rates produced by `xnuma_compute_rates`.
struct XnumaRates {
    /// `rates[src][dst]` is the rate from node `src` to node `dst`, scaled by
    /// `DUTY_CYCLE_SCALE`. The diagonal is always zero.
    rates: Vec<Vec<u64>>,
}
1859
/// Decides, per node, whether cross-node migration should be active.
///
/// All slices are indexed by node ID. `threshold` and `threshold_delta` are
/// `(low, high)` pairs applied to a node's load ratio (`duty / alloc`) and to
/// its surplus ratio (load above the system-wide duty/alloc ratio, per
/// allocated unit) respectively.
///
/// For a node with a non-zero allocation:
/// * it becomes active when both ratios exceed their high thresholds and
///   growth was denied;
/// * otherwise it becomes inactive when either ratio is below its low
///   threshold or growth was not denied;
/// * otherwise it keeps the state given in `currently_active`.
///
/// A node with no allocation is active only if it has duty and growth was
/// denied.
fn xnuma_check_active(
    duty_sums: &[f64],
    allocs: &[usize],
    threshold: (f64, f64),
    threshold_delta: (f64, f64),
    growth_denied: &[bool],
    currently_active: &[bool],
) -> Vec<bool> {
    let total_duty = duty_sums.iter().sum::<f64>();
    let total_alloc = allocs.iter().map(|&a| a as f64).sum::<f64>();
    // System-wide duty per allocated unit.
    let eq_ratio = if total_alloc > 0.0 {
        total_duty / total_alloc
    } else {
        0.0
    };

    let (thresh_lo, thresh_hi) = threshold;
    let (delta_lo, delta_hi) = threshold_delta;

    (0..duty_sums.len())
        .map(|nid| {
            let duty = duty_sums[nid];
            let alloc = allocs[nid] as f64;

            if alloc <= 0.0 {
                return duty > 0.0 && growth_denied[nid];
            }

            let load_ratio = duty / alloc;
            let surplus_ratio = (duty - eq_ratio * alloc) / alloc;

            if load_ratio > thresh_hi && surplus_ratio > delta_hi && growth_denied[nid] {
                return true;
            }
            if load_ratio < thresh_lo || surplus_ratio < delta_lo || !growth_denied[nid] {
                return false;
            }
            // Between the low and high thresholds: keep the current state.
            currently_active[nid]
        })
        .collect()
}
1921
1922fn xnuma_compute_rates(duty_sums: &[f64], allocs: &[usize]) -> XnumaRates {
1928 let nr_nodes = duty_sums.len();
1929 let total_duty: f64 = duty_sums.iter().sum();
1930 let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
1931
1932 if total_alloc <= 0.0 {
1933 return XnumaRates {
1934 rates: vec![vec![0u64; nr_nodes]; nr_nodes],
1935 };
1936 }
1937
1938 let eq_ratio = total_duty / total_alloc;
1939
1940 let mut surpluses = vec![0.0f64; nr_nodes];
1941 let mut deficits = vec![0.0f64; nr_nodes];
1942 for nid in 0..nr_nodes {
1943 let expected = eq_ratio * allocs[nid] as f64;
1944 let delta = duty_sums[nid] - expected;
1945 if delta > 0.0 {
1946 surpluses[nid] = delta;
1947 } else {
1948 deficits[nid] = -delta;
1949 }
1950 }
1951
1952 let total_deficit: f64 = deficits.iter().sum();
1953
1954 let mut rates = vec![vec![0u64; nr_nodes]; nr_nodes];
1955 for src in 0..nr_nodes {
1956 for dst in 0..nr_nodes {
1957 if src == dst || total_deficit <= 0.0 || surpluses[src] <= 0.0 {
1958 continue;
1959 }
1960 let migration = surpluses[src] * deficits[dst] / total_deficit * XNUMA_RATE_DAMPEN;
1963 rates[src][dst] = (migration * DUTY_CYCLE_SCALE) as u64;
1964 }
1965 }
1966
1967 XnumaRates { rates }
1968}
1969
1970impl<'a> Scheduler<'a> {
1971 fn init_layers(
1972 skel: &mut OpenBpfSkel,
1973 specs: &[LayerSpec],
1974 topo: &Topology,
1975 ) -> Result<HashMap<u32, Regex>> {
1976 skel.maps.rodata_data.as_mut().unwrap().nr_layers = specs.len() as u32;
1977 let mut perf_set = false;
1978
1979 let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
1980 let mut layer_weights: Vec<usize> = vec![];
1981 let mut cgroup_regex_id = 0;
1982 let mut cgroup_regexes = HashMap::new();
1983
1984 for (spec_i, spec) in specs.iter().enumerate() {
1985 let layer = &mut skel.maps.bss_data.as_mut().unwrap().layers[spec_i];
1986
1987 for (or_i, or) in spec.matches.iter().enumerate() {
1988 for (and_i, and) in or.iter().enumerate() {
1989 let mt = &mut layer.matches[or_i].matches[and_i];
1990
1991 mt.exclude.write(false);
1993
1994 match and {
1995 LayerMatch::CgroupPrefix(prefix) => {
1996 mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
1997 copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
1998 }
1999 LayerMatch::CgroupSuffix(suffix) => {
2000 mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
2001 copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
2002 }
2003 LayerMatch::CgroupRegex(regex_str) => {
2004 if cgroup_regex_id >= bpf_intf::consts_MAX_CGROUP_REGEXES {
2005 bail!(
2006 "Too many cgroup regex rules. Maximum allowed: {}",
2007 bpf_intf::consts_MAX_CGROUP_REGEXES
2008 );
2009 }
2010
2011 mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_REGEX as i32;
2013 mt.cgroup_regex_id = cgroup_regex_id;
2014
2015 let regex = Regex::new(regex_str).with_context(|| {
2016 format!("Invalid regex '{}' in layer '{}'", regex_str, spec.name)
2017 })?;
2018 cgroup_regexes.insert(cgroup_regex_id, regex);
2019 cgroup_regex_id += 1;
2020 }
2021 LayerMatch::CgroupContains(substr) => {
2022 mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
2023 copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
2024 }
2025 LayerMatch::CommPrefix(prefix) => {
2026 mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
2027 copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
2028 }
2029 LayerMatch::CommPrefixExclude(prefix) => {
2030 mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
2031 mt.exclude.write(true);
2032 copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
2033 }
2034 LayerMatch::PcommPrefix(prefix) => {
2035 mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
2036 copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
2037 }
2038 LayerMatch::PcommPrefixExclude(prefix) => {
2039 mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
2040 mt.exclude.write(true);
2041 copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
2042 }
2043 LayerMatch::NiceAbove(nice) => {
2044 mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
2045 mt.nice = *nice;
2046 }
2047 LayerMatch::NiceBelow(nice) => {
2048 mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
2049 mt.nice = *nice;
2050 }
2051 LayerMatch::NiceEquals(nice) => {
2052 mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
2053 mt.nice = *nice;
2054 }
2055 LayerMatch::UIDEquals(user_id) => {
2056 mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
2057 mt.user_id = *user_id;
2058 }
2059 LayerMatch::GIDEquals(group_id) => {
2060 mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
2061 mt.group_id = *group_id;
2062 }
2063 LayerMatch::PIDEquals(pid) => {
2064 mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
2065 mt.pid = *pid;
2066 }
2067 LayerMatch::PPIDEquals(ppid) => {
2068 mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
2069 mt.ppid = *ppid;
2070 }
2071 LayerMatch::TGIDEquals(tgid) => {
2072 mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
2073 mt.tgid = *tgid;
2074 }
2075 LayerMatch::NSPIDEquals(nsid, pid) => {
2076 mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
2077 mt.nsid = *nsid;
2078 mt.pid = *pid;
2079 }
2080 LayerMatch::NSEquals(nsid) => {
2081 mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
2082 mt.nsid = *nsid as u64;
2083 }
2084 LayerMatch::CmdJoin(joincmd) => {
2085 mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
2086 copy_into_cstr(&mut mt.comm_prefix, joincmd);
2087 }
2088 LayerMatch::IsGroupLeader(polarity) => {
2089 mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
2090 mt.is_group_leader.write(*polarity);
2091 }
2092 LayerMatch::IsKthread(polarity) => {
2093 mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
2094 mt.is_kthread.write(*polarity);
2095 }
2096 LayerMatch::UsedGpuTid(polarity) => {
2097 mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
2098 mt.used_gpu_tid.write(*polarity);
2099 }
2100 LayerMatch::UsedGpuPid(polarity) => {
2101 mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
2102 mt.used_gpu_pid.write(*polarity);
2103 }
2104 LayerMatch::AvgRuntime(min, max) => {
2105 mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
2106 mt.min_avg_runtime_us = *min;
2107 mt.max_avg_runtime_us = *max;
2108 }
2109 LayerMatch::HintEquals(hint) => {
2110 mt.kind = bpf_intf::layer_match_kind_MATCH_HINT_EQUALS as i32;
2111 mt.hint = *hint;
2112 }
2113 LayerMatch::SystemCpuUtilBelow(threshold) => {
2114 mt.kind = bpf_intf::layer_match_kind_MATCH_SYSTEM_CPU_UTIL_BELOW as i32;
2115 mt.system_cpu_util_below = (*threshold * 10000.0) as u64;
2116 }
2117 LayerMatch::DsqInsertBelow(threshold) => {
2118 mt.kind = bpf_intf::layer_match_kind_MATCH_DSQ_INSERT_BELOW as i32;
2119 mt.dsq_insert_below = (*threshold * 10000.0) as u64;
2120 }
2121 LayerMatch::NumaNode(node_id) => {
2122 if *node_id as usize >= topo.nodes.len() {
2123 bail!(
2124 "Spec {:?} has invalid NUMA node ID {} (available nodes: 0-{})",
2125 spec.name,
2126 node_id,
2127 topo.nodes.len() - 1
2128 );
2129 }
2130 mt.kind = bpf_intf::layer_match_kind_MATCH_NUMA_NODE as i32;
2131 mt.numa_node_id = *node_id;
2132 }
2133 }
2134 }
2135 layer.matches[or_i].nr_match_ands = or.len() as i32;
2136 }
2137
2138 layer.nr_match_ors = spec.matches.len() as u32;
2139 layer.kind = spec.kind.as_bpf_enum();
2140
2141 {
2142 let LayerCommon {
2143 min_exec_us,
2144 yield_ignore,
2145 perf,
2146 preempt,
2147 preempt_first,
2148 exclusive,
2149 skip_remote_node,
2150 prev_over_idle_core,
2151 growth_algo,
2152 slice_us,
2153 fifo,
2154 weight,
2155 disallow_open_after_us,
2156 disallow_preempt_after_us,
2157 xllc_mig_min_us,
2158 placement,
2159 member_expire_ms,
2160 ..
2161 } = spec.kind.common();
2162
2163 layer.slice_ns = *slice_us * 1000;
2164 layer.fifo.write(*fifo);
2165 layer.min_exec_ns = min_exec_us * 1000;
2166 layer.yield_step_ns = if *yield_ignore > 0.999 {
2167 0
2168 } else if *yield_ignore < 0.001 {
2169 layer.slice_ns
2170 } else {
2171 (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
2172 };
2173 let mut layer_name: String = spec.name.clone();
2174 layer_name.truncate(MAX_LAYER_NAME);
2175 copy_into_cstr(&mut layer.name, layer_name.as_str());
2176 layer.preempt.write(*preempt);
2177 layer.preempt_first.write(*preempt_first);
2178 layer.excl.write(*exclusive);
2179 layer.skip_remote_node.write(*skip_remote_node);
2180 layer.prev_over_idle_core.write(*prev_over_idle_core);
2181 layer.growth_algo = growth_algo.as_bpf_enum();
2182 layer.weight = *weight;
2183 layer.member_expire_ms = *member_expire_ms;
2184 layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
2185 v if v == u64::MAX => v,
2186 v => v * 1000,
2187 };
2188 layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
2189 v if v == u64::MAX => v,
2190 v => v * 1000,
2191 };
2192 layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
2193 layer_weights.push(layer.weight.try_into().unwrap());
2194 layer.perf = u32::try_from(*perf)?;
2195
2196 let task_place = |place: u32| crate::types::layer_task_place(place);
2197 layer.task_place = match placement {
2198 LayerPlacement::Standard => {
2199 task_place(bpf_intf::layer_task_place_PLACEMENT_STD)
2200 }
2201 LayerPlacement::Sticky => {
2202 task_place(bpf_intf::layer_task_place_PLACEMENT_STICK)
2203 }
2204 LayerPlacement::Floating => {
2205 task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT)
2206 }
2207 };
2208 }
2209
2210 layer.is_protected.write(match spec.kind {
2211 LayerKind::Open { .. } => false,
2212 LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
2213 protected
2214 }
2215 });
2216
2217 layer.idle_confined.write(match spec.kind {
2218 LayerKind::Grouped { idle_confined, .. } => idle_confined,
2219 _ => false,
2220 });
2221
2222 match &spec.cpuset {
2223 Some(mask) => {
2224 Self::update_cpumask(mask, &mut layer.cpuset);
2225 layer.has_cpuset.write(true);
2226 }
2227 None => {
2228 for i in 0..layer.cpuset.len() {
2229 layer.cpuset[i] = u8::MAX;
2230 }
2231 layer.has_cpuset.write(false);
2232 }
2233 };
2234
2235 perf_set |= layer.perf > 0;
2236 }
2237
2238 layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
2239 for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
2240 skel.maps
2241 .rodata_data
2242 .as_mut()
2243 .unwrap()
2244 .layer_iteration_order[idx] = *layer_idx as u32;
2245 }
2246
2247 if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
2248 warn!("cpufreq support not available, ignoring perf configurations");
2249 }
2250
2251 Ok(cgroup_regexes)
2252 }
2253
2254 fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
2255 skel.maps.rodata_data.as_mut().unwrap().nr_nodes = topo.nodes.len() as u32;
2256 skel.maps.rodata_data.as_mut().unwrap().nr_llcs = 0;
2257
2258 for (&node_id, node) in &topo.nodes {
2259 debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
2260 skel.maps.rodata_data.as_mut().unwrap().nr_llcs += node.llcs.len() as u32;
2261 let raw_numa_slice = node.span.as_raw_slice();
2262 let node_cpumask_slice =
2263 &mut skel.maps.rodata_data.as_mut().unwrap().numa_cpumasks[node_id];
2264 let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
2265 left.clone_from_slice(raw_numa_slice);
2266 debug!(
2267 "node {} mask: {:?}",
2268 node_id,
2269 skel.maps.rodata_data.as_ref().unwrap().numa_cpumasks[node_id]
2270 );
2271
2272 for llc in node.llcs.values() {
2273 debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
2274 skel.maps.rodata_data.as_mut().unwrap().llc_numa_id_map[llc.id] = node_id as u32;
2275 }
2276 }
2277
2278 for cpu in topo.all_cpus.values() {
2279 skel.maps.rodata_data.as_mut().unwrap().cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
2280 }
2281 }
2282
2283 fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
2284 let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
2285 vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
2286 vec
2287 };
2288 let radiate_cpu =
2289 |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
2290 vec.sort_by_key(|&id| {
2291 (
2292 (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
2293 (center_cpu as i32 - id as i32).abs(),
2294 )
2295 });
2296 vec
2297 };
2298
2299 for (&cpu_id, cpu) in &topo.all_cpus {
2300 let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
2302 let llc_span = &topo.all_llcs[&cpu.llc_id].span;
2303 let node_span = &topo.nodes[&cpu.node_id].span;
2304 let sys_span = &topo.span;
2305
2306 let sys_span = sys_span.and(&node_span.not());
2308 let node_span = node_span.and(&llc_span.not());
2309 let llc_span = llc_span.and(&core_span.not());
2310 core_span.clear_cpu(cpu_id).unwrap();
2311
2312 let mut sys_order: Vec<usize> = sys_span.iter().collect();
2314 let mut node_order: Vec<usize> = node_span.iter().collect();
2315 let mut llc_order: Vec<usize> = llc_span.iter().collect();
2316 let mut core_order: Vec<usize> = core_span.iter().collect();
2317
2318 sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
2323 node_order = radiate(node_order, cpu.node_id);
2324 llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
2325 core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);
2326
2327 let mut order: Vec<usize> = vec![];
2329 let mut idx: usize = 0;
2330
2331 idx += 1;
2332 order.push(cpu_id);
2333
2334 idx += core_order.len();
2335 order.append(&mut core_order);
2336 let core_end = idx;
2337
2338 idx += llc_order.len();
2339 order.append(&mut llc_order);
2340 let llc_end = idx;
2341
2342 idx += node_order.len();
2343 order.append(&mut node_order);
2344 let node_end = idx;
2345
2346 idx += sys_order.len();
2347 order.append(&mut sys_order);
2348 let sys_end = idx;
2349
2350 debug!(
2351 "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
2352 cpu_id, core_end, llc_end, node_end, sys_end, &order
2353 );
2354
2355 let pmap = &mut cpu_ctxs[cpu_id].prox_map;
2357 for (i, &cpu) in order.iter().enumerate() {
2358 pmap.cpus[i] = cpu as u16;
2359 }
2360 pmap.core_end = core_end as u32;
2361 pmap.llc_end = llc_end as u32;
2362 pmap.node_end = node_end as u32;
2363 pmap.sys_end = sys_end as u32;
2364 }
2365 }
2366
2367 fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
2368 cpu_ctxs
2369 .iter()
2370 .map(|cpu_ctx| unsafe { plain::as_bytes(cpu_ctx) }.to_vec())
2371 .collect()
2372 }
2373
2374 fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
2375 let key = (0_u32).to_ne_bytes();
2376 let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
2377 let cpu_ctxs_vec = skel
2378 .maps
2379 .cpu_ctxs
2380 .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
2381 .context("Failed to lookup cpu_ctx")?
2382 .unwrap();
2383
2384 let op_layers: Vec<u32> = layer_specs
2385 .iter()
2386 .enumerate()
2387 .filter(|(_idx, spec)| match &spec.kind {
2388 LayerKind::Open { .. } => spec.kind.common().preempt,
2389 _ => false,
2390 })
2391 .map(|(idx, _)| idx as u32)
2392 .collect();
2393 let on_layers: Vec<u32> = layer_specs
2394 .iter()
2395 .enumerate()
2396 .filter(|(_idx, spec)| match &spec.kind {
2397 LayerKind::Open { .. } => !spec.kind.common().preempt,
2398 _ => false,
2399 })
2400 .map(|(idx, _)| idx as u32)
2401 .collect();
2402 let gp_layers: Vec<u32> = layer_specs
2403 .iter()
2404 .enumerate()
2405 .filter(|(_idx, spec)| match &spec.kind {
2406 LayerKind::Grouped { .. } => spec.kind.common().preempt,
2407 _ => false,
2408 })
2409 .map(|(idx, _)| idx as u32)
2410 .collect();
2411 let gn_layers: Vec<u32> = layer_specs
2412 .iter()
2413 .enumerate()
2414 .filter(|(_idx, spec)| match &spec.kind {
2415 LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2416 _ => false,
2417 })
2418 .map(|(idx, _)| idx as u32)
2419 .collect();
2420
2421 for cpu in 0..*NR_CPUS_POSSIBLE {
2423 cpu_ctxs.push(
2424 *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
2425 .expect("cpu_ctx: short or misaligned buffer"),
2426 );
2427
2428 let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
2429 let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
2430 cpu_ctxs[cpu].cpu = cpu as i32;
2431 cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
2432 cpu_ctxs[cpu].is_big = is_big;
2433
2434 fastrand::seed(cpu as u64);
2435
2436 let mut ogp_order = op_layers.clone();
2437 ogp_order.append(&mut gp_layers.clone());
2438 fastrand::shuffle(&mut ogp_order);
2439
2440 let mut ogn_order = on_layers.clone();
2441 ogn_order.append(&mut gn_layers.clone());
2442 fastrand::shuffle(&mut ogn_order);
2443
2444 let mut op_order = op_layers.clone();
2445 fastrand::shuffle(&mut op_order);
2446
2447 let mut on_order = on_layers.clone();
2448 fastrand::shuffle(&mut on_order);
2449
2450 let mut gp_order = gp_layers.clone();
2451 fastrand::shuffle(&mut gp_order);
2452
2453 let mut gn_order = gn_layers.clone();
2454 fastrand::shuffle(&mut gn_order);
2455
2456 for i in 0..MAX_LAYERS {
2457 cpu_ctxs[cpu].ogp_layer_order[i] =
2458 ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2459 cpu_ctxs[cpu].ogn_layer_order[i] =
2460 ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2461
2462 cpu_ctxs[cpu].op_layer_order[i] =
2463 op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2464 cpu_ctxs[cpu].on_layer_order[i] =
2465 on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2466 cpu_ctxs[cpu].gp_layer_order[i] =
2467 gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2468 cpu_ctxs[cpu].gn_layer_order[i] =
2469 gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2470 }
2471 }
2472
2473 Self::init_cpu_prox_map(topo, &mut cpu_ctxs);
2474
2475 skel.maps
2476 .cpu_ctxs
2477 .update_percpu(
2478 &key,
2479 &Self::convert_cpu_ctxs(cpu_ctxs),
2480 libbpf_rs::MapFlags::ANY,
2481 )
2482 .context("Failed to update cpu_ctx")?;
2483
2484 Ok(())
2485 }
2486
2487 fn init_single_prox_map_per_llc(
2488 skel: &mut BpfSkel,
2489 topo: &Topology,
2490 prox_map_idx: &usize,
2491 ) -> Result<()> {
2492 for (&llc_id, llc) in &topo.all_llcs {
2493 let mut node_order: Vec<usize> =
2495 topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
2496 let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();
2497
2498 sys_order.retain(|id| !node_order.contains(id));
2500 node_order.retain(|&id| id != llc_id);
2501
2502 fastrand::seed((*prox_map_idx as u64) << 32 | llc_id as u64);
2505 fastrand::shuffle(&mut sys_order);
2506 fastrand::shuffle(&mut node_order);
2507
2508 let mut order: Vec<usize> = vec![];
2510 let mut idx: usize = 0;
2511
2512 idx += 1;
2513 order.push(llc_id);
2514
2515 idx += node_order.len();
2516 order.append(&mut node_order);
2517 let node_end = idx;
2518
2519 idx += sys_order.len();
2520 order.append(&mut sys_order);
2521 let sys_end = idx;
2522
2523 debug!(
2524 "LLC[{}] proximity map {}[{}/{}]: {:?}",
2525 llc_id, prox_map_idx, node_end, sys_end, &order
2526 );
2527
2528 let key = (llc_id as u32).to_ne_bytes();
2533 let v = skel
2534 .maps
2535 .llc_data
2536 .lookup(&key, libbpf_rs::MapFlags::ANY)
2537 .unwrap()
2538 .unwrap();
2539 let mut llcc: bpf_intf::llc_ctx =
2540 *plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
2541
2542 let pmap = &mut llcc.prox_maps[*prox_map_idx];
2543 for (i, &llc_id) in order.iter().enumerate() {
2544 pmap.llcs[i] = llc_id as u16;
2545 }
2546 pmap.node_end = node_end as u32;
2547 pmap.sys_end = sys_end as u32;
2548
2549 skel.maps.llc_data.update(
2550 &key,
2551 unsafe { plain::as_bytes(&llcc) },
2552 libbpf_rs::MapFlags::ANY,
2553 )?
2554 }
2555
2556 Ok(())
2557 }
2558
2559 fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2560 let num_proximity_maps = bpf_intf::consts_NUM_PROXIMITY_MAPS as usize;
2561 for prox_map_idx in 0..num_proximity_maps {
2562 Self::init_single_prox_map_per_llc(skel, topo, &prox_map_idx)?;
2563 }
2564
2565 Ok(())
2566 }
2567
2568 fn init_node_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2569 for (&node_id, node) in &topo.nodes {
2570 let mut order: Vec<(usize, usize)> = node
2571 .distance
2572 .iter()
2573 .enumerate()
2574 .filter(|&(nid, _)| nid != node_id)
2575 .map(|(nid, &dist)| (nid, dist))
2576 .collect();
2577 order.sort_by_key(|&(_, dist)| dist);
2578
2579 let key = (node_id as u32).to_ne_bytes();
2580
2581 let v = skel.maps.node_data.lookup(&key, libbpf_rs::MapFlags::ANY);
2583 let mut nodec: bpf_intf::node_ctx = match v {
2584 Ok(Some(v)) => {
2585 *plain::from_bytes(v.as_slice()).expect("node_ctx: short or misaligned buffer")
2586 }
2587 _ => unsafe { MaybeUninit::zeroed().assume_init() },
2588 };
2589
2590 let pmap = &mut nodec.prox_map;
2591 for (i, &(nid, _)) in order.iter().enumerate() {
2592 pmap.nodes[i] = nid as u16;
2593 }
2594 pmap.sys_end = order.len() as u32;
2595
2596 debug!(
2597 "NODE[{}] prox_map[{}]: {:?}",
2598 node_id,
2599 pmap.sys_end,
2600 &order.iter().map(|(n, d)| (*n, *d)).collect::<Vec<_>>()
2601 );
2602
2603 skel.maps.node_data.update(
2604 &key,
2605 unsafe { plain::as_bytes(&nodec) },
2606 libbpf_rs::MapFlags::ANY,
2607 )?;
2608 }
2609 Ok(())
2610 }
2611
2612 fn init_node_ctx(skel: &mut BpfSkel, topo: &Topology, nr_layers: usize) -> Result<()> {
2613 let all_layers: Vec<u32> = (0..nr_layers as u32).collect();
2614 let node_empty_layers: Vec<Vec<u32>> =
2615 (0..topo.nodes.len()).map(|_| all_layers.clone()).collect();
2616 Self::refresh_node_ctx(skel, topo, &node_empty_layers, true);
2617 Ok(())
2618 }
2619
/// Build a fully initialized scheduler instance.
///
/// In order, this function: picks a `Topology`, checks that node/LLC/CPU IDs
/// are dense, validates the layer specs, opens the BPF skeleton, fills in its
/// read-only data, loads it, populates the BPF maps, attaches the struct_ops
/// (plus the optional GPU kprobes) and finally assembles `Self`.
///
/// # Errors
///
/// Returns an error when topology detection fails, when node/LLC/CPU IDs have
/// holes, when a layer spec references an unknown or duplicate node/LLC or
/// combines a `NumaNode` matcher with a `NodeSpread*` growth algorithm, or
/// when any of the fallible setup steps marked with `?` fails.
///
/// # Panics
///
/// Panics through the `unwrap()` calls below: missing rodata, an `Open` layer
/// whose `disallow_open_after_us` / `disallow_preempt_after_us` is `None`,
/// a failing `set_pin_path`, or a task hint map path containing a NUL byte.
fn init(
    opts: &'a Opts,
    layer_specs: &[LayerSpec],
    open_object: &'a mut MaybeUninit<OpenObject>,
    hint_to_layer_map: &HashMap<u64, HintLayerInfo>,
    membw_tracking: bool,
) -> Result<Self> {
    let nr_layers = layer_specs.len();
    let mut disable_topology = opts.disable_topology.unwrap_or(false);

    // Topology source: flattened when topology awareness is disabled,
    // argument-driven when a virtual LLC is requested, auto-detected otherwise.
    let topo = Arc::new(if disable_topology {
        Topology::with_flattened_llc_node()?
    } else if opts.topology.virt_llc.is_some() {
        Topology::with_args(&opts.topology)?
    } else {
        Topology::new()?
    });

    // Require IDs to be exactly 0..n. Other code in this file iterates
    // `0..len` and looks IDs up directly, which only works without holes.
    if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
        bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
    }
    if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
        bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
    }
    if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
        bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
    }

    // Only read the network devices when IRQ balancing was requested;
    // otherwise keep an empty map.
    let netdevs = if opts.netdev_irq_balance {
        let devs = read_netdevs()?;
        let total_irqs: usize = devs.values().map(|d| d.irqs.len()).sum();
        let breakdown = devs
            .iter()
            .map(|(iface, d)| format!("{iface}={}", d.irqs.len()))
            .collect::<Vec<_>>()
            .join(", ");
        info!(
            "Netdev IRQ balancing enabled: overriding {total_irqs} IRQ{} \
            across {} interface{} [{breakdown}]",
            if total_irqs == 1 { "" } else { "s" },
            devs.len(),
            if devs.len() == 1 { "" } else { "s" },
        );
        devs
    } else {
        BTreeMap::new()
    };

    // A single node with a single LLC has nothing to be topology-aware about.
    // NOTE(review): this message is also printed when the option was
    // explicitly set to false, not only when it was left unspecified.
    if !disable_topology {
        if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
            disable_topology = true;
        };
        info!(
            "Topology awareness not specified, selecting {} based on hardware",
            if disable_topology {
                "disabled"
            } else {
                "enabled"
            }
        );
    };

    let cpu_pool = CpuPool::new(topo.clone(), opts.allow_partial_core)?;

    // Shadow the borrowed specs with an owned copy. With topology awareness
    // disabled, the per-layer node and LLC restrictions are dropped.
    let layer_specs: Vec<_> = if disable_topology {
        info!("Disabling topology awareness");
        layer_specs
            .iter()
            .cloned()
            .map(|mut s| {
                s.kind.common_mut().nodes.clear();
                s.kind.common_mut().llcs.clear();
                s
            })
            .collect()
    } else {
        layer_specs.to_vec()
    };

    // Validate that every node/LLC referenced by a spec exists in the
    // topology and is listed at most once.
    for spec in layer_specs.iter() {
        let mut seen = BTreeSet::new();
        for &node_id in spec.nodes().iter() {
            if !topo.nodes.contains_key(&node_id) {
                bail!(
                    "layer {:?}: nodes references node {} which does not \
                    exist in the topology (available: {:?})",
                    spec.name,
                    node_id,
                    topo.nodes.keys().collect::<Vec<_>>()
                );
            }
            if !seen.insert(node_id) {
                bail!(
                    "layer {:?}: nodes contains duplicate node {}",
                    spec.name,
                    node_id
                );
            }
        }

        // Reuse the same set for the LLC duplicate check.
        seen.clear();
        for &llc_id in spec.llcs().iter() {
            if !topo.all_llcs.contains_key(&llc_id) {
                bail!(
                    "layer {:?}: llcs references LLC {} which does not \
                    exist in the topology (available: {:?})",
                    spec.name,
                    llc_id,
                    topo.all_llcs.keys().collect::<Vec<_>>()
                );
            }
            if !seen.insert(llc_id) {
                bail!(
                    "layer {:?}: llcs contains duplicate LLC {}",
                    spec.name,
                    llc_id
                );
            }
        }
    }

    // Reject the NumaNode matcher + NodeSpread* growth combination; the
    // error text below explains why the two conflict.
    for spec in layer_specs.iter() {
        let has_numa_node_match = spec
            .matches
            .iter()
            .flatten()
            .any(|m| matches!(m, LayerMatch::NumaNode(_)));
        let has_node_spread_algo = matches!(
            spec.kind.common().growth_algo,
            LayerGrowthAlgo::NodeSpread
                | LayerGrowthAlgo::NodeSpreadReverse
                | LayerGrowthAlgo::NodeSpreadRandom
        );
        if has_numa_node_match && has_node_spread_algo {
            bail!(
                "layer {:?}: NumaNode matcher cannot be combined with {:?} \
                growth algorithm. NodeSpread* allocates CPUs equally across \
                ALL NUMA nodes, but NumaNode restricts tasks to one node's \
                CPUs — CPUs on other nodes are wasted and utilization \
                will never exceed 1/numa_nodes. Use a non-spread algorithm \
                (e.g. Linear, Topo) instead.",
                spec.name,
                spec.kind.common().growth_algo
            );
        }
    }

    init_libbpf_logging(None);
    let kfuncs_in_syscall = scx_bpf_compat::kfuncs_supported_in_syscall()?;
    if !kfuncs_in_syscall {
        warn!("Using slow path: kfuncs not supported in syscall programs (a8e03b6bbb2c ∉ ker)");
    }

    // Map the textual log level to 2 (trace), 1 (debug) or 0 (anything else).
    let debug_level = if opts.log_level.contains("trace") {
        2
    } else if opts.log_level.contains("debug") {
        1
    } else {
        0
    };
    let mut skel_builder = BpfSkelBuilder::default();
    // The object builder's debug flag is only turned on at trace level.
    skel_builder.obj_builder.debug(debug_level > 1);

    info!(
        "Running scx_layered (build ID: {})",
        build_id::full_version(env!("CARGO_PKG_VERSION"))
    );
    let open_opts = opts.libbpf.clone().into_bpf_open_opts();
    let mut skel = scx_ops_open!(skel_builder, open_object, layered, open_opts)?;

    // The PMU programs are only autoloaded when memory-bandwidth tracking is on.
    skel.progs.scx_pmu_switch_tc.set_autoload(membw_tracking);
    skel.progs.scx_pmu_tick_tc.set_autoload(membw_tracking);

    // Names of the GPU kprobes set up here; consulted again after attach.
    let mut loaded_kprobes = HashSet::new();

    // `gpu_kprobe_level` is cumulative: level 1 adds nvidia_open, level 2
    // also nvidia_mmap, level 3 also nvidia_poll.
    if opts.enable_gpu_support {
        if opts.gpu_kprobe_level >= 1 {
            compat::cond_kprobe_load("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
            loaded_kprobes.insert("nvidia_open");
        }
        if opts.gpu_kprobe_level >= 2 {
            compat::cond_kprobe_load("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
            loaded_kprobes.insert("nvidia_mmap");
        }
        if opts.gpu_kprobe_level >= 3 {
            compat::cond_kprobe_load("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
            loaded_kprobes.insert("nvidia_poll");
        }
    }

    let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
    let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");

    // `event` is later handed to `create_perf_fds`; 0 when tracking is off.
    let event = if membw_tracking {
        setup_membw_tracking(&mut skel)?
    } else {
        0
    };

    let rodata = skel.maps.rodata_data.as_mut().unwrap();

    // Both addresses are needed; if either lookup failed, the rodata fields
    // are left untouched and only a warning is emitted.
    if let (Ok(ext_addr), Ok(idle_addr)) = (ext_sched_class_addr, idle_sched_class_addr) {
        rodata.ext_sched_class_addr = ext_addr;
        rodata.idle_sched_class_addr = idle_addr;
    } else {
        warn!(
            "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
        );
    }

    // NOTE(review): `slice_ns` and `max_exec_ns` are unconditionally
    // overwritten from `opts` further down, so these two defaults never
    // reach the loaded program.
    rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
    rodata.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;

    skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;

    // OR in the queued-wakeup flag unless disabled; a flag value of 0 is
    // treated as "kernel does not support it".
    if !opts.disable_queued_wakeup {
        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization"),
            v => skel.struct_ops.layered_mut().flags |= v,
        }
    }

    rodata.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
    rodata.percpu_kthread_preempt_all =
        !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
    rodata.debug = debug_level as u32;
    // Microsecond options are converted to nanoseconds (x1000).
    rodata.slice_ns = opts.slice_us * 1000;
    // Without an explicit max_exec_us, fall back to 20 slices.
    rodata.max_exec_ns = if opts.max_exec_us > 0 {
        opts.max_exec_us * 1000
    } else {
        opts.slice_us * 1000 * 20
    };
    rodata.nr_cpu_ids = *NR_CPU_IDS as u32;
    rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
    rodata.smt_enabled = topo.smt_enabled;
    rodata.has_little_cores = topo.has_little_cores();
    rodata.antistall_sec = opts.antistall_sec;
    rodata.monitor_disable = opts.monitor_disable;
    rodata.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
    // Fraction scaled to parts-per-1024 and clamped into 1..=1024.
    rodata.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
    rodata.enable_antistall = !opts.disable_antistall;
    rodata.enable_match_debug = opts.enable_match_debug;
    rodata.enable_gpu_support = opts.enable_gpu_support;
    rodata.kfuncs_supported_in_syscall = kfuncs_in_syscall;

    for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
        rodata.__sibling_cpu[cpu] = *sib;
    }
    // Byte-packed bitmap: CPU n lives in byte n / 8, bit n % 8.
    for cpu in topo.all_cpus.keys() {
        rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
    }

    // Layer counts by category: Open/Grouped crossed with preempt/non-preempt,
    // plus the number of exclusive layers.
    rodata.nr_op_layers = layer_specs
        .iter()
        .filter(|spec| match &spec.kind {
            LayerKind::Open { .. } => spec.kind.common().preempt,
            _ => false,
        })
        .count() as u32;
    rodata.nr_on_layers = layer_specs
        .iter()
        .filter(|spec| match &spec.kind {
            LayerKind::Open { .. } => !spec.kind.common().preempt,
            _ => false,
        })
        .count() as u32;
    rodata.nr_gp_layers = layer_specs
        .iter()
        .filter(|spec| match &spec.kind {
            LayerKind::Grouped { .. } => spec.kind.common().preempt,
            _ => false,
        })
        .count() as u32;
    rodata.nr_gn_layers = layer_specs
        .iter()
        .filter(|spec| match &spec.kind {
            LayerKind::Grouped { .. } => !spec.kind.common().preempt,
            _ => false,
        })
        .count() as u32;
    rodata.nr_excl_layers = layer_specs
        .iter()
        .filter(|spec| spec.kind.common().exclusive)
        .count() as u32;

    // Smallest disallow-open / disallow-preempt thresholds across all Open
    // layers; u64::MAX doubles as the "no Open layer seen" sentinel.
    let mut min_open = u64::MAX;
    let mut min_preempt = u64::MAX;

    for spec in layer_specs.iter() {
        if let LayerKind::Open { common, .. } = &spec.kind {
            min_open = min_open.min(common.disallow_open_after_us.unwrap());
            min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
        }
    }

    // NOTE(review): the sources are named `*_us` while the destinations are
    // named `*_ns`, and no conversion is visible here — confirm the units.
    rodata.min_open_layer_disallow_open_after_ns = match min_open {
        u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
        v => v,
    };
    rodata.min_open_layer_disallow_preempt_after_ns = match min_preempt {
        u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
        v => v,
    };

    // An empty path means the task hint map is neither pinned nor enabled.
    let layered_task_hint_map_path = &opts.task_hint_map;
    let hint_map = &mut skel.maps.scx_layered_task_hint_map;
    if !layered_task_hint_map_path.is_empty() {
        hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
        rodata.task_hint_map_enabled = true;
    }

    if !opts.hi_fb_thread_name.is_empty() {
        let bpf_hi_fb_thread_name = &mut rodata.hi_fb_thread_name;
        copy_into_cstr(bpf_hi_fb_thread_name, opts.hi_fb_thread_name.as_str());
        rodata.enable_hi_fb_thread_name_match = true;
    }

    let cgroup_regexes = Self::init_layers(&mut skel, &layer_specs, &topo)?;
    skel.maps.rodata_data.as_mut().unwrap().nr_cgroup_regexes = cgroup_regexes.len() as u32;
    Self::init_nodes(&mut skel, opts, &topo);

    // Everything above configures the opened skeleton; from here on `skel`
    // is the loaded skeleton.
    let mut skel = scx_ops_load!(skel, layered, uei)?;

    // Populate the hint -> layer map. Thresholds are scaled by 10000 and
    // an absent threshold is encoded as u64::MAX.
    if !hint_to_layer_map.is_empty() {
        for (k, v) in hint_to_layer_map.iter() {
            // The u64 hint is truncated to the map's u32 key.
            let key: u32 = *k as u32;

            // Build the value in a zeroed byte buffer sized for the BPF
            // struct and fill its fields through a raw pointer.
            let mut info_bytes = vec![0u8; std::mem::size_of::<bpf_intf::hint_layer_info>()];
            let info_ptr = info_bytes.as_mut_ptr() as *mut bpf_intf::hint_layer_info;
            unsafe {
                (*info_ptr).layer_id = v.layer_id as u32;
                (*info_ptr).system_cpu_util_below = match v.system_cpu_util_below {
                    Some(threshold) => (threshold * 10000.0) as u64,
                    None => u64::MAX,
                };
                (*info_ptr).dsq_insert_below = match v.dsq_insert_below {
                    Some(threshold) => (threshold * 10000.0) as u64,
                    None => u64::MAX,
                };
            }

            skel.maps.hint_to_layer_id_map.update(
                &key.to_ne_bytes(),
                &info_bytes,
                libbpf_rs::MapFlags::ANY,
            )?;
        }
    }

    if membw_tracking {
        create_perf_fds(&mut skel, event)?;
    }

    // One `Layer` per spec, each with its precomputed growth order.
    let mut layers = vec![];
    let layer_growth_orders =
        LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
    for (idx, spec) in layer_specs.iter().enumerate() {
        let growth_order = layer_growth_orders
            .get(&idx)
            .with_context(|| "layer has no growth order".to_string())?;
        layers.push(Layer::new(spec, &topo, growth_order)?);
    }

    // Idle QoS is on when any layer sets a positive idle_resume_us and the
    // system supports it; otherwise warn and turn it off.
    let mut idle_qos_enabled = layers
        .iter()
        .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
    if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
        warn!("idle_resume_us not supported, ignoring");
        idle_qos_enabled = false;
    }

    Self::init_cpus(&skel, &layer_specs, &topo)?;
    Self::init_llc_prox_map(&mut skel, &topo)?;
    Self::init_node_prox_map(&mut skel, &topo)?;
    Self::init_node_ctx(&mut skel, &topo, nr_layers)?;

    let proc_reader = fb_procfs::ProcReader::new();

    // Run `initialize_pid_namespace` once with default input; its result
    // is deliberately ignored.
    let input = ProgramInput {
        ..Default::default()
    };
    let prog = &mut skel.progs.initialize_pid_namespace;

    let _ = prog.test_run(input);

    // Best-effort chmod of the task hint map path to 0666; a failure is
    // only traced.
    if !layered_task_hint_map_path.is_empty() {
        let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
        let mode: libc::mode_t = 0o666;
        unsafe {
            if libc::chmod(path.as_ptr(), mode) != 0 {
                trace!("'chmod' to 666 of task hint map failed, continuing...");
            }
        }
    }

    let struct_ops = scx_ops_attach!(skel, layered)?;

    // Attach only the kprobes that were recorded in `loaded_kprobes` above.
    if opts.enable_gpu_support {
        if loaded_kprobes.contains("nvidia_open") {
            compat::cond_kprobe_attach("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
        }
        if loaded_kprobes.contains("nvidia_mmap") {
            compat::cond_kprobe_attach("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
        }
        if loaded_kprobes.contains("nvidia_poll") {
            compat::cond_kprobe_attach("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
        }
    }

    let stats_server = StatsServer::new(stats::server_data()).launch()?;
    let mut gpu_task_handler =
        GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
    gpu_task_handler.init(topo.clone());

    // `sched_stats` borrows `skel`, `proc_reader` and `gpu_task_handler`
    // before those values are moved into the struct by the later fields.
    let sched = Self {
        struct_ops: Some(struct_ops),
        layer_specs,

        sched_intv: Duration::from_secs_f64(opts.interval),
        layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),

        cpu_pool,
        layers,
        idle_qos_enabled,

        sched_stats: Stats::new(
            &mut skel,
            &proc_reader,
            topo.clone(),
            &gpu_task_handler,
            opts.util_compensation,
        )?,
        layer_peak_utils: vec![0.0; nr_layers],

        cgroup_regexes: Some(cgroup_regexes),
        nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
        xnuma_mig_src: vec![vec![false; topo.nodes.len()]; nr_layers],
        growth_denied: vec![vec![false; topo.nodes.len()]; nr_layers],
        processing_dur: Default::default(),

        proc_reader,
        skel,

        topo,
        netdevs,
        stats_server,
        gpu_task_handler,
    };

    info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");

    Ok(sched)
}
3111
3112 fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
3113 for cpu in 0..mask.len() {
3114 if mask.test_cpu(cpu) {
3115 bpfmask[cpu / 8] |= 1 << (cpu % 8);
3116 } else {
3117 bpfmask[cpu / 8] &= !(1 << (cpu % 8));
3118 }
3119 }
3120 }
3121
3122 fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
3123 trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
3124 Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);
3125
3126 bpf_layer.nr_cpus = layer.nr_cpus as u32;
3127 for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
3128 bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
3129 }
3130 for (node_id, &nr_node_cpus) in layer.nr_node_cpus.iter().enumerate() {
3131 bpf_layer.node[node_id].nr_cpus = nr_node_cpus as u32;
3132 }
3133
3134 bpf_layer.refresh_cpus = 1;
3135 }
3136
3137 fn update_netdev_cpumasks(&mut self) -> Result<()> {
3138 let available_cpus = self.cpu_pool.available_cpus();
3139 if available_cpus.is_empty() {
3140 return Ok(());
3141 }
3142
3143 for (iface, netdev) in self.netdevs.iter_mut() {
3144 let node = self
3145 .topo
3146 .nodes
3147 .values()
3148 .find(|n| n.id == netdev.node())
3149 .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
3150 let node_cpus = node.span.clone();
3151 for (irq, irqmask) in netdev.irqs.iter_mut() {
3152 irqmask.clear_all();
3153 for cpu in available_cpus.iter() {
3154 if !node_cpus.test_cpu(cpu) {
3155 continue;
3156 }
3157 let _ = irqmask.set_cpu(cpu);
3158 }
3159 if irqmask.weight() == 0 {
3161 for cpu in node_cpus.iter() {
3162 let _ = irqmask.set_cpu(cpu);
3163 }
3164 }
3165 trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
3166 }
3167 netdev.apply_cpumasks()?;
3168 debug!(
3169 "{iface}: applied affinity override to {} IRQ{}",
3170 netdev.irqs.len(),
3171 if netdev.irqs.len() == 1 { "" } else { "s" },
3172 );
3173 }
3174
3175 Ok(())
3176 }
3177
3178 fn clamp_target_by_membw(
3179 &self,
3180 layer: &Layer,
3181 membw_limit: f64,
3182 membw: f64,
3183 curtarget: u64,
3184 ) -> usize {
3185 let ncpu: u64 = layer.cpus.weight() as u64;
3186 let membw = (membw * 1024_f64.powf(3.0)).round() as u64;
3187 let membw_limit = (membw_limit * 1024_f64.powf(3.0)).round() as u64;
3188 let last_membw_percpu = if ncpu > 0 { membw / ncpu } else { 0 };
3189
3190 if membw_limit == 0 || last_membw_percpu == 0 {
3193 return curtarget as usize;
3194 }
3195
3196 (membw_limit / last_membw_percpu) as usize
3197 }
3198
3199 fn calc_raw_demands(&self, targets: &[(usize, usize)]) -> Vec<LayerDemand> {
3203 let au = self.cpu_pool.alloc_unit();
3204 let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
3205 let nr_nodes = self.topo.nodes.len();
3206
3207 targets
3208 .iter()
3209 .enumerate()
3210 .map(|(idx, &(target, _min))| {
3211 let layer = &self.layers[idx];
3212 let weight = layer.kind.common().weight as usize;
3213
3214 if matches!(layer.kind, LayerKind::Open { .. }) {
3216 return LayerDemand {
3217 raw_pinned: vec![0; nr_nodes],
3218 raw_unpinned: 0,
3219 weight,
3220 spread: false,
3221 };
3222 }
3223
3224 let spread = matches!(
3229 layer.growth_algo,
3230 LayerGrowthAlgo::NodeSpread
3231 | LayerGrowthAlgo::NodeSpreadReverse
3232 | LayerGrowthAlgo::NodeSpreadRandom
3233 );
3234
3235 let util_high = match &layer.kind {
3236 LayerKind::Confined { util_range, .. }
3237 | LayerKind::Grouped { util_range, .. } => util_range.1,
3238 _ => 1.0,
3239 };
3240
3241 let mut raw_pinned = vec![0usize; nr_nodes];
3243 for n in 0..nr_nodes {
3244 let pu = pinned_utils[idx][n];
3245 if pu < 0.01 {
3246 continue;
3247 }
3248 let node_span = &self.topo.nodes[&n].span;
3250 if layer.allowed_cpus.and(node_span).is_empty() {
3251 continue;
3252 }
3253 let cpus = (pu / util_high).ceil() as usize;
3254 let units = cpus.div_ceil(au);
3256 raw_pinned[n] = units;
3257 }
3258
3259 let target_units = target.div_ceil(au);
3261 let pinned_units: usize = raw_pinned.iter().sum();
3262 let raw_unpinned = target_units.saturating_sub(pinned_units);
3263
3264 LayerDemand {
3265 raw_pinned,
3266 raw_unpinned,
3267 weight,
3268 spread,
3269 }
3270 })
3271 .collect()
3272 }
3273
3274 fn calc_target_nr_cpus(&mut self) -> Vec<(usize, usize)> {
3282 let nr_cpus = self.cpu_pool.topo.all_cpus.len();
3283 let membws = &self.sched_stats.layer_membws;
3284
3285 let mut records: Vec<(u64, u64, u64, u64, usize, usize, usize)> = vec![];
3286 let mut targets: Vec<(usize, usize)> = vec![];
3287
3288 for (idx, layer) in self.layers.iter().enumerate() {
3289 targets.push(match &layer.kind {
3290 LayerKind::Confined {
3291 util_range,
3292 cpus_range,
3293 cpus_range_frac,
3294 membw_gb,
3295 ..
3296 }
3297 | LayerKind::Grouped {
3298 util_range,
3299 cpus_range,
3300 cpus_range_frac,
3301 membw_gb,
3302 ..
3303 } => {
3304 let cpus_range =
3305 resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();
3306
3307 let (owned, open) = {
3312 let utils = if self.sched_stats.util_compensation {
3313 &self.sched_stats.layer_utils_compensated[idx]
3314 } else {
3315 &self.sched_stats.layer_utils[idx]
3316 };
3317 (utils[LAYER_USAGE_OWNED], utils[LAYER_USAGE_OPEN])
3318 };
3319
3320 let membw_owned = membws[idx][LAYER_USAGE_OWNED];
3321 let membw_open = membws[idx][LAYER_USAGE_OPEN];
3322
3323 let mut util = owned;
3324 let mut membw = membw_owned;
3325 if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
3326 util += open;
3327 membw += membw_open;
3328 }
3329
3330 let util = if util < 0.01 { 0.0 } else { util };
3331
3332 let peak_util = if layer.kind.common().util_peak_half_life_ms == 0 {
3333 util
3334 } else {
3335 update_peak_util(
3336 self.layer_peak_utils[idx],
3337 util,
3338 Duration::from_millis(layer.kind.common().util_peak_half_life_ms),
3339 self.sched_stats.elapsed,
3340 )
3341 };
3342 self.layer_peak_utils[idx] = peak_util;
3343 let (low, high) = calc_peak_aware_target_range(util, peak_util, *util_range);
3344
3345 let membw_limit = match membw_gb {
3346 Some(membw_limit) => *membw_limit,
3347 None => 0.0,
3348 };
3349
3350 trace!(
3351 "layer {0} (membw, membw_limit): ({membw} gi_b, {membw_limit} gi_b)",
3352 layer.name
3353 );
3354
3355 let target = layer.cpus.weight().clamp(low, high);
3356
3357 records.push((
3358 (owned * 100.0) as u64,
3359 (open * 100.0) as u64,
3360 (util * 100.0) as u64,
3361 (peak_util * 100.0) as u64,
3362 low,
3363 high,
3364 target,
3365 ));
3366
3367 let target = target.clamp(cpus_range.0, cpus_range.1);
3368 let membw_target =
3369 self.clamp_target_by_membw(layer, membw_limit, membw, target as u64);
3370
3371 trace!("CPU target pre- and post-membw adjustment: {target} -> {membw_target}");
3372
3373 if membw_target < cpus_range.0 {
3376 warn!("cannot satisfy memory bw limit for layer {}", layer.name);
3377 warn!("membw_target {membw_target} low {}", cpus_range.0);
3378 };
3379
3380 let target = membw_target.clamp(cpus_range.0, target);
3383
3384 (target, cpus_range.0)
3385 }
3386 LayerKind::Open { .. } => (0, 0),
3387 });
3388 }
3389
3390 trace!(
3391 "(owned, open, util, peak, low, high, target): {:?}",
3392 &records
3393 );
3394 targets
3395 }
3396
3397 fn compute_target_llcs(target: usize, node_llcs: &BTreeMap<usize, Arc<Llc>>) -> (usize, usize) {
3401 let mut remaining = target;
3402 let mut full = 0;
3403 for llc in node_llcs.values() {
3404 let cpus_in_llc = llc.span.weight();
3405 if remaining >= cpus_in_llc {
3406 full += 1;
3407 remaining -= cpus_in_llc;
3408 } else {
3409 let mut extra = 0;
3410 for (_, core) in llc.cores.iter() {
3411 if remaining == 0 {
3412 break;
3413 }
3414 extra += 1;
3415 remaining = remaining.saturating_sub(core.cpus.len());
3416 }
3417 return (full, extra);
3418 }
3419 }
3420 (full, 0)
3421 }
3422
/// Rebuild LLC assignments, core orders and CPU sets for `StickyDynamic` layers.
///
/// Layers with any other growth algorithm are skipped in every pass.
///
/// * `layer_targets` - `(layer index, value)` pairs; only the index is read
///   here. The slice order decides which layer is served first in each pass.
///   The caller visible in this file passes them sorted by ascending CPU
///   target.
/// * `layer_allocs` - per-layer allocation, indexed by layer index;
///   `node_target(n)` is multiplied by `au` to get a CPU count for node `n`.
/// * `au` - allocation unit used for that multiplication.
///
/// Returns `Ok(true)` when at least one CPU was freed from or allocated to a
/// layer, `Ok(false)` otherwise.
///
/// # Errors
///
/// Propagates errors from `cpu_pool.free` and `cpu_pool.mark_allocated`.
fn recompute_layer_core_order(
    &mut self,
    layer_targets: &[(usize, usize)],
    layer_allocs: &[LayerAlloc],
    au: usize,
) -> Result<bool> {
    let nr_nodes = self.topo.nodes.len();

    debug!(
        " free: before pass: free_llcs={:?}",
        self.cpu_pool.free_llcs
    );
    // Pass 1 (reverse order): return whole LLCs that exceed each layer's
    // per-node full-LLC target back to the pool.
    for &(idx, _) in layer_targets.iter().rev() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        let alloc = &layer_allocs[idx];

        for n in 0..nr_nodes {
            let assigned_on_n = layer.assigned_llcs[n].len();
            let node = self.topo.nodes.get(&n).unwrap();
            // `.0` is the number of LLCs the target covers completely.
            let target_full_n =
                Self::compute_target_llcs(alloc.node_target(n) * au, &node.llcs).0;
            let mut to_free = assigned_on_n.saturating_sub(target_full_n);

            debug!(
                " free: layer={} node={} assigned={} target_full={} to_free={}",
                layer.name, n, assigned_on_n, target_full_n, to_free,
            );

            // Most recently assigned LLCs are released first (pop from the end).
            while to_free > 0 {
                if let Some(llc) = layer.assigned_llcs[n].pop() {
                    self.cpu_pool.return_llc(llc);
                    to_free -= 1;
                    debug!(" layer={} freed_llc={} from node={}", layer.name, llc, n);
                } else {
                    break;
                }
            }
        }
    }
    debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);

    // Pass 2 (reverse order): take whole LLCs from the pool until each layer
    // reaches its per-node full-LLC target or the node's pool runs dry.
    for &(idx, _) in layer_targets.iter().rev() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        let alloc = &layer_allocs[idx];

        for n in 0..nr_nodes {
            let cur_on_n = layer.assigned_llcs[n].len();
            let node = self.topo.nodes.get(&n).unwrap();
            let target_full_n =
                Self::compute_target_llcs(alloc.node_target(n) * au, &node.llcs).0;
            let mut to_alloc = target_full_n.saturating_sub(cur_on_n);

            debug!(
                " alloc: layer={} node={} cur={} target_full={} to_alloc={} free={}",
                layer.name,
                n,
                cur_on_n,
                target_full_n,
                to_alloc,
                self.cpu_pool.free_llcs.get(&n).map_or(0, |v| v.len()),
            );

            while to_alloc > 0 {
                if let Some(llc) = self.cpu_pool.take_llc_from_node(n) {
                    layer.assigned_llcs[n].push(llc);
                    to_alloc -= 1;
                    debug!(" layer={} alloc_llc={} on node={}", layer.name, llc, n);
                } else {
                    break;
                }
            }
        }

        debug!(
            " alloc: layer={} assigned_llcs={:?}",
            layer.name, layer.assigned_llcs
        );
    }

    // Pass 3 (forward order): reset each layer's core order and seed it with
    // the "extra" cores of a partially covered LLC, carved out of the LLCs
    // still sitting in the free pool.
    for &(idx, _) in layer_targets.iter() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        layer.core_order = vec![Vec::new(); nr_nodes];
        let alloc = &layer_allocs[idx];

        for n in 0..nr_nodes {
            let node = self.topo.nodes.get(&n).unwrap();
            // `.1` is the number of cores wanted beyond the full LLCs.
            let mut extra = Self::compute_target_llcs(alloc.node_target(n) * au, &node.llcs).1;

            if let Some(node_llcs) = self.cpu_pool.free_llcs.get_mut(&n) {
                for entry in node_llcs.iter_mut() {
                    if extra == 0 {
                        break;
                    }
                    // entry.0 is the LLC id; entry.1 counts the cores of
                    // that LLC already handed out during this pass.
                    let llc_id = entry.0;
                    let llc = self.topo.all_llcs.get(&llc_id).unwrap();
                    let avail = llc.cores.len() - entry.1;
                    let mut used = extra.min(avail);
                    let cores_to_add = used;

                    // Skip the cores earlier layers already took from this LLC.
                    let shift = entry.1;
                    entry.1 += used;

                    for core in llc.cores.iter().skip(shift) {
                        if used == 0 {
                            break;
                        }
                        layer.core_order[n].push(core.1.id);
                        used -= 1;
                    }

                    extra -= cores_to_add;
                }
            }
        }

        // First of two reversals; together with the one in pass 5 the extra
        // cores end up after the assigned-LLC cores, in their original order.
        for node_cores in &mut layer.core_order {
            node_cores.reverse();
        }
    }

    // Pass 4: the consumed-core counters were only scratch state for pass 3.
    for node_llcs in self.cpu_pool.free_llcs.values_mut() {
        for entry in node_llcs.iter_mut() {
            entry.1 = 0;
        }
    }

    // Pass 5 (forward order): append every core that belongs to one of the
    // layer's assigned LLCs, then reverse so those cores come first.
    for &(idx, _) in layer_targets.iter() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        let all_assigned: HashSet<usize> =
            layer.assigned_llcs.iter().flatten().copied().collect();

        for core in self.topo.all_cores.iter() {
            let llc_id = core.1.llc_id;
            if all_assigned.contains(&llc_id) {
                let nid = core.1.node_id;
                layer.core_order[nid].push(core.1.id);
            }
        }
        for node_cores in &mut layer.core_order {
            node_cores.reverse();
        }

        debug!(
            " alloc: layer={} core_order={:?}",
            layer.name, layer.core_order
        );
    }

    let mut updated = false;

    // Pass 6: release CPUs the layer holds that are no longer covered by its
    // new core order. All frees happen before any allocation in pass 7.
    for &(idx, _) in layer_targets.iter() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        for n in 0..nr_nodes {
            // Desired CPUs on this node: the union of the spans of the cores
            // in the core order, limited to the layer's allowed CPUs.
            let mut node_target = Cpumask::new();
            for &core_id in &layer.core_order[n] {
                if let Some(core) = self.topo.all_cores.get(&core_id) {
                    node_target |= &core.span;
                }
            }
            node_target &= &layer.allowed_cpus;

            let node_span = &self.topo.nodes[&n].span;
            let node_cur = layer.cpus.and(node_span);
            // Currently held on this node but not wanted any more.
            let cpus_to_free = node_cur.and(&node_target.not());

            if cpus_to_free.weight() > 0 {
                debug!(
                    " apply: layer={} freeing CPUs on node {}: {}",
                    layer.name, n, cpus_to_free
                );
                layer.cpus &= &cpus_to_free.not();
                layer.nr_cpus -= cpus_to_free.weight();
                // Keep the per-LLC and per-node counters in step with the mask.
                for cpu in cpus_to_free.iter() {
                    layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                    layer.nr_node_cpus[n] -= 1;
                }
                self.cpu_pool.free(&cpus_to_free)?;
                updated = true;
            }
        }
    }

    // Pass 7: claim the wanted CPUs that the layer does not hold yet, limited
    // to what the pool currently reports as available.
    for &(idx, _) in layer_targets.iter() {
        let layer = &mut self.layers[idx];

        if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
            continue;
        }

        for n in 0..nr_nodes {
            let mut node_target = Cpumask::new();
            for &core_id in &layer.core_order[n] {
                if let Some(core) = self.topo.all_cores.get(&core_id) {
                    node_target |= &core.span;
                }
            }
            node_target &= &layer.allowed_cpus;

            // Re-read availability on every node iteration, since the
            // allocation below changes it.
            let available_cpus = self.cpu_pool.available_cpus();
            let desired_to_alloc = node_target.and(&layer.cpus.clone().not());
            let cpus_to_alloc = desired_to_alloc.clone().and(&available_cpus);

            // A shortfall is only logged; the layer takes what it can get.
            if desired_to_alloc.weight() > cpus_to_alloc.weight() {
                debug!(
                    " apply: layer={} node {} wanted to alloc {} CPUs but only {} available",
                    layer.name,
                    n,
                    desired_to_alloc.weight(),
                    cpus_to_alloc.weight()
                );
            }

            if cpus_to_alloc.weight() > 0 {
                debug!(
                    " apply: layer={} allocating CPUs on node {}: {}",
                    layer.name, n, cpus_to_alloc
                );
                layer.cpus |= &cpus_to_alloc;
                layer.nr_cpus += cpus_to_alloc.weight();
                for cpu in cpus_to_alloc.iter() {
                    layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                    layer.nr_node_cpus[n] += 1;
                }
                self.cpu_pool.mark_allocated(&cpus_to_alloc)?;
                updated = true;
            }
        }

        debug!(
            " apply: layer={} final cpus.weight()={} nr_cpus={}",
            layer.name,
            layer.cpus.weight(),
            layer.nr_cpus
        );
    }

    Ok(updated)
}
3699
3700 fn refresh_node_ctx(
3701 skel: &mut BpfSkel,
3702 topo: &Topology,
3703 node_empty_layers: &[Vec<u32>],
3704 init: bool,
3705 ) {
3706 for &nid in topo.nodes.keys() {
3707 let mut arg: bpf_intf::refresh_node_ctx_arg =
3708 unsafe { MaybeUninit::zeroed().assume_init() };
3709 arg.node_id = nid as u32;
3710 arg.init = init as u32;
3711
3712 let empty = &node_empty_layers[nid];
3713 arg.nr_empty_layer_ids = empty.len() as u32;
3714 for (i, &lid) in empty.iter().enumerate() {
3715 arg.empty_layer_ids[i] = lid;
3716 }
3717 for i in empty.len()..MAX_LAYERS {
3718 arg.empty_layer_ids[i] = MAX_LAYERS as u32;
3719 }
3720
3721 if init {
3722 let node = &topo.nodes[&nid];
3724 let llcs: Vec<u32> = node.llcs.keys().map(|&id| id as u32).collect();
3725 arg.nr_llcs = llcs.len() as u32;
3726 for (i, &llc_id) in llcs.iter().enumerate() {
3727 arg.llcs[i] = llc_id;
3728 }
3729 }
3730
3731 let input = ProgramInput {
3732 context_in: Some(unsafe { plain::as_mut_bytes(&mut arg) }),
3733 ..Default::default()
3734 };
3735 let _ = skel.progs.refresh_node_ctx.test_run(input);
3736 }
3737 }
3738
/// Recomputes how many CPUs every layer should own, moves CPUs between the
/// CPU pool and the layers, and publishes the resulting cpumasks to BPF.
///
/// Steps, in order:
/// 1. compute dampened per-layer targets and turn them into per-node
///    allocations via `unified_alloc`;
/// 2. let `recompute_layer_core_order` handle `StickyDynamic` layers when the
///    topology has more than one LLC;
/// 3. free surplus CPUs (largest target first), then allocate missing CPUs
///    (smallest target first) for the remaining non-open layers;
/// 4. update the `fully_allocated` flags and the `growth_denied` bookkeeping;
/// 5. if anything changed, hand the leftover pool CPUs to open layers and run
///    the BPF refresh programs.
///
/// Errors from the CPU pool and from `recompute_layer_core_order` are
/// propagated; a failure of `update_netdev_cpumasks` is only logged.
fn refresh_cpumasks(&mut self) -> Result<()> {
    let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });

    let mut updated = false;
    let raw_targets = self.calc_target_nr_cpus();
    let au = self.cpu_pool.alloc_unit();
    let total_cpus = self.cpu_pool.topo.all_cpus.len();

    // Dampen shrinking: when the raw target is below the current size, only
    // move half of the gap (rounded up) in this round, and never below `min`.
    // Growth targets are passed through unchanged.
    let targets: Vec<(usize, usize)> = raw_targets
        .iter()
        .enumerate()
        .map(|(idx, &(target, min))| {
            let cur = self.layers[idx].nr_cpus;
            if target < cur {
                let dampened = cur - (cur - target).div_ceil(2);
                (dampened.max(min), min)
            } else {
                (target, min)
            }
        })
        .collect();

    let demands = self.calc_raw_demands(&targets);
    let nr_nodes = self.topo.nodes.len();
    // Node capacities expressed in allocation units rather than CPUs.
    let node_caps: Vec<usize> = self
        .topo
        .nodes
        .values()
        .map(|n| n.span.weight() / au)
        .collect();
    let all_layer_nodes: Vec<&[usize]> = self
        .layer_specs
        .iter()
        .map(|s| s.nodes().as_slice())
        .collect();
    // Per-layer node visiting order used by the allocation pass below.
    let norders: Vec<Vec<usize>> = (0..self.layers.len())
        .map(|idx| {
            layer_core_growth::node_order(
                self.layer_specs[idx].nodes(),
                &self.topo,
                idx,
                &all_layer_nodes,
            )
        })
        .collect();
    let node_groups: Vec<Vec<Vec<usize>>> = (0..self.layers.len())
        .map(|idx| {
            layer_core_growth::node_groups(
                self.layer_specs[idx].nodes(),
                &self.topo,
                idx,
                &all_layer_nodes,
                &self.layer_specs[idx].kind.common().growth_algo,
            )
        })
        .collect();
    let layer_allocs = unified_alloc(total_cpus / au, &node_caps, &demands, &node_groups);

    // Convert the per-layer totals from allocation units back to CPUs.
    let cpu_targets: Vec<usize> = layer_allocs.iter().map(|a| a.total() * au).collect();

    // Snapshot of the sizes before this round, used for logging below.
    let prev_nr_cpus: Vec<usize> = self.layers.iter().map(|l| l.nr_cpus).collect();

    // (layer index, CPU target) pairs sorted by target, smallest first.
    let mut ascending: Vec<(usize, usize)> = cpu_targets.iter().copied().enumerate().collect();
    ascending.sort_by(|a, b| a.1.cmp(&b.1));

    // Per-node snapshot, used for `growth_denied` and for logging below.
    let prev_node_cpus: Vec<Vec<usize>> =
        self.layers.iter().map(|l| l.nr_node_cpus.clone()).collect();

    // With more than one LLC, StickyDynamic layers are handled by
    // `recompute_layer_core_order` and skipped by the free/alloc passes.
    let use_sd_alloc = self.topo.all_llcs.len() > 1;
    let sticky_dynamic_updated = if use_sd_alloc {
        self.recompute_layer_core_order(&ascending, &layer_allocs, au)?
    } else {
        false
    };
    updated |= sticky_dynamic_updated;

    if sticky_dynamic_updated {
        for (idx, layer) in self.layers.iter().enumerate() {
            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
                Self::update_bpf_layer_cpumask(
                    layer,
                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
                );
            }
        }
    }

    // Free pass: walk layers from the largest target to the smallest and
    // return CPUs above each node's target to the pool.
    for &(idx, _target) in ascending.iter().rev() {
        let layer = &mut self.layers[idx];
        if layer_is_open(layer) {
            continue;
        }

        if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
            continue;
        }

        let alloc = &layer_allocs[idx];
        let mut freed = false;

        for n in 0..nr_nodes {
            let desired = alloc.node_target(n) * au;
            let mut to_free = layer.nr_node_cpus[n].saturating_sub(desired);
            let node_span = &self.topo.nodes[&n].span;

            while to_free > 0 {
                // Candidates are the layer's CPUs on this node; the core
                // order is walked in reverse when picking what to free.
                let node_cands = layer.cpus.and(node_span);
                let cpus_to_free = match self
                    .cpu_pool
                    .next_to_free(&node_cands, layer.core_order[n].iter().rev())?
                {
                    Some(ret) => ret,
                    None => break,
                };
                let nr = cpus_to_free.weight();
                trace!(
                    "[{}] freeing CPUs on node {}: {}",
                    layer.name,
                    n,
                    &cpus_to_free
                );
                layer.cpus &= &cpus_to_free.not();
                layer.nr_cpus -= nr;
                for cpu in cpus_to_free.iter() {
                    let node_id = self.cpu_pool.topo.all_cpus[&cpu].node_id;
                    layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                    layer.nr_node_cpus[node_id] -= 1;
                    // The pinned count can never exceed the node's CPU count.
                    layer.nr_pinned_cpus[node_id] =
                        layer.nr_pinned_cpus[node_id].min(layer.nr_node_cpus[node_id]);
                }
                self.cpu_pool.free(&cpus_to_free)?;
                // A freed chunk may be larger than what was still owed.
                to_free = to_free.saturating_sub(nr);
                freed = true;
            }
        }

        if freed {
            Self::update_bpf_layer_cpumask(
                layer,
                &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
            );
            updated = true;
        }
    }

    // Alloc pass: walk layers from the smallest target to the largest and
    // grow each one towards its per-node target, visiting nodes in the
    // layer's node order.
    for &(idx, _target) in &ascending {
        let layer = &mut self.layers[idx];

        if layer_is_open(layer) {
            continue;
        }

        if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
            continue;
        }

        let alloc = &layer_allocs[idx];
        let norder = &norders[idx];
        let mut alloced = false;

        for &node_id in norder.iter() {
            let node_target = alloc.node_target(node_id) * au;
            let cur_node = layer.nr_node_cpus[node_id];
            if node_target <= cur_node {
                continue;
            }
            let pinned_target = alloc.pinned[node_id] * au;
            let mut nr_to_alloc = node_target - cur_node;
            let node_span = &self.topo.nodes[&node_id].span;
            let node_allowed = layer.allowed_cpus.and(node_span);

            while nr_to_alloc > 0 {
                let nr_alloced = match self.cpu_pool.alloc_cpus(
                    &node_allowed,
                    &layer.core_order[node_id],
                    nr_to_alloc,
                ) {
                    Some(new_cpus) => {
                        let nr = new_cpus.weight();
                        layer.cpus |= &new_cpus;
                        layer.nr_cpus += nr;
                        for cpu in new_cpus.iter() {
                            layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                            let nid = self.cpu_pool.topo.all_cpus[&cpu].node_id;
                            layer.nr_node_cpus[nid] += 1;
                            // New CPUs count as pinned until the node's
                            // pinned target is reached.
                            if layer.nr_pinned_cpus[nid] < pinned_target {
                                layer.nr_pinned_cpus[nid] += 1;
                            }
                        }
                        nr
                    }
                    None => 0,
                };
                // Nothing could be allocated on this node; try the next one.
                if nr_alloced == 0 {
                    break;
                }
                alloced = true;
                nr_to_alloc -= nr_alloced.min(nr_to_alloc);
            }
        }

        if alloced {
            Self::update_bpf_layer_cpumask(
                layer,
                &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
            );
            updated = true;
        }
    }

    // Tell every non-open layer whether the layers' CPU counts add up to at
    // least the number of CPUs in the topology.
    let total_allocated: usize = self.layers.iter().map(|l| l.nr_cpus).sum();
    let fully_allocated = total_allocated >= total_cpus;
    for (idx, layer) in self.layers.iter().enumerate() {
        if !layer_is_open(layer) {
            self.skel.maps.bss_data.as_mut().unwrap().layers[idx]
                .fully_allocated
                .write(fully_allocated);
        }
    }

    // Record, per layer and node, whether the layer wanted more unpinned
    // CPUs than it has but did not gain any CPU on that node this round.
    // Layers without a util range only get their flags cleared.
    let node_utils = &self.sched_stats.layer_node_utils;
    let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
    for (idx, layer) in self.layers.iter().enumerate() {
        self.growth_denied[idx].fill(false);
        let util_high = match layer.kind.util_range() {
            Some((_, high)) => high,
            None => continue,
        };
        for n in 0..nr_nodes {
            let unpinned_util = (node_utils[idx][n] - pinned_utils[idx][n]).max(0.0);
            let unpinned_cpus_needed = unpinned_util / util_high;
            let unpinned_cpus_have =
                layer.nr_node_cpus[n].saturating_sub(layer.nr_pinned_cpus[n]) as f64;
            let wanted = unpinned_cpus_needed > unpinned_cpus_have;
            let got = layer.nr_node_cpus[n] > prev_node_cpus[idx][n];
            if wanted && !got {
                self.growth_denied[idx][n] = true;
            }
        }
    }

    // Debug log of total CPU count changes per non-open layer.
    if updated {
        for (idx, layer) in self.layers.iter().enumerate() {
            if layer_is_open(layer) {
                continue;
            }
            let prev = prev_nr_cpus[idx];
            let cur = layer.nr_cpus;
            if prev != cur {
                debug!(
                    "ALLOC {} algo={:?} cpus:{}→{} mask={:x}",
                    layer.name, layer.growth_algo, prev, cur, layer.cpus,
                );
            }
        }
        debug!(
            "ALLOC pool_available={}",
            self.cpu_pool.available_cpus().weight()
        );
    }

    // Debug log of per-node changes and targets per non-open layer.
    // NOTE(review): this repeats the `pool_available` line logged just above.
    if updated {
        let nr_nodes = self.topo.nodes.len();
        for (idx, layer) in self.layers.iter().enumerate() {
            if layer_is_open(layer) {
                continue;
            }
            let prev = &prev_node_cpus[idx];
            let cur = &layer.nr_node_cpus;
            if prev == cur {
                continue;
            }
            let per_node: String = (0..nr_nodes)
                .map(|n| format!("n{}:{}→{}", n, prev[n], cur[n]))
                .collect::<Vec<_>>()
                .join(" ");
            let prev_total: usize = prev.iter().sum();
            let cur_total: usize = cur[..nr_nodes].iter().sum();
            let target: String = (0..nr_nodes)
                .map(|n| format!("n{}:{}", n, layer_allocs[idx].node_target(n) * au))
                .collect::<Vec<_>>()
                .join(" ");
            debug!(
                "ALLOC {} algo={:?} {} total:{}→{} target:[{}] mask={:x}",
                layer.name,
                layer.growth_algo,
                per_node,
                prev_total,
                cur_total,
                target,
                layer.cpus,
            );
        }
        debug!(
            "ALLOC pool_available={}",
            self.cpu_pool.available_cpus().weight()
        );
    }

    if updated {
        // Open layers receive every CPU still available in the pool that
        // they are allowed to use; their per-LLC/per-node counters are
        // rebuilt from the new mask and their pinned counts reset to zero.
        for (idx, layer) in self.layers.iter_mut().enumerate() {
            if !layer_is_open(layer) {
                continue;
            }

            let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx];
            let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
            let nr_available_cpus = available_cpus.weight();

            layer.cpus = available_cpus;
            layer.nr_cpus = nr_available_cpus;
            for llc in self.cpu_pool.topo.all_llcs.values() {
                layer.nr_llc_cpus[llc.id] = layer.cpus.and(&llc.span).weight();
            }
            for node in self.cpu_pool.topo.nodes.values() {
                layer.nr_node_cpus[node.id] = layer.cpus.and(&node.span).weight();
                layer.nr_pinned_cpus[node.id] = 0;
            }
            Self::update_bpf_layer_cpumask(layer, bpf_layer);
        }

        // Publish the pool's per-node fallback CPUs to BPF.
        for (&node_id, &cpu) in &self.cpu_pool.fallback_cpus {
            self.skel.maps.bss_data.as_mut().unwrap().fallback_cpus[node_id] = cpu as u32;
        }

        // Widen the tracked (min, max) CPU count range of every layer.
        for (lidx, layer) in self.layers.iter().enumerate() {
            self.nr_layer_cpus_ranges[lidx] = (
                self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
                self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
            );
        }

        // Run the `refresh_layer_cpumasks` BPF program; its result is ignored.
        let input = ProgramInput {
            ..Default::default()
        };
        let prog = &mut self.skel.progs.refresh_layer_cpumasks;
        let _ = prog.test_run(input);

        // For every node, collect the layers that own no CPU on it and pass
        // the lists to the per-node BPF context refresh.
        let nr_nodes = self.topo.nodes.len();
        let node_empty_layers: Vec<Vec<u32>> = (0..nr_nodes)
            .map(|nid| {
                self.layers
                    .iter()
                    .enumerate()
                    .filter(|(_lidx, layer)| layer.nr_node_cpus[nid] == 0)
                    .map(|(lidx, _)| lidx as u32)
                    .collect()
            })
            .collect();
        Self::refresh_node_ctx(&mut self.skel, &self.topo, &node_empty_layers, false);
    }

    // Runs on every call, whether or not anything changed; failures are
    // logged and do not fail the refresh.
    if let Err(e) = self.update_netdev_cpumasks() {
        warn!("Failed to update netdev IRQ cpumasks: {:#}", e);
    }
    Ok(())
}
4129
4130 fn refresh_idle_qos(&mut self) -> Result<()> {
4131 if !self.idle_qos_enabled {
4132 return Ok(());
4133 }
4134
4135 let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
4136 for layer in self.layers.iter() {
4137 let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
4138 for cpu in layer.cpus.iter() {
4139 cpu_idle_qos[cpu] = idle_resume_us;
4140 }
4141 }
4142
4143 for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
4144 update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
4145 }
4146
4147 Ok(())
4148 }
4149
4150 fn refresh_xnuma(&mut self) {
4151 let nr_nodes = self.topo.nodes.len();
4152 if nr_nodes <= 1 {
4153 return;
4154 }
4155
4156 let duty_sums = &self.sched_stats.layer_node_duty_sums;
4157
4158 for (layer_idx, spec) in self.layer_specs.iter().enumerate() {
4159 let common = spec.kind.common();
4160 let threshold = common.xnuma_threshold;
4161 let threshold_delta = common.xnuma_threshold_delta;
4162 let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[layer_idx];
4163
4164 if threshold.0 <= 0.0 && threshold.1 <= 0.0 {
4165 for src in 0..nr_nodes {
4167 bpf_layer.node[src].xnuma_is_mig_src.write(true);
4168 for dst in 0..nr_nodes {
4169 bpf_layer.node[src].xnuma[dst].rate = u64::MAX;
4170 }
4171 }
4172 self.xnuma_mig_src[layer_idx].fill(false);
4173 continue;
4174 }
4175
4176 let layer = &self.layers[layer_idx];
4177 let is_mig_src = xnuma_check_active(
4178 &duty_sums[layer_idx],
4179 &layer.nr_node_cpus,
4180 threshold,
4181 threshold_delta,
4182 &self.growth_denied[layer_idx],
4183 &self.xnuma_mig_src[layer_idx],
4184 );
4185
4186 self.xnuma_mig_src[layer_idx] = is_mig_src.clone();
4187
4188 let result = xnuma_compute_rates(&duty_sums[layer_idx], &layer.nr_node_cpus);
4189
4190 for src in 0..nr_nodes {
4193 for dst in 0..nr_nodes {
4194 bpf_layer.node[src].xnuma[dst].rate = result.rates[src][dst];
4195 }
4196 }
4197 for (nid, is_src) in is_mig_src.iter().enumerate().take(nr_nodes) {
4198 bpf_layer.node[nid].xnuma_is_mig_src.write(*is_src);
4199 }
4200 }
4201 }
4202
4203 fn step(&mut self) -> Result<()> {
4204 let started_at = Instant::now();
4205 self.sched_stats.refresh(
4206 &mut self.skel,
4207 &self.proc_reader,
4208 started_at,
4209 self.processing_dur,
4210 &self.gpu_task_handler,
4211 )?;
4212
4213 self.skel
4215 .maps
4216 .bss_data
4217 .as_mut()
4218 .unwrap()
4219 .system_cpu_util_ewma = (self.sched_stats.system_cpu_util_ewma * 10000.0) as u64;
4220
4221 for layer_id in 0..self.sched_stats.nr_layers {
4222 self.skel
4223 .maps
4224 .bss_data
4225 .as_mut()
4226 .unwrap()
4227 .layer_dsq_insert_ewma[layer_id] =
4228 (self.sched_stats.layer_dsq_insert_ewma[layer_id] * 10000.0) as u64;
4229 }
4230
4231 self.refresh_cpumasks()?;
4232 self.refresh_xnuma();
4233 self.refresh_idle_qos()?;
4234 self.gpu_task_handler.maybe_affinitize();
4235 self.processing_dur += Instant::now().duration_since(started_at);
4236 Ok(())
4237 }
4238
4239 fn generate_sys_stats(
4240 &mut self,
4241 stats: &Stats,
4242 cpus_ranges: &mut [(usize, usize)],
4243 ) -> Result<SysStats> {
4244 let bstats = &stats.bpf_stats;
4245 let mut sys_stats = SysStats::new(stats, bstats, &self.cpu_pool.fallback_cpus)?;
4246
4247 for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
4248 let layer_stats = LayerStats::new(
4249 lidx,
4250 layer,
4251 stats,
4252 bstats,
4253 cpus_ranges[lidx],
4254 self.xnuma_mig_src[lidx].iter().any(|&a| a),
4255 );
4256 sys_stats.layers.insert(spec.name.to_string(), layer_stats);
4257 cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
4258 }
4259
4260 Ok(sys_stats)
4261 }
4262
4263 fn process_cgroup_creation(
4265 path: &Path,
4266 cgroup_regexes: &HashMap<u32, Regex>,
4267 cgroup_path_to_id: &mut HashMap<String, u64>,
4268 sender: &crossbeam::channel::Sender<CgroupEvent>,
4269 ) {
4270 let path_str = path.to_string_lossy().to_string();
4271
4272 let cgroup_id = std::fs::metadata(path)
4274 .map(|metadata| {
4275 use std::os::unix::fs::MetadataExt;
4276 metadata.ino()
4277 })
4278 .unwrap_or(0);
4279
4280 let mut match_bitmap = 0u64;
4282 for (rule_id, regex) in cgroup_regexes {
4283 if regex.is_match(&path_str) {
4284 match_bitmap |= 1u64 << rule_id;
4285 }
4286 }
4287
4288 cgroup_path_to_id.insert(path_str.clone(), cgroup_id);
4290
4291 if let Err(e) = sender.send(CgroupEvent::Created {
4293 path: path_str,
4294 cgroup_id,
4295 match_bitmap,
4296 }) {
4297 error!("Failed to send cgroup creation event: {}", e);
4298 }
4299 }
4300
/// Starts a background thread that watches `/sys/fs/cgroup` with inotify and
/// returns the receiving end of the channel it reports on.
///
/// Watches are installed on the root and on every existing sub-directory
/// before the thread is spawned. The thread first reports every existing
/// directory as `CgroupEvent::Created`, then polls the inotify fd until
/// `shutdown` is set (or reading events fails), translating directory
/// creations and deletions into `Created` / `Removed` events.
///
/// Returns an error if inotify cannot be initialised, the root watch cannot
/// be added, or `add_recursive_watches` fails.
fn start_cgroup_watcher(
    shutdown: Arc<AtomicBool>,
    cgroup_regexes: HashMap<u32, Regex>,
) -> Result<Receiver<CgroupEvent>> {
    let mut inotify = Inotify::init().context("Failed to initialize inotify")?;
    // Maps each watch descriptor back to the directory it watches so event
    // names (which are relative) can be turned into full paths.
    let mut wd_to_path = HashMap::new();

    // Bounded channel with room for 1024 pending events.
    let (sender, receiver) = crossbeam::channel::bounded::<CgroupEvent>(1024);

    let root_wd = inotify
        .watches()
        .add("/sys/fs/cgroup", WatchMask::CREATE | WatchMask::DELETE)
        .context("Failed to add watch for /sys/fs/cgroup")?;
    wd_to_path.insert(root_wd, PathBuf::from("/sys/fs/cgroup"));

    Self::add_recursive_watches(&mut inotify, &mut wd_to_path, Path::new("/sys/fs/cgroup"))?;

    std::thread::spawn(move || {
        let mut buffer = [0; 4096];
        let inotify_fd = inotify.as_raw_fd();
        // Remembers the id reported for each path so that it can be sent
        // along with the removal event later.
        let mut cgroup_path_to_id = HashMap::<String, u64>::new();

        // Report every directory that already exists (the root included).
        for entry in WalkDir::new("/sys/fs/cgroup")
            .into_iter()
            .filter_map(|e| e.ok())
            .filter(|e| e.file_type().is_dir())
        {
            let path = entry.path();
            Self::process_cgroup_creation(
                path,
                &cgroup_regexes,
                &mut cgroup_path_to_id,
                &sender,
            );
        }

        while !shutdown.load(Ordering::Relaxed) {
            // Wait for the inotify fd to become readable with a 100ms
            // timeout so that the shutdown flag is re-checked regularly.
            let ready = unsafe {
                let mut read_fds: libc::fd_set = std::mem::zeroed();
                libc::FD_ZERO(&mut read_fds);
                libc::FD_SET(inotify_fd, &mut read_fds);

                let mut timeout = libc::timeval {
                    tv_sec: 0,
                    tv_usec: 100_000,
                };

                libc::select(
                    inotify_fd + 1,
                    &mut read_fds,
                    std::ptr::null_mut(),
                    std::ptr::null_mut(),
                    &mut timeout,
                )
            };

            // Timeout (0) and error (negative) are treated alike: retry.
            if ready <= 0 {
                continue;
            }

            let events = match inotify.read_events(&mut buffer) {
                Ok(events) => events,
                Err(e) => {
                    // A read error ends the watcher thread.
                    error!("Error reading inotify events: {}", e);
                    break;
                }
            };

            for event in events {
                // Only creations and deletions are of interest.
                if !event.mask.contains(inotify::EventMask::CREATE)
                    && !event.mask.contains(inotify::EventMask::DELETE)
                {
                    continue;
                }

                let name = match event.name {
                    Some(name) => name,
                    None => continue,
                };

                let parent_path = match wd_to_path.get(&event.wd) {
                    Some(parent) => parent,
                    None => {
                        warn!("Unknown watch descriptor: {:?}", event.wd);
                        continue;
                    }
                };

                let path = parent_path.join(name.to_string_lossy().as_ref());

                if event.mask.contains(inotify::EventMask::CREATE) {
                    // Ignore anything that is not a directory.
                    if !path.is_dir() {
                        continue;
                    }

                    Self::process_cgroup_creation(
                        &path,
                        &cgroup_regexes,
                        &mut cgroup_path_to_id,
                        &sender,
                    );

                    // Watch the new directory too so nested cgroups are seen.
                    match inotify
                        .watches()
                        .add(&path, WatchMask::CREATE | WatchMask::DELETE)
                    {
                        Ok(wd) => {
                            wd_to_path.insert(wd, path.clone());
                        }
                        Err(e) => {
                            warn!(
                                "Failed to add watch for new cgroup {}: {}",
                                path.display(),
                                e
                            );
                        }
                    }
                } else if event.mask.contains(inotify::EventMask::DELETE) {
                    let path_str = path.to_string_lossy().to_string();

                    // Paths that were never recorded are reported with id 0.
                    let cgroup_id = cgroup_path_to_id.remove(&path_str).unwrap_or(0);

                    if let Err(e) = sender.send(CgroupEvent::Removed {
                        path: path_str,
                        cgroup_id,
                    }) {
                        error!("Failed to send cgroup removal event: {}", e);
                    }

                    // Forget the watch descriptor of the deleted directory
                    // (linear scan over all known watches).
                    let wd_to_remove = wd_to_path.iter().find_map(|(wd, watched_path)| {
                        if watched_path == &path {
                            Some(wd.clone())
                        } else {
                            None
                        }
                    });
                    if let Some(wd) = wd_to_remove {
                        wd_to_path.remove(&wd);
                    }
                }
            }
        }
    });

    Ok(receiver)
}
4460
4461 fn add_recursive_watches(
4462 inotify: &mut Inotify,
4463 wd_to_path: &mut HashMap<inotify::WatchDescriptor, PathBuf>,
4464 path: &Path,
4465 ) -> Result<()> {
4466 for entry in WalkDir::new(path)
4467 .into_iter()
4468 .filter_map(|e| e.ok())
4469 .filter(|e| e.file_type().is_dir())
4470 .skip(1)
4471 {
4472 let entry_path = entry.path();
4473 match inotify
4475 .watches()
4476 .add(entry_path, WatchMask::CREATE | WatchMask::DELETE)
4477 {
4478 Ok(wd) => {
4479 wd_to_path.insert(wd, entry_path.to_path_buf());
4480 }
4481 Err(e) => {
4482 debug!("Failed to add watch for {}: {}", entry_path.display(), e);
4483 }
4484 }
4485 }
4486 Ok(())
4487 }
4488
/// Main loop of the scheduler's userspace side.
///
/// Until `shutdown` is set or the BPF scheduler has exited, the loop runs
/// `step()` every `sched_intv`, bumps `layer_refresh_seq_avgruntime` every
/// `layer_refresh_intv` (when that interval is non-zero), and in between
/// serves stats requests and cgroup watcher events. On exit the struct_ops
/// link is taken out of `self` and the user exit info is reported.
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
    let (res_ch, req_ch) = self.stats_server.channels();
    let mut next_sched_at = Instant::now() + self.sched_intv;
    let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
    let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
    // Per stats-client (min, max) CPU count ranges, one entry per layer.
    let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();

    // The regexes are moved out of `self`; this panics if they were already
    // taken. The watcher thread is started only when there is a regex.
    let cgroup_regexes = self.cgroup_regexes.take().unwrap();
    let cgroup_event_rx = if !cgroup_regexes.is_empty() {
        Some(Self::start_cgroup_watcher(
            shutdown.clone(),
            cgroup_regexes,
        )?)
    } else {
        None
    };

    while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
        let now = Instant::now();

        if now >= next_sched_at {
            self.step()?;
            // Skip over any intervals that were missed entirely.
            // NOTE(review): when `now == next_sched_at` the deadline is not
            // advanced here, so the next iteration runs `step()` again.
            while next_sched_at < now {
                next_sched_at += self.sched_intv;
            }
        }

        if enable_layer_refresh && now >= next_layer_refresh_at {
            self.skel
                .maps
                .bss_data
                .as_mut()
                .unwrap()
                .layer_refresh_seq_avgruntime += 1;
            while next_layer_refresh_at < now {
                next_layer_refresh_at += self.layer_refresh_intv;
            }
        }

        // Sleep at most until the next scheduling deadline.
        let timeout_duration = next_sched_at.saturating_duration_since(Instant::now());
        // Without a watcher, select on a channel that never yields a message.
        let never_rx = crossbeam::channel::never();
        let cgroup_rx = cgroup_event_rx.as_ref().unwrap_or(&never_rx);

        select! {
            recv(req_ch) -> msg => match msg {
                // A new stats client: start its ranges at the current sizes.
                Ok(StatsReq::Hello(tid)) => {
                    cpus_ranges.insert(
                        tid,
                        self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
                    );
                    let stats =
                        Stats::new(&mut self.skel, &self.proc_reader, self.topo.clone(), &self.gpu_task_handler, self.sched_stats.util_compensation)?;
                    res_ch.send(StatsRes::Hello(Box::new(stats)))?;
                }
                Ok(StatsReq::Refresh(tid, mut stats)) => {
                    // Fold the ranges accumulated since the last refresh into
                    // every client's ranges, then restart the accumulation
                    // from the current sizes.
                    for i in 0..self.nr_layer_cpus_ranges.len() {
                        for (_, ranges) in cpus_ranges.iter_mut() {
                            ranges[i] = (
                                ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
                                ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
                            );
                        }
                        self.nr_layer_cpus_ranges[i] =
                            (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
                    }

                    stats.refresh(
                        &mut self.skel,
                        &self.proc_reader,
                        now,
                        self.processing_dur,
                        &self.gpu_task_handler,
                    )?;
                    // Panics if `tid` has no entry in `cpus_ranges`.
                    let sys_stats =
                        self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
                    res_ch.send(StatsRes::Refreshed(Box::new((*stats, sys_stats))))?;
                }
                Ok(StatsReq::Bye(tid)) => {
                    cpus_ranges.remove(&tid);
                    res_ch.send(StatsRes::Bye)?;
                }
                Err(e) => Err(e)?,
            },

            recv(cgroup_rx) -> event => match event {
                // Failing to record a new cgroup aborts the scheduler.
                Ok(CgroupEvent::Created { path, cgroup_id, match_bitmap }) => {
                    self.skel.maps.cgroup_match_bitmap.update(
                        &cgroup_id.to_ne_bytes(),
                        &match_bitmap.to_ne_bytes(),
                        libbpf_rs::MapFlags::ANY,
                    ).with_context(|| format!(
                        "Failed to insert cgroup {}({}) into BPF map. Cgroup map may be full \
                         (max 16384 entries). Aborting.",
                        cgroup_id, path
                    ))?;

                    debug!("Added cgroup {} to BPF map with bitmap 0x{:x}", cgroup_id, match_bitmap);
                }
                // Failing to delete an entry is only logged.
                Ok(CgroupEvent::Removed { path, cgroup_id }) => {
                    if let Err(e) = self.skel.maps.cgroup_match_bitmap.delete(&cgroup_id.to_ne_bytes()) {
                        warn!("Failed to delete cgroup {} from BPF map: {}", cgroup_id, e);
                    } else {
                        debug!("Removed cgroup {}({}) from BPF map", cgroup_id, path);
                    }
                }
                Err(e) => {
                    error!("Error receiving cgroup event: {}", e);
                }
            },

            // Timeout: nothing to do, loop around to check the deadlines.
            recv(crossbeam::channel::after(timeout_duration)) -> _ => {
            }
        }
    }

    // Take (and thereby drop) the struct_ops link before reporting.
    let _ = self.struct_ops.take();
    uei_report!(&self.skel, uei)
}
4613}
4614
4615impl Drop for Scheduler<'_> {
4616 fn drop(&mut self) {
4617 info!("Unregister {SCHEDULER_NAME} scheduler");
4618
4619 if !self.netdevs.is_empty() {
4620 for (iface, netdev) in &self.netdevs {
4621 if let Err(e) = netdev.restore_cpumasks() {
4622 warn!("Failed to restore {iface} IRQ affinity: {e}");
4623 }
4624 }
4625 info!("Restored original netdev IRQ affinity");
4626 }
4627
4628 if let Some(struct_ops) = self.struct_ops.take() {
4629 drop(struct_ops);
4630 }
4631 }
4632}
4633
4634fn write_example_file(path: &str) -> Result<()> {
4635 let mut f = fs::OpenOptions::new()
4636 .create_new(true)
4637 .write(true)
4638 .open(path)?;
4639 Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
4640}
4641
/// What a `HintEquals` value resolves to, as produced by
/// `verify_layer_specs`.
struct HintLayerInfo {
    /// Index of the layer spec that uses the hint value.
    layer_id: usize,
    /// Threshold of the `SystemCpuUtilBelow` matcher found in the same AND
    /// block as the hint, if there was one.
    system_cpu_util_below: Option<f64>,
    /// Threshold of the `DsqInsertBelow` matcher found in the same AND block
    /// as the hint, if there was one.
    dsq_insert_below: Option<f64>,
}
4647
/// Validates the layer specs and collects the hint values they use.
///
/// Checks performed:
/// - there is at least one spec and at most `MAX_LAYERS`;
/// - every spec but the last has at least one match block, and the last spec
///   has exactly one, empty, match block;
/// - the number of OR blocks and of conditions per AND block is within the
///   limits, and cgroup/comm strings are not too long;
/// - `SystemCpuUtilBelow` / `DsqInsertBelow` thresholds lie in `[0.0, 1.0]`
///   and `HintEquals` values in `[0, 1024]`;
/// - `HintEquals` is combined only with the two high-frequency matchers, and
///   those appear only together with exactly one `HintEquals`;
/// - a hint value is not used by two different specs;
/// - `Confined` / `Grouped` layers have a consistent `cpus_range` and a
///   strictly increasing `util_range`.
///
/// Returns a map from hint value to the layer it selects, or an error
/// describing the first violation found.
fn verify_layer_specs(specs: &[LayerSpec]) -> Result<HashMap<u64, HintLayerInfo>> {
    // hint value -> (layer index, layer name, cpu util threshold, dsq threshold)
    let mut hint_to_layer_map = HashMap::<u64, (usize, String, Option<f64>, Option<f64>)>::new();

    let nr_specs = specs.len();
    if nr_specs == 0 {
        bail!("No layer spec");
    }
    if nr_specs > MAX_LAYERS {
        bail!("Too many layer specs");
    }

    for (idx, spec) in specs.iter().enumerate() {
        // Only the last spec may (and must) consist of a single empty match.
        if idx < nr_specs - 1 {
            if spec.matches.is_empty() {
                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
            }
        } else if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
            bail!("Terminal spec {:?} must have an empty match", spec.name);
        }

        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
            bail!(
                "Spec {:?} has too many ({}) OR match blocks",
                spec.name,
                spec.matches.len()
            );
        }

        for (ands_idx, ands) in spec.matches.iter().enumerate() {
            if ands.len() > NR_LAYER_MATCH_KINDS {
                bail!(
                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
                    spec.name,
                    ands_idx,
                    ands.len()
                );
            }
            // Per-AND-block tallies of the matchers that have combination
            // rules, plus the last value seen for each of them.
            let mut hint_equals_cnt = 0;
            let mut system_cpu_util_below_cnt = 0;
            let mut dsq_insert_below_cnt = 0;
            let mut hint_value: Option<u64> = None;
            let mut system_cpu_util_threshold: Option<f64> = None;
            let mut dsq_insert_threshold: Option<f64> = None;
            for one in ands.iter() {
                match one {
                    LayerMatch::CgroupPrefix(prefix) => {
                        if prefix.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
                        }
                    }
                    LayerMatch::CgroupSuffix(suffix) => {
                        if suffix.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
                        }
                    }
                    LayerMatch::CgroupContains(substr) => {
                        if substr.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
                        }
                    }
                    LayerMatch::CommPrefix(prefix) => {
                        if prefix.len() > MAX_COMM {
                            bail!("Spec {:?} has too long a comm prefix", spec.name);
                        }
                    }
                    LayerMatch::PcommPrefix(prefix) => {
                        if prefix.len() > MAX_COMM {
                            bail!("Spec {:?} has too long a process name prefix", spec.name);
                        }
                    }
                    LayerMatch::SystemCpuUtilBelow(threshold) => {
                        if *threshold < 0.0 || *threshold > 1.0 {
                            bail!(
                                "Spec {:?} has SystemCpuUtilBelow threshold outside the range [0.0, 1.0]",
                                spec.name
                            );
                        }
                        system_cpu_util_threshold = Some(*threshold);
                        system_cpu_util_below_cnt += 1;
                    }
                    LayerMatch::DsqInsertBelow(threshold) => {
                        if *threshold < 0.0 || *threshold > 1.0 {
                            bail!(
                                "Spec {:?} has DsqInsertBelow threshold outside the range [0.0, 1.0]",
                                spec.name
                            );
                        }
                        dsq_insert_threshold = Some(*threshold);
                        dsq_insert_below_cnt += 1;
                    }
                    LayerMatch::HintEquals(hint) => {
                        if *hint > 1024 {
                            bail!(
                                "Spec {:?} has hint value outside the range [0, 1024]",
                                spec.name
                            );
                        }
                        hint_value = Some(*hint);
                        hint_equals_cnt += 1;
                    }
                    // All other matcher kinds need no validation here.
                    _ => {}
                }
            }
            if hint_equals_cnt > 1 {
                bail!("Only 1 HintEquals match permitted per AND block");
            }
            let high_freq_matcher_cnt = system_cpu_util_below_cnt + dsq_insert_below_cnt;
            if high_freq_matcher_cnt > 0 {
                if hint_equals_cnt != 1 {
                    bail!("High-frequency matchers (SystemCpuUtilBelow, DsqInsertBelow) must be used with one HintEquals");
                }
                if system_cpu_util_below_cnt > 1 {
                    bail!("Only 1 SystemCpuUtilBelow match permitted per AND block");
                }
                if dsq_insert_below_cnt > 1 {
                    bail!("Only 1 DsqInsertBelow match permitted per AND block");
                }
                // The block must consist of nothing but these matchers.
                if ands.len() != hint_equals_cnt + system_cpu_util_below_cnt + dsq_insert_below_cnt
                {
                    bail!("High-frequency matchers must be used only with HintEquals (no other matchers)");
                }
            } else if hint_equals_cnt == 1 && ands.len() != 1 {
                bail!("HintEquals match cannot be in conjunction with other matches");
            }

            if let Some(hint) = hint_value {
                // A hint may repeat within one spec but not across specs.
                // Only the first occurrence is recorded, so thresholds from
                // later AND blocks with the same hint are not stored.
                if let Some((layer_id, name, _, _)) = hint_to_layer_map.get(&hint) {
                    if *layer_id != idx {
                        bail!(
                            "Spec {:?} has hint value ({}) that is already mapped to Spec {:?}",
                            spec.name,
                            hint,
                            name
                        );
                    }
                } else {
                    hint_to_layer_map.insert(
                        hint,
                        (
                            idx,
                            spec.name.clone(),
                            system_cpu_util_threshold,
                            dsq_insert_threshold,
                        ),
                    );
                }
            }
        }

        match spec.kind {
            LayerKind::Confined {
                cpus_range,
                util_range,
                ..
            }
            | LayerKind::Grouped {
                cpus_range,
                util_range,
                ..
            } => {
                if let Some((cpus_min, cpus_max)) = cpus_range {
                    if cpus_min > cpus_max {
                        bail!(
                            "Spec {:?} has invalid cpus_range({}, {})",
                            spec.name,
                            cpus_min,
                            cpus_max
                        );
                    }
                }
                // The lower bound must be strictly below the upper bound.
                if util_range.0 >= util_range.1 {
                    bail!(
                        "Spec {:?} has invalid util_range ({}, {})",
                        spec.name,
                        util_range.0,
                        util_range.1
                    );
                }
            }
            _ => {}
        }
    }

    // Drop the layer name, which was only needed for error messages.
    Ok(hint_to_layer_map
        .into_iter()
        .map(|(k, v)| {
            (
                k,
                HintLayerInfo {
                    layer_id: v.0,
                    system_cpu_util_below: v.2,
                    dsq_insert_below: v.3,
                },
            )
        })
        .collect())
}
4846
/// Returns the last `len` characters of `cgroup`, or the whole string when it
/// has fewer than `len` characters. `len == 0` yields an empty string.
fn name_suffix(cgroup: &str, len: usize) -> String {
    // Byte offset of the `len`-th character counted from the end; falls back
    // to the start of the string when there are not that many characters.
    let start = match len.checked_sub(1) {
        None => cgroup.len(),
        Some(back) => cgroup
            .char_indices()
            .rev()
            .nth(back)
            .map_or(0, |(pos, _)| pos),
    };

    cgroup[start..].to_string()
}
4853
/// Recursively collects every directory below `dir` (not `dir` itself).
///
/// The contents of a sub-directory are listed before the sub-directory
/// itself; the order of siblings is whatever `fs::read_dir` yields.
///
/// # Errors
///
/// Returns an error if `dir` is not a directory (for example because it
/// disappeared while the tree was being walked) or if reading a directory or
/// one of its entries fails.
fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
    if !dir.is_dir() {
        // Report the problem to the caller instead of panicking: this
        // function already returns a `Result`.
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("path {:?} does not correspond to directory", dir),
        )
        .into());
    }

    let mut paths = vec![];

    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_dir() {
            paths.extend(traverse_sysfs(&path)?);
            paths.push(path);
        }
    }

    Ok(paths)
}
4873
4874fn find_cpumask(cgroup: &str) -> Cpumask {
4875 let mut path = String::from(cgroup);
4876 path.push_str("/cpuset.cpus.effective");
4877
4878 let description = fs::read_to_string(&mut path).unwrap();
4879
4880 Cpumask::from_cpulist(&description).unwrap()
4881}
4882
4883fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
4884 match rule {
4885 LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4886 .into_iter()
4887 .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4888 .filter(|cgroup| cgroup.ends_with(suffix))
4889 .map(|cgroup| {
4890 (
4891 {
4892 let mut slashterminated = cgroup.clone();
4893 slashterminated.push('/');
4894 LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4895 },
4896 find_cpumask(&cgroup),
4897 )
4898 })
4899 .collect()),
4900 LayerMatch::CgroupRegex(expr) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4901 .into_iter()
4902 .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4903 .filter(|cgroup| {
4904 let re = Regex::new(expr).unwrap();
4905 re.is_match(cgroup)
4906 })
4907 .map(|cgroup| {
4908 (
4909 {
4913 let mut slashterminated = cgroup.clone();
4914 slashterminated.push('/');
4915 LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4916 },
4917 find_cpumask(&cgroup),
4918 )
4919 })
4920 .collect()),
4921 _ => panic!("Unimplemented template enum {:?}", rule),
4922 }
4923}
4924
4925fn create_perf_fds(skel: &mut BpfSkel, event: u64) -> Result<()> {
4926 let mut attr = perf::bindings::perf_event_attr {
4927 size: std::mem::size_of::<perf::bindings::perf_event_attr>() as u32,
4928 type_: perf::bindings::PERF_TYPE_RAW,
4929 config: event,
4930 sample_type: 0u64,
4931 ..Default::default()
4932 };
4933 attr.__bindgen_anon_1.sample_period = 0u64;
4934 attr.set_disabled(0);
4935
4936 let perf_events_map = &skel.maps.scx_pmu_map;
4937 let map_fd = unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
4938
4939 let mut failures = 0u64;
4940
4941 for cpu in 0..*NR_CPUS_POSSIBLE {
4942 let fd = unsafe { perf::perf_event_open(&mut attr as *mut _, -1, cpu as i32, -1, 0) };
4943 if fd < 0 {
4944 failures += 1;
4945 trace!(
4946 "perf_event_open failed cpu={cpu} errno={}",
4947 std::io::Error::last_os_error()
4948 );
4949 continue;
4950 }
4951
4952 let key = cpu as u32;
4953 let val = fd as u32;
4954 let ret = unsafe {
4955 libbpf_sys::bpf_map_update_elem(
4956 map_fd,
4957 &key as *const _ as *const _,
4958 &val as *const _ as *const _,
4959 0,
4960 )
4961 };
4962 if ret != 0 {
4963 trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
4964 } else {
4965 trace!("mapped cpu={cpu} -> fd={fd}");
4966 }
4967 }
4968
4969 if failures > 0 {
4970 println!("membw tracking: failed to install {failures} counters");
4971 }
4973
4974 Ok(())
4975}
4976
4977fn setup_membw_tracking(skel: &mut OpenBpfSkel) -> Result<u64> {
4979 let pmumanager = PMUManager::new()?;
4980 let codename = &pmumanager.codename as &str;
4981
4982 let pmuspec = match codename {
4983 "amdzen1" | "amdzen2" | "amdzen3" => {
4984 trace!("found AMD codename {codename}");
4985 pmumanager.pmus.get("ls_any_fills_from_sys.mem_io_local")
4986 }
4987 "amdzen4" | "amdzen5" => {
4988 trace!("found AMD codename {codename}");
4989 pmumanager.pmus.get("ls_any_fills_from_sys.dram_io_all")
4990 }
4991
4992 "haswell" | "broadwell" | "broadwellde" | "broadwellx" | "skylake" | "skylakex"
4993 | "cascadelakex" | "arrowlake" | "meteorlake" | "sapphirerapids" | "emeraldrapids"
4994 | "graniterapids" => {
4995 trace!("found Intel codename {codename}");
4996 pmumanager.pmus.get("LONGEST_LAT_CACHE.MISS")
4997 }
4998
4999 _ => {
5000 trace!("found unknown codename {codename}");
5001 None
5002 }
5003 };
5004
5005 let spec = pmuspec.ok_or("not_found").unwrap();
5006 let config = (spec.umask << 8) | spec.event[0];
5007
5008 skel.maps.rodata_data.as_mut().unwrap().membw_event = config;
5010
5011 Ok(config)
5012}
5013
5014#[clap_main::clap_main]
5015fn main(opts: Opts) -> Result<()> {
5016 if opts.version {
5017 println!(
5018 "scx_layered {}",
5019 build_id::full_version(env!("CARGO_PKG_VERSION"))
5020 );
5021 return Ok(());
5022 }
5023
5024 if opts.help_stats {
5025 stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
5026 return Ok(());
5027 }
5028
5029 let env_filter = EnvFilter::try_from_default_env()
5030 .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
5031 Ok(filter) => Ok(filter),
5032 Err(e) => {
5033 eprintln!(
5034 "invalid log envvar: {}, using info, err is: {}",
5035 opts.log_level, e
5036 );
5037 EnvFilter::try_new("info")
5038 }
5039 })
5040 .unwrap_or_else(|_| EnvFilter::new("info"));
5041
5042 match tracing_subscriber::fmt()
5043 .with_env_filter(env_filter)
5044 .with_target(true)
5045 .with_thread_ids(true)
5046 .with_file(true)
5047 .with_line_number(true)
5048 .try_init()
5049 {
5050 Ok(()) => {}
5051 Err(e) => eprintln!("failed to init logger: {}", e),
5052 }
5053
5054 if opts.verbose > 0 {
5055 warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
5056 }
5057
5058 if opts.no_load_frac_limit {
5059 warn!("--no-load-frac-limit is deprecated and noop");
5060 }
5061 if opts.layer_preempt_weight_disable != 0.0 {
5062 warn!("--layer-preempt-weight-disable is deprecated and noop");
5063 }
5064 if opts.layer_growth_weight_disable != 0.0 {
5065 warn!("--layer-growth-weight-disable is deprecated and noop");
5066 }
5067 if opts.local_llc_iteration {
5068 warn!("--local_llc_iteration is deprecated and noop");
5069 }
5070
5071 debug!("opts={:?}", &opts);
5072
5073 if let Some(run_id) = opts.run_id {
5074 info!("scx_layered run_id: {}", run_id);
5075 }
5076
5077 let shutdown = Arc::new(AtomicBool::new(false));
5078 let shutdown_clone = shutdown.clone();
5079 ctrlc::set_handler(move || {
5080 shutdown_clone.store(true, Ordering::Relaxed);
5081 })
5082 .context("Error setting Ctrl-C handler")?;
5083
5084 if let Some(intv) = opts.monitor.or(opts.stats) {
5085 let shutdown_copy = shutdown.clone();
5086 let stats_columns = opts.stats_columns;
5087 let stats_no_llc = opts.stats_no_llc;
5088 let jh = std::thread::spawn(move || {
5089 match stats::monitor(
5090 Duration::from_secs_f64(intv),
5091 shutdown_copy,
5092 stats_columns,
5093 stats_no_llc,
5094 ) {
5095 Ok(_) => {
5096 debug!("stats monitor thread finished successfully")
5097 }
5098 Err(error_object) => {
5099 warn!(
5100 "stats monitor thread finished because of an error {}",
5101 error_object
5102 )
5103 }
5104 }
5105 });
5106 if opts.monitor.is_some() {
5107 let _ = jh.join();
5108 return Ok(());
5109 }
5110 }
5111
5112 if let Some(path) = &opts.example {
5113 write_example_file(path)?;
5114 return Ok(());
5115 }
5116
5117 let mut layer_config = match opts.run_example {
5118 true => EXAMPLE_CONFIG.clone(),
5119 false => LayerConfig { specs: vec![] },
5120 };
5121
5122 for (idx, input) in opts.specs.iter().enumerate() {
5123 let specs = LayerSpec::parse(input)
5124 .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;
5125
5126 for spec in specs {
5127 match spec.template {
5128 Some(ref rule) => {
5129 let matches = expand_template(rule)?;
5130 if matches.is_empty() {
5133 layer_config.specs.push(spec);
5134 } else {
5135 for (mt, mask) in matches {
5136 let mut genspec = spec.clone();
5137
5138 genspec.cpuset = Some(mask);
5139
5140 for orterm in &mut genspec.matches {
5142 orterm.push(mt.clone());
5143 }
5144
5145 match &mt {
5146 LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
5147 _ => bail!("Template match has unexpected type"),
5148 }
5149
5150 layer_config.specs.push(genspec);
5152 }
5153 }
5154 }
5155
5156 None => {
5157 layer_config.specs.push(spec);
5158 }
5159 }
5160 }
5161 }
5162
5163 for spec in layer_config.specs.iter_mut() {
5164 let common = spec.kind.common_mut();
5165
5166 if common.slice_us == 0 {
5167 common.slice_us = opts.slice_us;
5168 }
5169
5170 if common.weight == 0 {
5171 common.weight = DEFAULT_LAYER_WEIGHT;
5172 }
5173 common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);
5174
5175 if common.preempt {
5176 if common.disallow_open_after_us.is_some() {
5177 warn!(
5178 "Preempt layer {} has non-null disallow_open_after_us, ignored",
5179 &spec.name
5180 );
5181 }
5182 if common.disallow_preempt_after_us.is_some() {
5183 warn!(
5184 "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
5185 &spec.name
5186 );
5187 }
5188 common.disallow_open_after_us = Some(u64::MAX);
5189 common.disallow_preempt_after_us = Some(u64::MAX);
5190 } else {
5191 if common.disallow_open_after_us.is_none() {
5192 common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
5193 }
5194
5195 if common.disallow_preempt_after_us.is_none() {
5196 common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
5197 }
5198 }
5199
5200 if common.idle_smt.is_some() {
5201 warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
5202 }
5203
5204 if common.allow_node_aligned.is_some() {
5205 warn!("Layer {} has deprecated flag \"allow_node_aligned\", node-aligned tasks are now always dispatched on layer DSQs", &spec.name);
5206 }
5207 }
5208
5209 let membw_required = layer_config.specs.iter().any(|spec| match spec.kind {
5210 LayerKind::Confined { membw_gb, .. } | LayerKind::Grouped { membw_gb, .. } => {
5211 membw_gb.is_some()
5212 }
5213 LayerKind::Open { .. } => false,
5214 });
5215
5216 if opts.print_and_exit {
5217 println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
5218 return Ok(());
5219 }
5220
5221 debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
5222 let hint_to_layer_map = verify_layer_specs(&layer_config.specs)?;
5223
5224 let mut open_object = MaybeUninit::uninit();
5225 loop {
5226 let mut sched = Scheduler::init(
5227 &opts,
5228 &layer_config.specs,
5229 &mut open_object,
5230 &hint_to_layer_map,
5231 membw_required,
5232 )?;
5233 if !sched.run(shutdown.clone())?.should_restart() {
5234 break;
5235 }
5236 }
5237
5238 Ok(())
5239}
5240
#[cfg(test)]
mod peak_util_tests {
    use super::*;

    /// A burst of 2.0 in between low samples must still be visible in the
    /// peak five samples later.
    #[test]
    fn test_peak_pins_recent_burst() {
        let half_life = Duration::from_secs(1);
        let elapsed = Duration::from_millis(100);
        let step = |level, sample| update_peak_util(level, sample, half_life, elapsed);

        // Five quiet samples, one burst, five more quiet samples.
        let warmed_up = (0..5).fold(0.0, |level, _| step(level, 0.1));
        let after_burst = step(warmed_up, 2.0);
        let peak = (0..5).fold(after_burst, |level, _| step(level, 0.1));

        assert!(
            peak > 1.3,
            "peak should still hold the recent burst, got {peak}"
        );
    }

    /// Starting from a peak of 2.0, fifty zero samples must bring the peak
    /// below 0.1.
    #[test]
    fn test_peak_decays_after_quiet_period() {
        let half_life = Duration::from_secs(1);
        let elapsed = Duration::from_millis(100);

        let peak = (0..50).fold(2.0, |level, _| {
            update_peak_util(level, 0.0, half_life, elapsed)
        });

        assert!(
            peak < 0.1,
            "peak should decay after a long quiet period, got {peak}"
        );
    }

    /// Thirty identical samples of 1.0 must leave the peak at 1.0.
    #[test]
    fn test_peak_matches_steady_input() {
        let half_life = Duration::from_secs(1);
        let elapsed = Duration::from_millis(100);

        let peak = (0..30).fold(0.0, |level, _| {
            update_peak_util(level, 1.0, half_life, elapsed)
        });

        assert!(
            (peak - 1.0).abs() < 1e-9,
            "peak should match steady-state input, got {peak}"
        );
    }

    /// Raising only the peak argument must leave the first element of the
    /// returned range untouched and raise the second one.
    #[test]
    fn test_peak_only_changes_shrink_target() {
        let util_range = (0.7, 0.9);
        let without_peak = calc_peak_aware_target_range(1.8, 1.8, util_range);
        let with_peak = calc_peak_aware_target_range(1.8, 3.1, util_range);

        assert_eq!(without_peak.0, 2);
        assert_eq!(
            with_peak.0, without_peak.0,
            "peak should not affect growth target"
        );
        assert_eq!(without_peak.1, 2);
        assert_eq!(
            with_peak.1, 4,
            "peak should only widen the shrink boundary"
        );
    }
}
5317
/// Tests for `xnuma_check_active` and `xnuma_compute_rates`.
///
/// Throughout, `duty` holds one load value per node and `allocs` one
/// allocation value per node. Comments describe the scenario and the outcome
/// each test asserts; they do not restate the implementation.
#[cfg(test)]
mod xnuma_tests {
    use super::*;

    // Threshold and delta pairs passed to every `xnuma_check_active` call.
    // NOTE(review): the hysteresis tests below are consistent with reading
    // each pair as (stay-active bound, activation bound) — confirm against
    // `xnuma_check_active`.
    const THRESH: (f64, f64) = (0.6, 0.7);
    const DELTA: (f64, f64) = (0.2, 0.3);

    // ---- xnuma_check_active ----

    /// Both nodes at 40/96 with growth denied: neither becomes active.
    #[test]
    fn test_activation_below_threshold() {
        let duty = vec![40.0, 40.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    /// Node 0 at 90/96 with growth denied becomes active; node 1 at 20/96
    /// does not.
    #[test]
    fn test_activation_above_all_thresholds() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
    }

    /// Same load as above, but growth was not denied: node 0 stays inactive.
    #[test]
    fn test_activation_requires_growth_denied() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![false, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    /// Both nodes equally loaded at 80/96: neither becomes active.
    #[test]
    fn test_symmetric_high_load_stays_closed() {
        let duty = vec![80.0, 80.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    /// At 75/96 vs 20/96 an already-active node 0 stays active.
    #[test]
    fn test_hysteresis_stays_active() {
        let duty = vec![75.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
    }

    /// With the same load, a currently inactive node 0 stays inactive.
    #[test]
    fn test_hysteresis_stays_inactive() {
        let duty = vec![75.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    /// An active node 0 is deactivated once both nodes sit at 50/96.
    #[test]
    fn test_deactivation_load_drops() {
        let duty = vec![50.0, 50.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    /// An active node 0 is deactivated when growth is no longer denied, even
    /// though its load is still 90/96.
    #[test]
    fn test_deactivation_growth_succeeds() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![false, false];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    /// A node with duty but a zero allocation and denied growth becomes
    /// active.
    #[test]
    fn test_zero_alloc_with_duty_and_growth_denied() {
        let duty = vec![50.0, 0.0];
        let allocs = vec![0, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
    }

    /// No duty and no allocation anywhere: previously active nodes are
    /// deactivated.
    #[test]
    fn test_all_zero_alloc() {
        let duty = vec![0.0, 0.0];
        let allocs = vec![0, 0];
        let gd = vec![true, true];
        let cur = vec![true, true];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    /// Three nodes at 90, 40 and 10 out of 96: only node 0 becomes active.
    #[test]
    fn test_three_nodes_mixed() {
        let duty = vec![90.0, 40.0, 10.0];
        let allocs = vec![96, 96, 96];
        let gd = vec![true, true, false];
        let cur = vec![false, false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
        assert!(!result[2]);
    }

    /// Both nodes start active; node 0 (90/96) stays active while node 1
    /// (50/96) is deactivated.
    #[test]
    fn test_per_node_independence() {
        let duty = vec![90.0, 50.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![true, true];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
    }

    // ---- xnuma_compute_rates: basic scenarios ----

    /// Equal duty on equal allocations: every rate is zero.
    #[test]
    fn test_rates_balanced_load() {
        let duty = vec![48.0, 48.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    /// Duty 80 vs 40 on equal allocations: the equal share is 60, so node 0
    /// sends its surplus of 20 to node 1 and nothing flows elsewhere.
    #[test]
    fn test_rates_one_overloaded() {
        let duty = vec![80.0, 40.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][1] > 0);
        assert_eq!(result.rates[1][0], 0);
        assert_eq!(result.rates[0][0], 0);
        assert_eq!(result.rates[1][1], 0);

        let expected_rate = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
    }

    /// Equal duty on a 48/144 allocation split: node 0's proportional share
    /// is 24, so it sends a surplus of 24 to node 1.
    #[test]
    fn test_rates_asymmetric_allocation() {
        let duty = vec![48.0, 48.0];
        let allocs = vec![48, 144];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected_rate = (24.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
        assert_eq!(result.rates[1][0], 0);
    }

    // ---- xnuma_compute_rates: several nodes ----

    /// One source (surplus 60) and two sinks with a deficit of 30 each: each
    /// sink receives 30 and the sinks send nothing.
    #[test]
    fn test_rates_three_nodes_one_source() {
        let duty = vec![120.0, 30.0, 30.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_01 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_02 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], rate_01);
        assert_eq!(result.rates[0][2], rate_02);

        assert_eq!(result.rates[1][0], 0);
        assert_eq!(result.rates[2][0], 0);
        assert_eq!(result.rates[1][2], 0);
        assert_eq!(result.rates[2][1], 0);
    }

    /// One source (surplus 60) and sinks with deficits of 10 and 50: each
    /// sink receives exactly its deficit.
    #[test]
    fn test_rates_three_nodes_unequal_deficit() {
        let duty = vec![120.0, 50.0, 10.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_01 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_02 = (50.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], rate_01);
        assert_eq!(result.rates[0][2], rate_02);
    }

    /// Two sources (surpluses 20 and 10) and a single sink: each source
    /// sends its own surplus to the sink and nothing to the other source.
    #[test]
    fn test_rates_two_sources_one_sink() {
        let duty = vec![80.0, 70.0, 30.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_02 = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_12 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][2], rate_02);
        assert_eq!(result.rates[1][2], rate_12);

        assert_eq!(result.rates[0][1], 0);
        assert_eq!(result.rates[1][0], 0);
    }

    // ---- xnuma_compute_rates: invariants ----

    /// For every node the total outbound rate equals its scaled surplus over
    /// the equalized share (zero for nodes without a surplus).
    #[test]
    fn test_conservation_per_source_outbound() {
        let duty = vec![100.0, 30.0, 50.0, 20.0];
        let allocs = vec![96, 96, 96, 96];
        let nr = 4;
        let result = xnuma_compute_rates(&duty, &allocs);

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        for src in 0..nr {
            let expected = eq_ratio * allocs[src] as f64;
            let surplus = (duty[src] - expected).max(0.0);
            let total_outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
            let expected_rate = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
            assert_eq!(
                total_outbound, expected_rate,
                "node {} outbound mismatch",
                src
            );
        }
    }

    /// Pure arithmetic check of the model used by these tests: summed
    /// surplus over the equalized share equals summed deficit. It does not
    /// call `xnuma_compute_rates`.
    #[test]
    fn test_conservation_surplus_equals_deficit() {
        let duty = vec![100.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96];

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        let mut total_surplus = 0.0f64;
        let mut total_deficit = 0.0f64;
        for i in 0..3 {
            let expected = eq_ratio * allocs[i] as f64;
            let delta = duty[i] - expected;
            if delta > 0.0 {
                total_surplus += delta;
            } else {
                total_deficit += -delta;
            }
        }
        assert!((total_surplus - total_deficit).abs() < 1e-10);
    }

    /// No node ever has a rate towards itself.
    #[test]
    fn test_self_rates_always_zero() {
        let duty = vec![100.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for nid in 0..3 {
            assert_eq!(result.rates[nid][nid], 0);
        }
    }

    /// Nodes whose duty is below their equalized share must not send
    /// anything to any node.
    ///
    /// Deficit nodes are selected from the inputs rather than from the
    /// computed rates, so the assertion cannot pass vacuously.
    #[test]
    fn test_deficit_nodes_have_zero_outbound() {
        let duty = vec![100.0, 20.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        let mut nr_deficit = 0;
        for nid in 0..4 {
            // Only nodes strictly below their equalized share are checked.
            if duty[nid] >= eq_ratio * allocs[nid] as f64 {
                continue;
            }
            nr_deficit += 1;
            for dst in 0..4 {
                assert_eq!(
                    result.rates[nid][dst], 0,
                    "deficit node {} has non-zero rate to {}",
                    nid, dst
                );
            }
        }
        // Nodes 1 and 2 (20 and 30 against a share of 50) must have been
        // checked.
        assert!(
            nr_deficit >= 2,
            "expected at least 2 deficit nodes, found {}",
            nr_deficit
        );
    }

    // ---- xnuma_compute_rates: edge cases ----

    /// No duty anywhere: every rate is zero.
    #[test]
    fn test_rates_zero_duty_everywhere() {
        let duty = vec![0.0, 0.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    /// No allocation anywhere: every rate is zero even with duty present.
    #[test]
    fn test_rates_zero_alloc() {
        let duty = vec![50.0, 50.0];
        let allocs = vec![0, 0];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    /// All load on node 0: half of it (48) is the surplus sent to node 1.
    #[test]
    fn test_rates_all_load_one_node() {
        let duty = vec![96.0, 0.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected_rate = (48.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
    }

    /// A single node has nowhere to send load.
    #[test]
    fn test_rates_single_node() {
        let duty = vec![96.0];
        let allocs = vec![96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(result.rates[0][0], 0);
    }

    /// The only other node has a zero allocation: nothing flows either way.
    #[test]
    fn test_rates_one_node_zero_alloc() {
        let duty = vec![80.0, 0.0];
        let allocs = vec![96, 0];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(result.rates[0][1], 0);
        assert_eq!(result.rates[1][0], 0);
    }

    // ---- xnuma_compute_rates: scaling and precision ----

    /// Pins the scaling: a surplus of 20 yields `10 * 2^20`, i.e.
    /// `XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE` amounts to `2^19` per unit of
    /// surplus.
    #[test]
    fn test_rate_scaling() {
        let duty = vec![80.0, 40.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected);
        assert_eq!(expected, 10 * (1 << 20));
    }

    /// A surplus of 0.001 still produces a non-zero rate, below `2^20`.
    #[test]
    fn test_rates_tiny_imbalance() {
        let duty = vec![48.001, 47.999];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][1] > 0);
        assert!(result.rates[0][1] < (1 << 20));
    }

    /// Eight nodes with two sources: both sources send to node 2, and each
    /// node's total outbound matches its scaled surplus up to one unit of
    /// rounding per destination.
    #[test]
    fn test_rates_large_values() {
        let duty = vec![300.0, 100.0, 50.0, 50.0, 50.0, 50.0, 50.0, 50.0];
        let allocs = vec![96, 96, 96, 96, 96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][2] > 0);
        assert!(result.rates[1][2] > 0);

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;
        let nr = 8;
        for src in 0..nr {
            let surplus = (duty[src] - eq_ratio * allocs[src] as f64).max(0.0);
            let outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
            let expected = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
            let tolerance = nr as u64;
            assert!(
                outbound.abs_diff(expected) <= tolerance,
                "node {} outbound {} vs expected {}, diff {}",
                src,
                outbound,
                expected,
                outbound.abs_diff(expected)
            );
        }
    }

    // ---- xnuma_check_active: multi-step sequences ----

    /// Walks node 0 through a full cycle: inactive at low load, activated at
    /// 90/96, kept active at 75/96, deactivated once both nodes sit at 50/96.
    #[test]
    fn test_hysteresis_cycle() {
        let allocs = vec![96, 96];
        let gd = vec![true, false];

        let active =
            xnuma_check_active(&[40.0, 40.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
        assert!(!active[0]);

        let active =
            xnuma_check_active(&[90.0, 20.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
        assert!(active[0]);

        let active = xnuma_check_active(&[75.0, 20.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
        assert!(active[0]);

        let active = xnuma_check_active(&[50.0, 50.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
        assert!(!active[0]);
    }

    /// With unchanged load, node 0 activates while growth is denied and
    /// deactivates as soon as growth is no longer denied.
    #[test]
    fn test_hysteresis_growth_toggle() {
        let allocs = vec![96, 96];
        let gd_denied = vec![true, false];
        let gd_ok = vec![false, false];

        let active = xnuma_check_active(
            &[90.0, 20.0],
            &allocs,
            THRESH,
            DELTA,
            &gd_denied,
            &[false, false],
        );
        assert!(active[0]);

        let active = xnuma_check_active(
            &[90.0, 20.0],
            &allocs,
            THRESH,
            DELTA,
            &gd_ok,
            &[true, false],
        );
        assert!(!active[0]);
    }

    // ---- xnuma_compute_rates: distribution across sinks ----

    /// One source with a surplus of 120 and three sinks with deficits of 40,
    /// 20 and 60: each sink receives its deficit and the rates add up to the
    /// whole surplus.
    #[test]
    fn test_proportional_sink_distribution() {
        let duty = vec![180.0, 20.0, 40.0, 0.0];
        let allocs = vec![96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(
            result.rates[0][1],
            (40.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
        assert_eq!(
            result.rates[0][2],
            (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
        assert_eq!(
            result.rates[0][3],
            (60.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );

        let total_from_n0: u64 = (0..4).map(|dst| result.rates[0][dst]).sum();
        assert_eq!(
            total_from_n0,
            (120.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
    }
}
5946
5947#[cfg(test)]
5948mod util_compensation_tests {
5949 fn compute_scale(delta_total: u64, overhead: u64) -> f64 {
5951 let available = delta_total.saturating_sub(overhead);
5952 if available > 0 {
5953 (delta_total as f64 / available as f64).clamp(1.0, 20.0)
5954 } else {
5955 1.0
5956 }
5957 }
5958
5959 fn scaled_aggregate(
5961 deltas: &[Vec<u64>],
5962 scales: &[f64],
5963 nr_layers: usize,
5964 elapsed: f64,
5965 ) -> Vec<f64> {
5966 (0..nr_layers)
5967 .map(|layer| {
5968 let mut sum = 0.0f64;
5969 for (cpu, cpu_deltas) in deltas.iter().enumerate() {
5970 sum += cpu_deltas[layer] as f64 * scales[cpu];
5971 }
5972 sum / 1_000_000_000.0 / elapsed
5973 })
5974 .collect()
5975 }
5976
5977 #[test]
5978 fn test_scale_no_overhead() {
5979 assert!((compute_scale(1000, 0) - 1.0).abs() < 0.01);
5981 }
5982
5983 #[test]
5984 fn test_scale_half_overhead() {
5985 assert!((compute_scale(1000, 500) - 2.0).abs() < 0.01);
5987 }
5988
5989 #[test]
5990 fn test_scale_high_overhead() {
5991 assert!((compute_scale(1000, 900) - 10.0).abs() < 0.01);
5993 }
5994
5995 #[test]
5996 fn test_scale_very_high_overhead() {
5997 assert!((compute_scale(1000, 950) - 20.0).abs() < 0.01);
5999 }
6000
6001 #[test]
6002 fn test_scale_clamped_at_max() {
6003 assert!((compute_scale(1000, 980) - 20.0).abs() < 0.01);
6005 }
6006
6007 #[test]
6008 fn test_scale_all_overhead() {
6009 assert!((compute_scale(1000, 1000) - 1.0).abs() < 0.01);
6011 }
6012
6013 #[test]
6014 fn test_scale_idle_cpu() {
6015 assert!((compute_scale(0, 0) - 1.0).abs() < 0.01);
6017 }
6018
6019 #[test]
6020 fn test_scale_small_overhead_mostly_idle() {
6021 let s = compute_scale(10000, 100);
6024 assert!((s - 1.01).abs() < 0.01, "expected ~1.01, got {}", s);
6025 }
6026
6027 #[test]
6028 fn test_uniform_scale_matches_unscaled() {
6029 let deltas = vec![vec![1_000_000_000u64; 2]; 4];
6030 let scales = vec![1.0; 4];
6031 let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6032 assert!((result[0] - 4.0).abs() < 0.01);
6033 assert!((result[1] - 4.0).abs() < 0.01);
6034 }
6035
6036 #[test]
6037 fn test_hot_cpu_weighted_more() {
6038 let deltas = vec![vec![900_000_000, 0], vec![100_000_000, 0]];
6042 let scales = vec![2.0, 1.0];
6043 let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6044 assert!(
6045 (result[0] - 1.9).abs() < 0.01,
6046 "expected 1.9, got {}",
6047 result[0]
6048 );
6049 }
6050
6051 #[test]
6052 fn test_cold_cpu_weighted_less() {
6053 let deltas = vec![vec![100_000_000, 0], vec![900_000_000, 0]];
6054 let scales = vec![2.0, 1.0];
6055 let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6056 assert!(
6057 (result[0] - 1.1).abs() < 0.01,
6058 "expected 1.1, got {}",
6059 result[0]
6060 );
6061 }
6062
6063 #[test]
6064 fn test_no_usage_no_compensation() {
6065 let deltas = vec![vec![0u64; 2]; 4];
6066 let scales = vec![5.0; 4];
6067 let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6068 assert_eq!(result[0], 0.0);
6069 assert_eq!(result[1], 0.0);
6070 }
6071
6072 #[test]
6073 fn test_multilayer_independent_scaling() {
6074 let deltas = vec![
6075 vec![800_000_000, 200_000_000],
6076 vec![200_000_000, 800_000_000],
6077 ];
6078 let scales = vec![3.0, 1.0];
6079 let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6080 assert!((result[0] - 2.6).abs() < 0.01);
6081 assert!((result[1] - 1.4).abs() < 0.01);
6082 }
6083
6084 #[test]
6085 fn test_elapsed_time_normalization() {
6086 let deltas = vec![vec![500_000_000u64; 1]; 1];
6087 let scales = vec![1.0];
6088 let result = scaled_aggregate(&deltas, &scales, 1, 2.0);
6089 assert!((result[0] - 0.25).abs() < 0.01);
6090 }
6091
6092 #[test]
6093 fn test_many_cpus_mixed_scales() {
6094 let deltas = vec![vec![1_000_000_000u64; 1]; 8];
6095 let scales = vec![2.5, 1.0, 2.5, 1.0, 2.5, 1.0, 2.5, 1.0];
6096 let result = scaled_aggregate(&deltas, &scales, 1, 1.0);
6097 assert!((result[0] - 14.0).abs() < 0.01);
6098 }
6099
6100 #[test]
6101 fn test_compensated_ge_raw() {
6102 let deltas = vec![
6104 vec![500_000_000u64; 3],
6105 vec![300_000_000; 3],
6106 vec![200_000_000; 3],
6107 ];
6108 let scales_raw = vec![1.0; 3];
6109 let scales_comp = vec![1.5, 2.0, 1.0];
6110 let raw = scaled_aggregate(&deltas, &scales_raw, 3, 1.0);
6111 let comp = scaled_aggregate(&deltas, &scales_comp, 3, 1.0);
6112 for i in 0..3 {
6113 assert!(
6114 comp[i] >= raw[i] - 0.001,
6115 "layer {}: comp {} < raw {}",
6116 i,
6117 comp[i],
6118 raw[i]
6119 );
6120 }
6121 }
6122}