mod bpf_skel;
mod stats;

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::CString;
use std::fs;
use std::io::Write;
use std::mem::MaybeUninit;
use std::ops::Sub;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::ThreadId;
use std::time::Duration;
use std::time::Instant;

use inotify::{Inotify, WatchMask};
use std::os::unix::io::AsRawFd;

use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
pub use bpf_skel::*;
use clap::Parser;
use crossbeam::channel::Receiver;
use crossbeam::select;
use lazy_static::lazy_static;
use libbpf_rs::libbpf_sys;
use libbpf_rs::AsRawLibbpf;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use nix::sched::CpuSet;
use nvml_wrapper::error::NvmlError;
use nvml_wrapper::Nvml;
use once_cell::sync::OnceCell;
use regex::Regex;
use scx_layered::alloc::{unified_alloc, LayerAlloc, LayerDemand};
use scx_layered::*;
use scx_raw_pmu::PMUManager;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::perf;
use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
use scx_utils::read_netdevs;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::CoreType;
use scx_utils::Cpumask;
use scx_utils::NetDev;
use scx_utils::Topology;
use scx_utils::TopologyArgs;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;
use scx_utils::NR_CPU_IDS;
use stats::LayerStats;
use stats::StatsReq;
use stats::StatsRes;
use stats::SysStats;
use std::collections::VecDeque;
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
use tracing::{debug, error, info, trace, warn};
use tracing_subscriber::filter::EnvFilter;
use walkdir::WalkDir;

const SCHEDULER_NAME: &str = "scx_layered";
const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;

const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
const LAYER_USAGE_PROTECTED_PREEMPT: usize =
    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;

const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;

const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;

static NVML: OnceCell<Nvml> = OnceCell::new();

fn nvml() -> Result<&'static Nvml, NvmlError> {
    NVML.get_or_try_init(Nvml::init)
}

lazy_static! {
    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
    static ref EXAMPLE_CONFIG: LayerConfig = serde_json::from_str(
        r#"[
            {
                "name": "batch",
                "comment": "tasks under system.slice or tasks with nice value > 0",
                "matches": [[{"CgroupPrefix": "system.slice/"}], [{"NiceAbove": 0}]],
                "kind": {"Confined": {
                    "util_range": [0.8, 0.9], "cpus_range": [0, 16],
                    "min_exec_us": 1000, "slice_us": 20000, "weight": 100,
                    "xllc_mig_min_us": 1000.0, "perf": 1024
                }}
            },
            {
                "name": "immediate",
                "comment": "tasks under workload.slice with nice value < 0",
                "matches": [[{"CgroupPrefix": "workload.slice/"}, {"NiceBelow": 0}]],
                "kind": {"Open": {
                    "min_exec_us": 100, "yield_ignore": 0.25, "slice_us": 20000,
                    "preempt": true, "exclusive": true,
                    "prev_over_idle_core": true,
                    "weight": 100, "perf": 1024
                }}
            },
            {
                "name": "stress-ng",
                "comment": "stress-ng test layer",
                "matches": [[{"CommPrefix": "stress-ng"}], [{"PcommPrefix": "stress-ng"}]],
                "kind": {"Confined": {
                    "util_range": [0.2, 0.8],
                    "min_exec_us": 800, "preempt": true, "slice_us": 800,
                    "weight": 100, "growth_algo": "Topo", "perf": 1024
                }}
            },
            {
                "name": "normal",
                "comment": "the rest",
                "matches": [[]],
                "kind": {"Grouped": {
                    "util_range": [0.5, 0.6], "util_includes_open_cputime": true,
                    "min_exec_us": 200, "slice_us": 20000, "weight": 100,
                    "xllc_mig_min_us": 100.0, "growth_algo": "Linear", "perf": 1024
                }}
            }
        ]"#,
    )
    .unwrap();
}
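
// A minimal sanity-check sketch of the decay constant above: folding
// USAGE_DECAY over one half-life's worth of seconds must retain exactly half
// the weight. Nothing here is used by the scheduler itself.
#[cfg(test)]
mod usage_decay_tests {
    use super::*;

    #[test]
    fn decay_halves_at_half_life() {
        assert!((USAGE_DECAY.powf(USAGE_HALF_LIFE_F64) - 0.5).abs() < 1e-6);
    }
}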

#[derive(Debug, Parser)]
#[command(verbatim_doc_comment)]
struct Opts {
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    #[clap(short = 'M', long, default_value = "0")]
    max_exec_us: u64,

    #[clap(short = 'i', long, default_value = "0.1")]
    interval: f64,

    #[clap(short = 'n', long, default_value = "false")]
    no_load_frac_limit: bool,

    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    #[clap(long, default_value = "info")]
    log_level: String,

    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
    disable_topology: Option<bool>,

    #[clap(long)]
    monitor_disable: bool,

    #[clap(short = 'e', long)]
    example: Option<String>,

    #[clap(long, default_value = "0.0")]
    layer_preempt_weight_disable: f64,

    #[clap(long, default_value = "0.0")]
    layer_growth_weight_disable: f64,

    #[clap(long)]
    stats: Option<f64>,

    #[clap(long)]
    monitor: Option<f64>,

    #[clap(long, default_value = "95")]
    stats_columns: usize,

    #[clap(long)]
    stats_no_llc: bool,

    #[clap(long)]
    run_example: bool,

    #[clap(long)]
    allow_partial_core: bool,

    #[clap(long, default_value = "false")]
    local_llc_iteration: bool,

    #[clap(long, default_value = "10000")]
    lo_fb_wait_us: u64,

    #[clap(long, default_value = ".05")]
    lo_fb_share: f64,

    #[clap(long, default_value = "false")]
    disable_antistall: bool,

    #[clap(long, default_value = "false")]
    enable_gpu_affinitize: bool,

    #[clap(long, default_value = "900")]
    gpu_affinitize_secs: u64,

    #[clap(long, default_value = "false")]
    enable_match_debug: bool,

    #[clap(long, default_value = "3")]
    antistall_sec: u64,

    #[clap(long, default_value = "false")]
    enable_gpu_support: bool,

    #[clap(long, default_value = "3")]
    gpu_kprobe_level: u64,

    #[clap(long, default_value = "false")]
    netdev_irq_balance: bool,

    #[clap(long, default_value = "false")]
    disable_queued_wakeup: bool,

    #[clap(long, default_value = "false")]
    disable_percpu_kthread_preempt: bool,

    #[clap(long, default_value = "false")]
    percpu_kthread_preempt_all: bool,

    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    #[clap(long)]
    run_id: Option<u64>,

    #[clap(long)]
    help_stats: bool,

    specs: Vec<String>,

    #[clap(long, default_value = "2000")]
    layer_refresh_ms_avgruntime: u64,

    #[clap(long, default_value = "")]
    task_hint_map: String,

    #[clap(long, default_value = "false")]
    print_and_exit: bool,

    #[clap(long, default_value = "")]
    hi_fb_thread_name: String,

    #[clap(flatten, next_help_heading = "Topology Options")]
    topology: TopologyArgs,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

#[derive(Debug, Clone)]
enum CgroupEvent {
    Created {
        path: String,
        cgroup_id: u64,
        match_bitmap: u64,
    },
    Removed {
        path: String,
        cgroup_id: u64,
    },
}

fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
    reader
        .read_stat()
        .context("Failed to read procfs")?
        .total_cpu
        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
}

fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
    match (curr, prev) {
        (
            fb_procfs::CpuStat {
                user_usec: Some(curr_user),
                nice_usec: Some(curr_nice),
                system_usec: Some(curr_system),
                idle_usec: Some(curr_idle),
                iowait_usec: Some(curr_iowait),
                irq_usec: Some(curr_irq),
                softirq_usec: Some(curr_softirq),
                stolen_usec: Some(curr_stolen),
                ..
            },
            fb_procfs::CpuStat {
                user_usec: Some(prev_user),
                nice_usec: Some(prev_nice),
                system_usec: Some(prev_system),
                idle_usec: Some(prev_idle),
                iowait_usec: Some(prev_iowait),
                irq_usec: Some(prev_irq),
                softirq_usec: Some(prev_softirq),
                stolen_usec: Some(prev_stolen),
                ..
            },
        ) => {
            let idle_usec = curr_idle.saturating_sub(*prev_idle);
            let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
            let user_usec = curr_user.saturating_sub(*prev_user);
            let system_usec = curr_system.saturating_sub(*prev_system);
            let nice_usec = curr_nice.saturating_sub(*prev_nice);
            let irq_usec = curr_irq.saturating_sub(*prev_irq);
            let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
            let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);

            let busy_usec =
                user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
            let total_usec = idle_usec + busy_usec + iowait_usec;
            if total_usec > 0 {
                Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
            } else {
                Ok(1.0)
            }
        }
        _ => bail!("Missing stats in cpustat"),
    }
}
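
// Worked example for calc_util(): if, between two samples, the busy counters
// (user + nice + system + irq + softirq + stolen) advance by a combined
// 300ms while idle + iowait advance by 100ms, utilization is
// 300 / 400 = 0.75. A zero-length window reports 1.0 rather than dividing
// by zero.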

fn copy_into_cstr(dst: &mut [i8], src: &str) {
    let cstr = CString::new(src).unwrap();
    let bytes = unsafe { std::mem::transmute::<&[u8], &[i8]>(cstr.as_bytes_with_nul()) };
    dst[0..bytes.len()].copy_from_slice(bytes);
}
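
// A minimal sketch of the contract above: the NUL terminator is copied along
// with the string bytes, so `dst` must have room for src.len() + 1 elements,
// and `src` must not contain interior NULs (CString::new's unwrap panics
// otherwise).
#[cfg(test)]
mod copy_into_cstr_tests {
    use super::*;

    #[test]
    fn copies_nul_terminated_bytes() {
        let mut dst = [0x7fi8; 8];
        copy_into_cstr(&mut dst, "abc");
        assert_eq!(&dst[..4], &[b'a' as i8, b'b' as i8, b'c' as i8, 0]);
        // Bytes past the terminator are left untouched.
        assert_eq!(dst[4], 0x7f);
    }
}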

#[allow(clippy::needless_range_loop)]
fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    for cpu in 0..*NR_CPUS_POSSIBLE {
        cpu_ctxs.push(
            *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
                .expect("cpu_ctx: short or misaligned buffer"),
        );
    }
    Ok(cpu_ctxs)
}

#[derive(Clone, Debug)]
struct BpfStats {
    gstats: Vec<u64>,
    lstats: Vec<Vec<u64>>,
    lstats_sums: Vec<u64>,
    llc_lstats: Vec<Vec<Vec<u64>>>,
}

impl BpfStats {
    #[allow(clippy::needless_range_loop)]
    fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
        let nr_llcs = skel.maps.rodata_data.as_ref().unwrap().nr_llcs as usize;
        let mut gstats = vec![0u64; NR_GSTATS];
        let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
        let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];

        for cpu in 0..*NR_CPUS_POSSIBLE {
            for (stat, value) in gstats.iter_mut().enumerate() {
                *value += cpu_ctxs[cpu].gstats[stat];
            }
            for (layer, layer_stats) in lstats.iter_mut().enumerate() {
                for (stat, value) in layer_stats.iter_mut().enumerate() {
                    *value += cpu_ctxs[cpu].lstats[layer][stat];
                }
            }
        }

        let mut lstats_sums = vec![0u64; NR_LSTATS];
        for layer_stats in lstats.iter() {
            for (stat, value) in lstats_sums.iter_mut().enumerate() {
                *value += layer_stats[stat];
            }
        }

        for llc_id in 0..nr_llcs {
            let v = skel
                .maps
                .llc_data
                .lookup(&(llc_id as u32).to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .unwrap()
                .unwrap();
            let llcc: &bpf_intf::llc_ctx =
                plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");

            for (layer_id, layer_llc_stats) in llc_lstats.iter_mut().enumerate() {
                for (stat_id, stat_value) in layer_llc_stats[llc_id].iter_mut().enumerate() {
                    *stat_value = llcc.lstats[layer_id][stat_id];
                }
            }
        }

        Self {
            gstats,
            lstats,
            lstats_sums,
            llc_lstats,
        }
    }
}

impl<'b> Sub<&'b BpfStats> for &BpfStats {
    type Output = BpfStats;

    fn sub(self, rhs: &'b BpfStats) -> BpfStats {
        let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
        BpfStats {
            gstats: vec_sub(&self.gstats, &rhs.gstats),
            lstats: self
                .lstats
                .iter()
                .zip(rhs.lstats.iter())
                .map(|(l, r)| vec_sub(l, r))
                .collect(),
            lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
            llc_lstats: self
                .llc_lstats
                .iter()
                .zip(rhs.llc_lstats.iter())
                .map(|(l_layer, r_layer)| {
                    l_layer
                        .iter()
                        .zip(r_layer.iter())
                        .map(|(l_llc, r_llc)| {
                            let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
                            r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
                            vec_sub(&l_llc, &r_llc)
                        })
                        .collect()
                })
                .collect(),
        }
    }
}
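
// A minimal sketch of the delta semantics above, assuming nothing beyond the
// struct itself: all counters subtract element-wise, except the LLC latency
// stat whose right-hand side is zeroed first, so the "delta" carries the
// current snapshot's absolute value.
#[cfg(test)]
mod bpf_stats_sub_tests {
    use super::*;

    #[test]
    fn sub_keeps_llc_lat_absolute() {
        let lat = bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize;
        // One layer, one LLC, every counter set to `v`.
        let mk = |v: u64| BpfStats {
            gstats: vec![v; NR_GSTATS],
            lstats: vec![vec![v; NR_LSTATS]],
            lstats_sums: vec![v; NR_LSTATS],
            llc_lstats: vec![vec![vec![v; NR_LLC_LSTATS]]],
        };
        let diff = &mk(5) - &mk(2);
        assert_eq!(diff.gstats[0], 3);
        assert_eq!(diff.llc_lstats[0][0][lat], 5);
    }
}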

#[derive(Clone, Debug)]
struct Stats {
    at: Instant,
    elapsed: Duration,
    topo: Arc<Topology>,
    nr_layers: usize,
    nr_layer_tasks: Vec<usize>,
    layer_nr_node_pinned_tasks: Vec<Vec<u64>>,

    total_util: f64,
    layer_utils: Vec<Vec<f64>>,
    prev_layer_usages: Vec<Vec<u64>>,
    layer_node_pinned_utils: Vec<Vec<f64>>,
    prev_layer_node_pinned_usages: Vec<Vec<u64>>,
    layer_node_utils: Vec<Vec<f64>>,
    prev_layer_node_usages: Vec<Vec<u64>>,
    layer_node_duty_sums: Vec<Vec<f64>>,
    prev_layer_node_duty_raw: Vec<Vec<u64>>,
    layer_membws: Vec<Vec<f64>>,
    prev_layer_membw_agg: Vec<Vec<u64>>,
    cpu_busy: f64,
    prev_total_cpu: fb_procfs::CpuStat,
    prev_pmu_resctrl_membw: (u64, u64),
    system_cpu_util_ewma: f64,
    layer_dsq_insert_ewma: Vec<f64>,
    bpf_stats: BpfStats,
    prev_bpf_stats: BpfStats,

    processing_dur: Duration,
    prev_processing_dur: Duration,

    layer_slice_us: Vec<u64>,

    gpu_tasks_affinitized: u64,
    gpu_task_affinitization_ms: u64,
}

impl Stats {
    #[allow(clippy::needless_range_loop)]
    fn read_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
        let mut layer_usages = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];

        for cpu in 0..*NR_CPUS_POSSIBLE {
            for (layer, layer_usage) in layer_usages.iter_mut().enumerate() {
                for (usage, value) in layer_usage.iter_mut().enumerate() {
                    *value += cpu_ctxs[cpu].layer_usages[layer][usage];
                }
            }
        }

        layer_usages
    }

    #[allow(clippy::needless_range_loop)]
    fn read_layer_membw_agg(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
        let mut layer_membw_agg = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];

        for cpu in 0..*NR_CPUS_POSSIBLE {
            for (layer, layer_membw) in layer_membw_agg.iter_mut().enumerate() {
                for (usage, value) in layer_membw.iter_mut().enumerate() {
                    *value += cpu_ctxs[cpu].layer_membw_agg[layer][usage];
                }
            }
        }

        layer_membw_agg
    }

    #[allow(clippy::needless_range_loop)]
    fn read_layer_node_pinned_usages(
        cpu_ctxs: &[bpf_intf::cpu_ctx],
        topo: &Topology,
        nr_layers: usize,
        nr_nodes: usize,
    ) -> Vec<Vec<u64>> {
        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];

        for cpu in 0..*NR_CPUS_POSSIBLE {
            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
            for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
                layer_usages[node] += cpu_ctxs[cpu].node_pinned_usage[layer];
            }
        }

        usages
    }

    #[allow(clippy::needless_range_loop)]
    fn read_layer_node_usages(
        cpu_ctxs: &[bpf_intf::cpu_ctx],
        topo: &Topology,
        nr_layers: usize,
        nr_nodes: usize,
    ) -> Vec<Vec<u64>> {
        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];

        for cpu in 0..*NR_CPUS_POSSIBLE {
            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
            for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
                for usage in 0..=LAYER_USAGE_SUM_UPTO {
                    layer_usages[node] += cpu_ctxs[cpu].layer_usages[layer][usage];
                }
            }
        }

        usages
    }

    #[allow(clippy::needless_range_loop)]
    fn read_layer_node_duty_raw(
        cpu_ctxs: &[bpf_intf::cpu_ctx],
        topo: &Topology,
        nr_layers: usize,
        nr_nodes: usize,
    ) -> Vec<Vec<u64>> {
        let mut sums = vec![vec![0u64; nr_nodes]; nr_layers];

        for cpu in 0..*NR_CPUS_POSSIBLE {
            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
            for (layer, layer_sums) in sums.iter_mut().enumerate().take(nr_layers) {
                layer_sums[node] += cpu_ctxs[cpu].layer_duty_sum[layer];
            }
        }

        sums
    }

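    // Total memory bandwidth according to resctrl: sums mbm_total_bytes
    // across every monitoring group under /sys/fs/resctrl/mon_data. This
    // assumes resctrl is mounted with MBM monitoring enabled; a missing
    // mon_data directory yields 0, while unreadable or unparsable counter
    // files surface as an Err.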
    fn resctrl_read_total_membw() -> Result<u64> {
        let mut total_membw = 0u64;
        for entry in WalkDir::new("/sys/fs/resctrl/mon_data")
            .min_depth(1)
            .into_iter()
            .filter_map(Result::ok)
            .filter(|x| x.path().is_dir())
        {
            let mut path = entry.path().to_path_buf();
            path.push("mbm_total_bytes");
            total_membw += fs::read_to_string(path)?.trim().parse::<u64>()?;
        }

        Ok(total_membw)
    }

    fn new(
        skel: &mut BpfSkel,
        proc_reader: &fb_procfs::ProcReader,
        topo: Arc<Topology>,
        gpu_task_affinitizer: &GpuTaskAffinitizer,
    ) -> Result<Self> {
        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
        let nr_nodes = topo.nodes.len();
        let cpu_ctxs = read_cpu_ctxs(skel)?;
        let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
        let pmu_membw = Self::read_layer_membw_agg(&cpu_ctxs, nr_layers);

        Ok(Self {
            at: Instant::now(),
            elapsed: Default::default(),

            topo: topo.clone(),
            nr_layers,
            nr_layer_tasks: vec![0; nr_layers],
            layer_nr_node_pinned_tasks: vec![vec![0; nr_nodes]; nr_layers],

            total_util: 0.0,
            layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
            prev_layer_usages: Self::read_layer_usages(&cpu_ctxs, nr_layers),
            layer_node_pinned_utils: vec![vec![0.0; nr_nodes]; nr_layers],
            prev_layer_node_pinned_usages: Self::read_layer_node_pinned_usages(
                &cpu_ctxs, &topo, nr_layers, nr_nodes,
            ),
            layer_node_utils: vec![vec![0.0; nr_nodes]; nr_layers],
            prev_layer_node_usages: Self::read_layer_node_usages(
                &cpu_ctxs, &topo, nr_layers, nr_nodes,
            ),
            layer_node_duty_sums: vec![vec![0.0; nr_nodes]; nr_layers],
            prev_layer_node_duty_raw: Self::read_layer_node_duty_raw(
                &cpu_ctxs, &topo, nr_layers, nr_nodes,
            ),
            layer_membws: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
            prev_layer_membw_agg: pmu_membw,
            prev_pmu_resctrl_membw: (0, 0),

            cpu_busy: 0.0,
            prev_total_cpu: read_total_cpu(proc_reader)?,
            system_cpu_util_ewma: 0.0,
            layer_dsq_insert_ewma: vec![0.0; nr_layers],

            bpf_stats: bpf_stats.clone(),
            prev_bpf_stats: bpf_stats,

            processing_dur: Default::default(),
            prev_processing_dur: Default::default(),

            layer_slice_us: vec![0; nr_layers],
            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
        })
    }

    fn refresh(
        &mut self,
        skel: &mut BpfSkel,
        proc_reader: &fb_procfs::ProcReader,
        now: Instant,
        cur_processing_dur: Duration,
        gpu_task_affinitizer: &GpuTaskAffinitizer,
    ) -> Result<()> {
        let elapsed = now.duration_since(self.at);
        let elapsed_f64 = elapsed.as_secs_f64();
        let cpu_ctxs = read_cpu_ctxs(skel)?;

        let layers = &skel.maps.bss_data.as_ref().unwrap().layers;
        let nr_layer_tasks: Vec<usize> = layers
            .iter()
            .take(self.nr_layers)
            .map(|layer| layer.nr_tasks as usize)
            .collect();
        let layer_nr_node_pinned_tasks: Vec<Vec<u64>> = layers
            .iter()
            .take(self.nr_layers)
            .map(|layer| {
                layer.node[..self.topo.nodes.len()]
                    .iter()
                    .map(|n| n.nr_pinned_tasks)
                    .collect()
            })
            .collect();
        let layer_slice_us: Vec<u64> = layers
            .iter()
            .take(self.nr_layers)
            .map(|layer| layer.slice_ns / 1000_u64)
            .collect();

        let cur_layer_usages = Self::read_layer_usages(&cpu_ctxs, self.nr_layers);
        let cur_layer_node_pinned_usages = Self::read_layer_node_pinned_usages(
            &cpu_ctxs,
            &self.topo,
            self.nr_layers,
            self.topo.nodes.len(),
        );
        let cur_layer_node_usages = Self::read_layer_node_usages(
            &cpu_ctxs,
            &self.topo,
            self.nr_layers,
            self.topo.nodes.len(),
        );
        let cur_layer_node_duty_raw = Self::read_layer_node_duty_raw(
            &cpu_ctxs,
            &self.topo,
            self.nr_layers,
            self.topo.nodes.len(),
        );
        let cur_layer_membw_agg = Self::read_layer_membw_agg(&cpu_ctxs, self.nr_layers);

        let (pmu_prev, resctrl_prev) = self.prev_pmu_resctrl_membw;
        let pmu_cur: u64 = cur_layer_membw_agg
            .iter()
            .map(|membw_agg| membw_agg[LAYER_USAGE_OPEN] + membw_agg[LAYER_USAGE_OWNED])
            .sum();
        let resctrl_cur = Self::resctrl_read_total_membw()?;
        // Guard against a zero PMU delta, which would otherwise turn the
        // scale factor into NaN/inf and poison the decayed membw stats.
        let factor = if pmu_cur > pmu_prev {
            resctrl_cur.saturating_sub(resctrl_prev) as f64 / (pmu_cur - pmu_prev) as f64
        } else {
            0.0
        };

        let compute_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
            cur_agg
                .iter()
                .zip(prev_agg.iter())
                .map(|(cur, prev)| {
                    cur.iter()
                        .zip(prev.iter())
                        .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
                        .collect()
                })
                .collect()
        };

        let compute_mem_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
            cur_agg
                .iter()
                .zip(prev_agg.iter())
                .map(|(cur, prev)| {
                    cur.iter()
                        .zip(prev.iter())
                        .map(|(c, p)| (*c as i64 - *p as i64) as f64 / 1024_f64.powf(3.0))
                        .collect()
                })
                .collect()
        };

        let cur_layer_utils: Vec<Vec<f64>> =
            compute_diff(&cur_layer_usages, &self.prev_layer_usages);

        let cur_layer_membw: Vec<Vec<f64>> =
            compute_mem_diff(&cur_layer_membw_agg, &self.prev_layer_membw_agg);

        let cur_layer_membw: Vec<Vec<f64>> = cur_layer_membw
            .iter()
            .map(|x| x.iter().map(|x| *x * factor).collect())
            .collect();

        let metric_decay =
            |cur_metric: Vec<Vec<f64>>, prev_metric: &Vec<Vec<f64>>, decay_rate: f64| {
                cur_metric
                    .iter()
                    .zip(prev_metric.iter())
                    .map(|(cur, prev)| {
                        cur.iter()
                            .zip(prev.iter())
                            .map(|(c, p)| {
                                let decay = decay_rate.powf(elapsed_f64);
                                p * decay + c * (1.0 - decay)
                            })
                            .collect()
                    })
                    .collect()
            };

        let layer_utils: Vec<Vec<f64>> =
            metric_decay(cur_layer_utils, &self.layer_utils, *USAGE_DECAY);
        let cur_node_pinned_utils: Vec<Vec<f64>> = compute_diff(
            &cur_layer_node_pinned_usages,
            &self.prev_layer_node_pinned_usages,
        );
        let layer_node_pinned_utils: Vec<Vec<f64>> = metric_decay(
            cur_node_pinned_utils,
            &self.layer_node_pinned_utils,
            *USAGE_DECAY,
        );
        let cur_node_utils: Vec<Vec<f64>> =
            compute_diff(&cur_layer_node_usages, &self.prev_layer_node_usages);
        let layer_node_utils: Vec<Vec<f64>> =
            metric_decay(cur_node_utils, &self.layer_node_utils, *USAGE_DECAY);
        let cur_node_duty: Vec<Vec<f64>> =
            compute_diff(&cur_layer_node_duty_raw, &self.prev_layer_node_duty_raw);
        let layer_node_duty_sums: Vec<Vec<f64>> =
            metric_decay(cur_node_duty, &self.layer_node_duty_sums, *USAGE_DECAY);

        let layer_membws: Vec<Vec<f64>> = metric_decay(cur_layer_membw, &self.layer_membws, 0.0);

        let cur_total_cpu = read_total_cpu(proc_reader)?;
        let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;

        const SYS_CPU_UTIL_EWMA_SECS: f64 = 10.0;
        let elapsed_f64 = elapsed.as_secs_f64();
        let alpha = elapsed_f64 / SYS_CPU_UTIL_EWMA_SECS.max(elapsed_f64);
        let system_cpu_util_ewma = alpha * cpu_busy + (1.0 - alpha) * self.system_cpu_util_ewma;

        let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
        let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;

        const DSQ_INSERT_EWMA_SECS: f64 = 10.0;
        let dsq_alpha = elapsed_f64 / DSQ_INSERT_EWMA_SECS.max(elapsed_f64);
        let layer_dsq_insert_ewma: Vec<f64> = (0..self.nr_layers)
            .map(|layer_id| {
                let sel_local = bpf_stats.lstats[layer_id]
                    [bpf_intf::layer_stat_id_LSTAT_SEL_LOCAL as usize]
                    as f64;
                let enq_local = bpf_stats.lstats[layer_id]
                    [bpf_intf::layer_stat_id_LSTAT_ENQ_LOCAL as usize]
                    as f64;
                let enq_dsq = bpf_stats.lstats[layer_id]
                    [bpf_intf::layer_stat_id_LSTAT_ENQ_DSQ as usize]
                    as f64;
                let total_dispatches = sel_local + enq_local + enq_dsq;

                let cur_ratio = if total_dispatches > 0.0 {
                    enq_dsq / total_dispatches
                } else {
                    0.0
                };

                dsq_alpha * cur_ratio + (1.0 - dsq_alpha) * self.layer_dsq_insert_ewma[layer_id]
            })
            .collect();

        let processing_dur = cur_processing_dur
            .checked_sub(self.prev_processing_dur)
            .unwrap();

        *self = Self {
            at: now,
            elapsed,
            topo: self.topo.clone(),
            nr_layers: self.nr_layers,
            nr_layer_tasks,
            layer_nr_node_pinned_tasks,

            total_util: layer_utils
                .iter()
                .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
                .sum(),
            layer_utils,
            prev_layer_usages: cur_layer_usages,
            layer_node_pinned_utils,
            prev_layer_node_pinned_usages: cur_layer_node_pinned_usages,
            layer_node_utils,
            prev_layer_node_usages: cur_layer_node_usages,
            layer_node_duty_sums,
            prev_layer_node_duty_raw: cur_layer_node_duty_raw,

            layer_membws,
            prev_layer_membw_agg: cur_layer_membw_agg,
            prev_pmu_resctrl_membw: (pmu_cur, resctrl_cur),

            cpu_busy,
            prev_total_cpu: cur_total_cpu,
            system_cpu_util_ewma,
            layer_dsq_insert_ewma,

            bpf_stats,
            prev_bpf_stats: cur_bpf_stats,

            processing_dur,
            prev_processing_dur: cur_processing_dur,

            layer_slice_us,
            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
        };
        Ok(())
    }
}

#[derive(Debug)]
struct Layer {
    name: String,
    kind: LayerKind,
    growth_algo: LayerGrowthAlgo,
    core_order: Vec<Vec<usize>>,

    assigned_llcs: Vec<Vec<usize>>,

    nr_cpus: usize,
    nr_llc_cpus: Vec<usize>,
    nr_node_cpus: Vec<usize>,
    cpus: Cpumask,
    allowed_cpus: Cpumask,

    nr_pinned_cpus: Vec<usize>,
}

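// Resolve a kernel symbol address from /proc/kallsyms. Note that this picks
// the first line that *contains* sym_name, so an exact symbol can be shadowed
// by a longer symbol that embeds it; callers should pass names that are
// unambiguous substrings.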
fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
    fs::read_to_string("/proc/kallsyms")?
        .lines()
        .find(|line| line.contains(sym_name))
        .and_then(|line| line.split_whitespace().next())
        .and_then(|addr| u64::from_str_radix(addr, 16).ok())
        .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
}

fn resolve_cpus_pct_range(
    cpus_range: &Option<(usize, usize)>,
    cpus_range_frac: &Option<(f64, f64)>,
    max_cpus: usize,
) -> Result<(usize, usize)> {
    match (cpus_range, cpus_range_frac) {
        (Some(_x), Some(_y)) => {
            bail!("cpus_range cannot be used with cpus_pct.");
        }
        (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
        (None, Some((cpus_frac_min, cpus_frac_max))) => {
            if *cpus_frac_min < 0_f64
                || *cpus_frac_min > 1_f64
                || *cpus_frac_max < 0_f64
                || *cpus_frac_max > 1_f64
            {
                bail!("cpus_range_frac values must be between 0.0 and 1.0");
            }
            let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
            let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
            Ok((
                std::cmp::max(cpus_min_count, 1),
                std::cmp::min(std::cmp::max(cpus_max_count, 1), max_cpus),
            ))
        }
        (None, None) => Ok((0, max_cpus)),
    }
}
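
// A minimal sketch of the fractional-range resolution above: fractions are
// scaled by the CPU count, rounded half-to-even, floored at one CPU, and
// capped at max_cpus.
#[cfg(test)]
mod cpus_pct_range_tests {
    use super::*;

    #[test]
    fn frac_range_rounds_and_clamps() {
        // 25%..75% of 8 CPUs -> (2, 6).
        assert_eq!(
            resolve_cpus_pct_range(&None, &Some((0.25, 0.75)), 8).unwrap(),
            (2, 6)
        );
        // No range given -> the full (0, max_cpus) span.
        assert_eq!(resolve_cpus_pct_range(&None, &None, 8).unwrap(), (0, 8));
    }
}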

impl Layer {
    fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<Vec<usize>>) -> Result<Self> {
        let name = &spec.name;
        let kind = spec.kind.clone();
        let mut allowed_cpus = Cpumask::new();
        match &kind {
            LayerKind::Confined {
                cpus_range,
                cpus_range_frac,
                common: LayerCommon { nodes, llcs, .. },
                ..
            } => {
                let cpus_range =
                    resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
                if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
                    bail!("invalid cpus_range {:?}", cpus_range);
                }
                if nodes.is_empty() && llcs.is_empty() {
                    allowed_cpus.set_all();
                } else {
                    for (node_id, node) in &topo.nodes {
                        if nodes.contains(node_id) {
                            for &id in node.all_cpus.keys() {
                                allowed_cpus.set_cpu(id)?;
                            }
                        }
                        for (llc_id, llc) in &node.llcs {
                            if llcs.contains(llc_id) {
                                for &id in llc.all_cpus.keys() {
                                    allowed_cpus.set_cpu(id)?;
                                }
                            }
                        }
                    }
                }
            }
            LayerKind::Grouped {
                common: LayerCommon { nodes, llcs, .. },
                ..
            }
            | LayerKind::Open {
                common: LayerCommon { nodes, llcs, .. },
                ..
            } => {
                if nodes.is_empty() && llcs.is_empty() {
                    allowed_cpus.set_all();
                } else {
                    for (node_id, node) in &topo.nodes {
                        if nodes.contains(node_id) {
                            for &id in node.all_cpus.keys() {
                                allowed_cpus.set_cpu(id)?;
                            }
                        }
                        for (llc_id, llc) in &node.llcs {
                            if llcs.contains(llc_id) {
                                for &id in llc.all_cpus.keys() {
                                    allowed_cpus.set_cpu(id)?;
                                }
                            }
                        }
                    }
                }
            }
        }

        if let Some(util_range) = kind.util_range() {
            if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
                bail!("invalid util_range {:?}", util_range);
            }
        }

        let layer_growth_algo = kind.common().growth_algo.clone();

        debug!(
            "layer: {} algo: {:?} core order: {:?}",
            name, &layer_growth_algo, core_order
        );

        Ok(Self {
            name: name.into(),
            kind,
            growth_algo: layer_growth_algo,
            core_order: core_order.clone(),

            assigned_llcs: vec![vec![]; topo.nodes.len()],

            nr_cpus: 0,
            nr_llc_cpus: vec![0; topo.all_llcs.len()],
            nr_node_cpus: vec![0; topo.nodes.len()],
            cpus: Cpumask::new(),
            allowed_cpus,

            nr_pinned_cpus: vec![0; topo.nodes.len()],
        })
    }
}

#[derive(Debug, Clone)]
struct NodeInfo {
    node_mask: nix::sched::CpuSet,
    _node_id: usize,
}

#[derive(Debug)]
struct GpuTaskAffinitizer {
    gpu_devs_to_node_info: HashMap<u32, NodeInfo>,
    gpu_pids_to_devs: HashMap<Pid, u32>,
    last_process_time: Option<Instant>,
    sys: System,
    pid_map: HashMap<Pid, Vec<Pid>>,
    poll_interval: Duration,
    enable: bool,
    tasks_affinitized: u64,
    last_task_affinitization_ms: u64,
}

impl GpuTaskAffinitizer {
    pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
        GpuTaskAffinitizer {
            gpu_devs_to_node_info: HashMap::new(),
            gpu_pids_to_devs: HashMap::new(),
            last_process_time: None,
            sys: System::default(),
            pid_map: HashMap::new(),
            poll_interval: Duration::from_secs(poll_interval),
            enable,
            tasks_affinitized: 0,
            last_task_affinitization_ms: 0,
        }
    }

    fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
        for (chunk, &mask) in affinity.iter().enumerate() {
            let mut inner_offset: u64 = 1;
            for _ in 0..64 {
                if (mask & inner_offset) != 0 {
                    return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
                }
                inner_offset <<= 1;
            }
        }
        anyhow::bail!("unable to get CPU from NVML bitmask");
    }

    fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
        let mut cpuset = CpuSet::new();
        for cpu_id in node.all_cpus.keys() {
            cpuset.set(*cpu_id)?;
        }
        Ok(cpuset)
    }

    fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
        let nvml = nvml()?;
        let device_count = nvml.device_count()?;

        for idx in 0..device_count {
            let dev = nvml.device_by_index(idx)?;
            let cpu = dev.cpu_affinity(16)?;
            let ideal_cpu = self.find_one_cpu(cpu)?;
            if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
                self.gpu_devs_to_node_info.insert(
                    idx,
                    NodeInfo {
                        node_mask: self.node_to_cpuset(
                            topo.nodes.get(&cpu.node_id).expect("topo missing node"),
                        )?,
                        _node_id: cpu.node_id,
                    },
                );
            }
        }
        Ok(())
    }

    fn update_gpu_pids(&mut self) -> Result<()> {
        let nvml = nvml()?;
        for i in 0..nvml.device_count()? {
            let device = nvml.device_by_index(i)?;
            for proc in device
                .running_compute_processes()?
                .into_iter()
                .chain(device.running_graphics_processes()?.into_iter())
            {
                self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
            }
        }
        Ok(())
    }

    fn update_process_info(&mut self) -> Result<()> {
        self.sys.refresh_processes_specifics(
            ProcessesToUpdate::All,
            true,
            ProcessRefreshKind::nothing(),
        );
        self.pid_map.clear();
        for (pid, proc_) in self.sys.processes() {
            if let Some(ppid) = proc_.parent() {
                self.pid_map.entry(ppid).or_default().push(*pid);
            }
        }
        Ok(())
    }

    fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
        let mut work = VecDeque::from([root_pid]);
        let mut pids_and_tids: HashSet<Pid> = HashSet::new();

        while let Some(pid) = work.pop_front() {
            if pids_and_tids.insert(pid) {
                if let Some(kids) = self.pid_map.get(&pid) {
                    work.extend(kids);
                }
                if let Some(proc_) = self.sys.process(pid) {
                    if let Some(tasks) = proc_.tasks() {
                        pids_and_tids.extend(tasks.iter().copied());
                    }
                }
            }
        }
        pids_and_tids
    }

    fn affinitize_gpu_pids(&mut self) -> Result<()> {
        if !self.enable {
            return Ok(());
        }
        for (pid, dev) in &self.gpu_pids_to_devs {
            let node_info = self
                .gpu_devs_to_node_info
                .get(dev)
                .expect("Unable to get gpu pid node mask");
            for child in self.get_child_pids_and_tids(*pid) {
                match nix::sched::sched_setaffinity(
                    nix::unistd::Pid::from_raw(child.as_u32() as i32),
                    &node_info.node_mask,
                ) {
                    Ok(_) => {
                        self.tasks_affinitized += 1;
                    }
                    Err(_) => {
                        debug!(
                            "Error affinitizing gpu pid {} to node {:#?}",
                            child.as_u32(),
                            node_info
                        );
                    }
                };
            }
        }
        Ok(())
    }

    pub fn maybe_affinitize(&mut self) {
        if !self.enable {
            return;
        }
        let now = Instant::now();

        if let Some(last_process_time) = self.last_process_time {
            if (now - last_process_time) < self.poll_interval {
                return;
            }
        }

        match self.update_gpu_pids() {
            Ok(_) => {}
            Err(e) => {
                error!("Error updating GPU PIDs: {}", e);
            }
        };
        match self.update_process_info() {
            Ok(_) => {}
            Err(e) => {
                error!("Error updating process info to affinitize GPU PIDs: {}", e);
            }
        };
        match self.affinitize_gpu_pids() {
            Ok(_) => {}
            Err(e) => {
                error!("Error affinitizing GPU PIDs: {}", e);
            }
        };
        self.last_process_time = Some(now);
        self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
    }

    pub fn init(&mut self, topo: Arc<Topology>) {
        if !self.enable || self.last_process_time.is_some() {
            return;
        }

        match self.init_dev_node_map(topo) {
            Ok(_) => {}
            Err(e) => {
                error!("Error initializing gpu node dev map: {}", e);
            }
        };
        self.sys = System::new_all();
    }
}
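
// A minimal sketch of find_one_cpu()'s bitmask walk: NVML reports CPU
// affinity as an array of u64 chunks, so bit 1 of the second chunk maps to
// CPU 65. The all-zero mask is the error case.
#[cfg(test)]
mod find_one_cpu_tests {
    use super::*;

    #[test]
    fn first_set_bit_maps_to_cpu_id() {
        let aff = GpuTaskAffinitizer::new(1, false);
        assert_eq!(aff.find_one_cpu(vec![0x0, 0x2]).unwrap(), 65);
        assert!(aff.find_one_cpu(vec![0x0, 0x0]).is_err());
    }
}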

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,
    layer_specs: Vec<LayerSpec>,

    sched_intv: Duration,
    layer_refresh_intv: Duration,

    cpu_pool: CpuPool,
    layers: Vec<Layer>,
    idle_qos_enabled: bool,

    proc_reader: fb_procfs::ProcReader,
    sched_stats: Stats,

    cgroup_regexes: Option<HashMap<u32, Regex>>,

    nr_layer_cpus_ranges: Vec<(usize, usize)>,
    xnuma_mig_src: Vec<Vec<bool>>,
    growth_denied: Vec<Vec<bool>>,
    processing_dur: Duration,

    topo: Arc<Topology>,
    netdevs: BTreeMap<String, NetDev>,
    stats_server: StatsServer<StatsReq, StatsRes>,
    gpu_task_handler: GpuTaskAffinitizer,
}

const DUTY_CYCLE_SCALE: f64 = (1u64 << 20) as f64;
const XNUMA_RATE_DAMPEN: f64 = 0.5;

struct XnumaRates {
    rates: Vec<Vec<u64>>,
}

fn xnuma_check_active(
    duty_sums: &[f64],
    allocs: &[usize],
    threshold: (f64, f64),
    threshold_delta: (f64, f64),
    growth_denied: &[bool],
    currently_active: &[bool],
) -> Vec<bool> {
    let nr_nodes = duty_sums.len();
    let total_duty: f64 = duty_sums.iter().sum();
    let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
    let eq_ratio = if total_alloc > 0.0 {
        total_duty / total_alloc
    } else {
        0.0
    };

    let (thresh_lo, thresh_hi) = threshold;
    let (delta_lo, delta_hi) = threshold_delta;

    let mut result = vec![false; nr_nodes];
    for nid in 0..nr_nodes {
        let alloc = allocs[nid] as f64;
        if alloc <= 0.0 {
            if duty_sums[nid] > 0.0 && growth_denied[nid] {
                result[nid] = true;
            }
            continue;
        }

        let load_ratio = duty_sums[nid] / alloc;
        let surplus = duty_sums[nid] - eq_ratio * alloc;
        let surplus_ratio = surplus / alloc;

        let should_activate =
            load_ratio > thresh_hi && surplus_ratio > delta_hi && growth_denied[nid];
        let should_deactivate =
            load_ratio < thresh_lo || surplus_ratio < delta_lo || !growth_denied[nid];

        if should_activate {
            result[nid] = true;
        } else if should_deactivate {
            result[nid] = false;
        } else {
            result[nid] = currently_active[nid];
        }
    }
    result
}
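
// Sketch of the hysteresis logic above with two equally-allocated nodes:
// node 0 runs 1.5 duty against an equilibrium of 1.0 (surplus ratio 0.5),
// clearing both activation thresholds, while node 1 falls below the low
// load threshold and deactivates.
#[cfg(test)]
mod xnuma_check_active_tests {
    use super::*;

    #[test]
    fn activates_surplus_node_deactivates_idle_node() {
        let active = xnuma_check_active(
            &[1.5, 0.5],
            &[1, 1],
            (0.8, 1.2),
            (0.1, 0.3),
            &[true, true],
            &[false, false],
        );
        assert_eq!(active, vec![true, false]);
    }
}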

fn xnuma_compute_rates(duty_sums: &[f64], allocs: &[usize]) -> XnumaRates {
    let nr_nodes = duty_sums.len();
    let total_duty: f64 = duty_sums.iter().sum();
    let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();

    if total_alloc <= 0.0 {
        return XnumaRates {
            rates: vec![vec![0u64; nr_nodes]; nr_nodes],
        };
    }

    let eq_ratio = total_duty / total_alloc;

    let mut surpluses = vec![0.0f64; nr_nodes];
    let mut deficits = vec![0.0f64; nr_nodes];
    for nid in 0..nr_nodes {
        let expected = eq_ratio * allocs[nid] as f64;
        let delta = duty_sums[nid] - expected;
        if delta > 0.0 {
            surpluses[nid] = delta;
        } else {
            deficits[nid] = -delta;
        }
    }

    let total_deficit: f64 = deficits.iter().sum();

    let mut rates = vec![vec![0u64; nr_nodes]; nr_nodes];
    for src in 0..nr_nodes {
        for dst in 0..nr_nodes {
            if src == dst || total_deficit <= 0.0 || surpluses[src] <= 0.0 {
                continue;
            }
            let migration = surpluses[src] * deficits[dst] / total_deficit * XNUMA_RATE_DAMPEN;
            rates[src][dst] = (migration * DUTY_CYCLE_SCALE) as u64;
        }
    }

    XnumaRates { rates }
}
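
// Sketch of the rate computation above: with equal allocations and duty sums
// of 3.0 and 1.0, the equilibrium is 2.0 per node, so node 0's surplus of
// 1.0 flows entirely toward node 1, dampened by XNUMA_RATE_DAMPEN and scaled
// into fixed point by DUTY_CYCLE_SCALE.
#[cfg(test)]
mod xnuma_compute_rates_tests {
    use super::*;

    #[test]
    fn surplus_flows_to_deficit_node() {
        let rates = xnuma_compute_rates(&[3.0, 1.0], &[4, 4]);
        let expected = (1.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(rates.rates[0][1], expected);
        // Deficit nodes never source migrations.
        assert_eq!(rates.rates[1][0], 0);
    }
}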

impl<'a> Scheduler<'a> {
    fn init_layers(
        skel: &mut OpenBpfSkel,
        specs: &[LayerSpec],
        topo: &Topology,
    ) -> Result<HashMap<u32, Regex>> {
        skel.maps.rodata_data.as_mut().unwrap().nr_layers = specs.len() as u32;
        let mut perf_set = false;

        let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
        let mut layer_weights: Vec<usize> = vec![];
        let mut cgroup_regex_id = 0;
        let mut cgroup_regexes = HashMap::new();

        for (spec_i, spec) in specs.iter().enumerate() {
            let layer = &mut skel.maps.bss_data.as_mut().unwrap().layers[spec_i];

            for (or_i, or) in spec.matches.iter().enumerate() {
                for (and_i, and) in or.iter().enumerate() {
                    let mt = &mut layer.matches[or_i].matches[and_i];

                    mt.exclude.write(false);

                    match and {
                        LayerMatch::CgroupPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
                            copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
                        }
                        LayerMatch::CgroupSuffix(suffix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
                            copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
                        }
                        LayerMatch::CgroupRegex(regex_str) => {
                            if cgroup_regex_id >= bpf_intf::consts_MAX_CGROUP_REGEXES {
                                bail!(
                                    "Too many cgroup regex rules. Maximum allowed: {}",
                                    bpf_intf::consts_MAX_CGROUP_REGEXES
                                );
                            }

                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_REGEX as i32;
                            mt.cgroup_regex_id = cgroup_regex_id;

                            let regex = Regex::new(regex_str).with_context(|| {
                                format!("Invalid regex '{}' in layer '{}'", regex_str, spec.name)
                            })?;
                            cgroup_regexes.insert(cgroup_regex_id, regex);
                            cgroup_regex_id += 1;
                        }
                        LayerMatch::CgroupContains(substr) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
                            copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
                        }
                        LayerMatch::CommPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                        }
                        LayerMatch::CommPrefixExclude(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                            mt.exclude.write(true);
                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                        }
                        LayerMatch::PcommPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                        }
                        LayerMatch::PcommPrefixExclude(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                            mt.exclude.write(true);
                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                        }
                        LayerMatch::NiceAbove(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::NiceBelow(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::NiceEquals(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::UIDEquals(user_id) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
                            mt.user_id = *user_id;
                        }
                        LayerMatch::GIDEquals(group_id) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
                            mt.group_id = *group_id;
                        }
                        LayerMatch::PIDEquals(pid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
                            mt.pid = *pid;
                        }
                        LayerMatch::PPIDEquals(ppid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
                            mt.ppid = *ppid;
                        }
                        LayerMatch::TGIDEquals(tgid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
                            mt.tgid = *tgid;
                        }
                        LayerMatch::NSPIDEquals(nsid, pid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
                            mt.nsid = *nsid;
                            mt.pid = *pid;
                        }
                        LayerMatch::NSEquals(nsid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
                            mt.nsid = *nsid as u64;
                        }
                        LayerMatch::CmdJoin(joincmd) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
                            copy_into_cstr(&mut mt.comm_prefix, joincmd);
                        }
                        LayerMatch::IsGroupLeader(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
                            mt.is_group_leader.write(*polarity);
                        }
                        LayerMatch::IsKthread(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
                            mt.is_kthread.write(*polarity);
                        }
                        LayerMatch::UsedGpuTid(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
                            mt.used_gpu_tid.write(*polarity);
                        }
                        LayerMatch::UsedGpuPid(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
                            mt.used_gpu_pid.write(*polarity);
                        }
                        LayerMatch::AvgRuntime(min, max) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
                            mt.min_avg_runtime_us = *min;
                            mt.max_avg_runtime_us = *max;
                        }
                        LayerMatch::HintEquals(hint) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_HINT_EQUALS as i32;
                            mt.hint = *hint;
                        }
                        LayerMatch::SystemCpuUtilBelow(threshold) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_SYSTEM_CPU_UTIL_BELOW as i32;
                            mt.system_cpu_util_below = (*threshold * 10000.0) as u64;
                        }
                        LayerMatch::DsqInsertBelow(threshold) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_DSQ_INSERT_BELOW as i32;
                            mt.dsq_insert_below = (*threshold * 10000.0) as u64;
                        }
                        LayerMatch::NumaNode(node_id) => {
                            if *node_id as usize >= topo.nodes.len() {
                                bail!(
                                    "Spec {:?} has invalid NUMA node ID {} (available nodes: 0-{})",
                                    spec.name,
                                    node_id,
                                    topo.nodes.len() - 1
                                );
                            }
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NUMA_NODE as i32;
                            mt.numa_node_id = *node_id;
                        }
                    }
                }
                layer.matches[or_i].nr_match_ands = or.len() as i32;
            }

            layer.nr_match_ors = spec.matches.len() as u32;
            layer.kind = spec.kind.as_bpf_enum();

            {
                let LayerCommon {
                    min_exec_us,
                    yield_ignore,
                    perf,
                    preempt,
                    preempt_first,
                    exclusive,
                    skip_remote_node,
                    prev_over_idle_core,
                    growth_algo,
                    slice_us,
                    fifo,
                    weight,
                    disallow_open_after_us,
                    disallow_preempt_after_us,
                    xllc_mig_min_us,
                    placement,
                    member_expire_ms,
                    ..
                } = spec.kind.common();

                layer.slice_ns = *slice_us * 1000;
                layer.fifo.write(*fifo);
                layer.min_exec_ns = min_exec_us * 1000;
                layer.yield_step_ns = if *yield_ignore > 0.999 {
                    0
                } else if *yield_ignore < 0.001 {
                    layer.slice_ns
                } else {
                    (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
                };
                let mut layer_name: String = spec.name.clone();
                layer_name.truncate(MAX_LAYER_NAME);
                copy_into_cstr(&mut layer.name, layer_name.as_str());
                layer.preempt.write(*preempt);
                layer.preempt_first.write(*preempt_first);
                layer.excl.write(*exclusive);
                layer.skip_remote_node.write(*skip_remote_node);
                layer.prev_over_idle_core.write(*prev_over_idle_core);
                layer.growth_algo = growth_algo.as_bpf_enum();
                layer.weight = *weight;
                layer.member_expire_ms = *member_expire_ms;
                layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
                    v if v == u64::MAX => v,
                    v => v * 1000,
                };
                layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
                    v if v == u64::MAX => v,
                    v => v * 1000,
                };
                layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
                layer_weights.push(layer.weight.try_into().unwrap());
                layer.perf = u32::try_from(*perf)?;

                let task_place = |place: u32| crate::types::layer_task_place(place);
                layer.task_place = match placement {
                    LayerPlacement::Standard => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_STD)
                    }
                    LayerPlacement::Sticky => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_STICK)
                    }
                    LayerPlacement::Floating => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT)
                    }
                };
            }

            layer.is_protected.write(match spec.kind {
                LayerKind::Open { .. } => false,
                LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
                    protected
                }
            });

            match &spec.cpuset {
                Some(mask) => {
                    Self::update_cpumask(mask, &mut layer.cpuset);
                    layer.has_cpuset.write(true);
                }
                None => {
                    for i in 0..layer.cpuset.len() {
                        layer.cpuset[i] = u8::MAX;
                    }
                    layer.has_cpuset.write(false);
                }
            };

            perf_set |= layer.perf > 0;
        }

        layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
        for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
            skel.maps
                .rodata_data
                .as_mut()
                .unwrap()
                .layer_iteration_order[idx] = *layer_idx as u32;
        }

        if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
            warn!("cpufreq support not available, ignoring perf configurations");
        }

        Ok(cgroup_regexes)
    }

    fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
        skel.maps.rodata_data.as_mut().unwrap().nr_nodes = topo.nodes.len() as u32;
        skel.maps.rodata_data.as_mut().unwrap().nr_llcs = 0;

        for (&node_id, node) in &topo.nodes {
            debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
            skel.maps.rodata_data.as_mut().unwrap().nr_llcs += node.llcs.len() as u32;
            let raw_numa_slice = node.span.as_raw_slice();
            let node_cpumask_slice =
                &mut skel.maps.rodata_data.as_mut().unwrap().numa_cpumasks[node_id];
            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
            left.clone_from_slice(raw_numa_slice);
            debug!(
                "node {} mask: {:?}",
                node_id,
                skel.maps.rodata_data.as_ref().unwrap().numa_cpumasks[node_id]
            );

            for llc in node.llcs.values() {
                debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
                skel.maps.rodata_data.as_mut().unwrap().llc_numa_id_map[llc.id] = node_id as u32;
            }
        }

        for cpu in topo.all_cpus.values() {
            skel.maps.rodata_data.as_mut().unwrap().cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
        }
    }

    fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
        let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
            vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
            vec
        };
        let radiate_cpu =
            |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
                vec.sort_by_key(|&id| {
                    (
                        (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
                        (center_cpu as i32 - id as i32).abs(),
                    )
                });
                vec
            };

        for (&cpu_id, cpu) in &topo.all_cpus {
            let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
            let llc_span = &topo.all_llcs[&cpu.llc_id].span;
            let node_span = &topo.nodes[&cpu.node_id].span;
            let sys_span = &topo.span;

            let sys_span = sys_span.and(&node_span.not());
            let node_span = node_span.and(&llc_span.not());
            let llc_span = llc_span.and(&core_span.not());
            core_span.clear_cpu(cpu_id).unwrap();

            let mut sys_order: Vec<usize> = sys_span.iter().collect();
            let mut node_order: Vec<usize> = node_span.iter().collect();
            let mut llc_order: Vec<usize> = llc_span.iter().collect();
            let mut core_order: Vec<usize> = core_span.iter().collect();

            sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
            node_order = radiate(node_order, cpu.node_id);
            llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
            core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);

            let mut order: Vec<usize> = vec![];
            let mut idx: usize = 0;

            idx += 1;
            order.push(cpu_id);

            idx += core_order.len();
            order.append(&mut core_order);
            let core_end = idx;

            idx += llc_order.len();
            order.append(&mut llc_order);
            let llc_end = idx;

            idx += node_order.len();
            order.append(&mut node_order);
            let node_end = idx;

            idx += sys_order.len();
            order.append(&mut sys_order);
            let sys_end = idx;

            debug!(
                "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
                cpu_id, core_end, llc_end, node_end, sys_end, &order
            );

            let pmap = &mut cpu_ctxs[cpu_id].prox_map;
            for (i, &cpu) in order.iter().enumerate() {
                pmap.cpus[i] = cpu as u16;
            }
            pmap.core_end = core_end as u32;
            pmap.llc_end = llc_end as u32;
            pmap.node_end = node_end as u32;
            pmap.sys_end = sys_end as u32;
        }
    }

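    // Serialize each cpu_ctx into raw bytes for the per-CPU map update
    // below. The unsafe `plain::as_bytes` view is expected to be sound here
    // because the bindgen-generated struct is plain old data.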
2214 fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
2215 cpu_ctxs
2216 .iter()
2217 .map(|cpu_ctx| unsafe { plain::as_bytes(cpu_ctx) }.to_vec())
2218 .collect()
2219 }
2220
    fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
        let key = (0_u32).to_ne_bytes();
        let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
        let cpu_ctxs_vec = skel
            .maps
            .cpu_ctxs
            .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
            .context("Failed to lookup cpu_ctx")?
            .unwrap();

        let op_layers: Vec<u32> = layer_specs
            .iter()
            .enumerate()
            .filter(|(_idx, spec)| match &spec.kind {
                LayerKind::Open { .. } => spec.kind.common().preempt,
                _ => false,
            })
            .map(|(idx, _)| idx as u32)
            .collect();
        let on_layers: Vec<u32> = layer_specs
            .iter()
            .enumerate()
            .filter(|(_idx, spec)| match &spec.kind {
                LayerKind::Open { .. } => !spec.kind.common().preempt,
                _ => false,
            })
            .map(|(idx, _)| idx as u32)
            .collect();
        let gp_layers: Vec<u32> = layer_specs
            .iter()
            .enumerate()
            .filter(|(_idx, spec)| match &spec.kind {
                LayerKind::Grouped { .. } => spec.kind.common().preempt,
                _ => false,
            })
            .map(|(idx, _)| idx as u32)
            .collect();
        let gn_layers: Vec<u32> = layer_specs
            .iter()
            .enumerate()
            .filter(|(_idx, spec)| match &spec.kind {
                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
                _ => false,
            })
            .map(|(idx, _)| idx as u32)
            .collect();

        for cpu in 0..*NR_CPUS_POSSIBLE {
            cpu_ctxs.push(
                *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
                    .expect("cpu_ctx: short or misaligned buffer"),
            );

            let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
            let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
            cpu_ctxs[cpu].cpu = cpu as i32;
            cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
            cpu_ctxs[cpu].is_big = is_big;

            fastrand::seed(cpu as u64);

            let mut ogp_order = op_layers.clone();
            ogp_order.append(&mut gp_layers.clone());
            fastrand::shuffle(&mut ogp_order);

            let mut ogn_order = on_layers.clone();
            ogn_order.append(&mut gn_layers.clone());
            fastrand::shuffle(&mut ogn_order);

            let mut op_order = op_layers.clone();
            fastrand::shuffle(&mut op_order);

            let mut on_order = on_layers.clone();
            fastrand::shuffle(&mut on_order);

            let mut gp_order = gp_layers.clone();
            fastrand::shuffle(&mut gp_order);

            let mut gn_order = gn_layers.clone();
            fastrand::shuffle(&mut gn_order);

            for i in 0..MAX_LAYERS {
                cpu_ctxs[cpu].ogp_layer_order[i] =
                    ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
                cpu_ctxs[cpu].ogn_layer_order[i] =
                    ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);

                cpu_ctxs[cpu].op_layer_order[i] =
                    op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
                cpu_ctxs[cpu].on_layer_order[i] =
                    on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
                cpu_ctxs[cpu].gp_layer_order[i] =
                    gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
                cpu_ctxs[cpu].gn_layer_order[i] =
                    gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
            }
        }

        Self::init_cpu_prox_map(topo, &mut cpu_ctxs);

        skel.maps
            .cpu_ctxs
            .update_percpu(
                &key,
                &Self::convert_cpu_ctxs(cpu_ctxs),
                libbpf_rs::MapFlags::ANY,
            )
            .context("Failed to update cpu_ctx")?;

        Ok(())
    }

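    // Build each LLC's proximity map: the LLC itself first, then sibling LLCs
    // on the same node, then all remaining LLCs, with each tier shuffled
    // (seeded by LLC id) so different LLCs probe their peers in different
    // orders.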
    fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
        for (&llc_id, llc) in &topo.all_llcs {
            let mut node_order: Vec<usize> =
                topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
            let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();

            sys_order.retain(|id| !node_order.contains(id));
            node_order.retain(|&id| id != llc_id);

            fastrand::seed(llc_id as u64);
            fastrand::shuffle(&mut sys_order);
            fastrand::shuffle(&mut node_order);

            let mut order: Vec<usize> = vec![];
            let mut idx: usize = 0;

            idx += 1;
            order.push(llc_id);

            idx += node_order.len();
            order.append(&mut node_order);
            let node_end = idx;

            idx += sys_order.len();
            order.append(&mut sys_order);
            let sys_end = idx;

            debug!(
                "LLC[{}] proximity map[{}/{}]: {:?}",
                llc_id, node_end, sys_end, &order
            );

            let key = (llc_id as u32).to_ne_bytes();
            let v = skel
                .maps
                .llc_data
                .lookup(&key, libbpf_rs::MapFlags::ANY)
                .unwrap()
                .unwrap();
            let mut llcc: bpf_intf::llc_ctx =
                *plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");

            let pmap = &mut llcc.prox_map;
            for (i, &llc_id) in order.iter().enumerate() {
                pmap.llcs[i] = llc_id as u16;
            }
            pmap.node_end = node_end as u32;
            pmap.sys_end = sys_end as u32;

            skel.maps.llc_data.update(
                &key,
                unsafe { plain::as_bytes(&llcc) },
                libbpf_rs::MapFlags::ANY,
            )?
        }

        Ok(())
    }

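    // Build each node's proximity map by sorting all other nodes by their
    // reported NUMA distance, nearest first.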
    fn init_node_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
        for (&node_id, node) in &topo.nodes {
            let mut order: Vec<(usize, usize)> = node
                .distance
                .iter()
                .enumerate()
                .filter(|&(nid, _)| nid != node_id)
                .map(|(nid, &dist)| (nid, dist))
                .collect();
            order.sort_by_key(|&(_, dist)| dist);

            let key = (node_id as u32).to_ne_bytes();

            let v = skel.maps.node_data.lookup(&key, libbpf_rs::MapFlags::ANY);
            let mut nodec: bpf_intf::node_ctx = match v {
                Ok(Some(v)) => {
                    *plain::from_bytes(v.as_slice()).expect("node_ctx: short or misaligned buffer")
                }
                _ => unsafe { MaybeUninit::zeroed().assume_init() },
            };

            let pmap = &mut nodec.prox_map;
            for (i, &(nid, _)) in order.iter().enumerate() {
                pmap.nodes[i] = nid as u16;
            }
            pmap.sys_end = order.len() as u32;

            debug!(
                "NODE[{}] prox_map[{}]: {:?}",
                node_id,
                pmap.sys_end,
                &order.iter().map(|(n, d)| (*n, *d)).collect::<Vec<_>>()
            );

            skel.maps.node_data.update(
                &key,
                unsafe { plain::as_bytes(&nodec) },
                libbpf_rs::MapFlags::ANY,
            )?;
        }
        Ok(())
    }

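    // At init no layer owns CPUs anywhere, so report every layer as empty on
    // every node and let refresh_node_ctx() push that state into BPF.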
    fn init_node_ctx(skel: &mut BpfSkel, topo: &Topology, nr_layers: usize) -> Result<()> {
        let all_layers: Vec<u32> = (0..nr_layers as u32).collect();
        let node_empty_layers: Vec<Vec<u32>> =
            (0..topo.nodes.len()).map(|_| all_layers.clone()).collect();
        Self::refresh_node_ctx(skel, topo, &node_empty_layers, true);
        Ok(())
    }

    fn init(
        opts: &'a Opts,
        layer_specs: &[LayerSpec],
        open_object: &'a mut MaybeUninit<OpenObject>,
        hint_to_layer_map: &HashMap<u64, HintLayerInfo>,
        membw_tracking: bool,
    ) -> Result<Self> {
        let nr_layers = layer_specs.len();
        let mut disable_topology = opts.disable_topology.unwrap_or(false);

        let topo = Arc::new(if disable_topology {
            Topology::with_flattened_llc_node()?
        } else if opts.topology.virt_llc.is_some() {
            Topology::with_args(&opts.topology)?
        } else {
            Topology::new()?
        });
        if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
        }
        if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
        }
        if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
        }

        let netdevs = if opts.netdev_irq_balance {
            let devs = read_netdevs()?;
            let total_irqs: usize = devs.values().map(|d| d.irqs.len()).sum();
            let breakdown = devs
                .iter()
                .map(|(iface, d)| format!("{iface}={}", d.irqs.len()))
                .collect::<Vec<_>>()
                .join(", ");
            info!(
                "Netdev IRQ balancing enabled: overriding {total_irqs} IRQ{} \
                 across {} interface{} [{breakdown}]",
                if total_irqs == 1 { "" } else { "s" },
                devs.len(),
                if devs.len() == 1 { "" } else { "s" },
            );
            devs
        } else {
            BTreeMap::new()
        };

        if !disable_topology {
            if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
                disable_topology = true;
            }
            info!(
                "Topology awareness not specified, selecting {} based on hardware",
                if disable_topology {
                    "disabled"
                } else {
                    "enabled"
                }
            );
        }

        let cpu_pool = CpuPool::new(topo.clone(), opts.allow_partial_core)?;

        let layer_specs: Vec<_> = if disable_topology {
            info!("Disabling topology awareness");
            layer_specs
                .iter()
                .cloned()
                .map(|mut s| {
                    s.kind.common_mut().nodes.clear();
                    s.kind.common_mut().llcs.clear();
                    s
                })
                .collect()
        } else {
            layer_specs.to_vec()
        };

        for spec in layer_specs.iter() {
            let mut seen = BTreeSet::new();
            for &node_id in spec.nodes().iter() {
                if !topo.nodes.contains_key(&node_id) {
                    bail!(
                        "layer {:?}: nodes references node {} which does not \
                         exist in the topology (available: {:?})",
                        spec.name,
                        node_id,
                        topo.nodes.keys().collect::<Vec<_>>()
                    );
                }
                if !seen.insert(node_id) {
                    bail!(
                        "layer {:?}: nodes contains duplicate node {}",
                        spec.name,
                        node_id
                    );
                }
            }

            seen.clear();
            for &llc_id in spec.llcs().iter() {
                if !topo.all_llcs.contains_key(&llc_id) {
                    bail!(
                        "layer {:?}: llcs references LLC {} which does not \
                         exist in the topology (available: {:?})",
                        spec.name,
                        llc_id,
                        topo.all_llcs.keys().collect::<Vec<_>>()
                    );
                }
                if !seen.insert(llc_id) {
                    bail!(
                        "layer {:?}: llcs contains duplicate LLC {}",
                        spec.name,
                        llc_id
                    );
                }
            }
        }

        for spec in layer_specs.iter() {
            let has_numa_node_match = spec
                .matches
                .iter()
                .flatten()
                .any(|m| matches!(m, LayerMatch::NumaNode(_)));
            let has_node_spread_algo = matches!(
                spec.kind.common().growth_algo,
                LayerGrowthAlgo::NodeSpread
                    | LayerGrowthAlgo::NodeSpreadReverse
                    | LayerGrowthAlgo::NodeSpreadRandom
            );
            if has_numa_node_match && has_node_spread_algo {
                bail!(
                    "layer {:?}: NumaNode matcher cannot be combined with {:?} \
                     growth algorithm. NodeSpread* allocates CPUs equally across \
                     ALL NUMA nodes, but NumaNode restricts tasks to one node's \
                     CPUs, so CPUs on other nodes are wasted and utilization \
                     will never exceed 1/numa_nodes. Use a non-spread algorithm \
                     (e.g. Linear, Topo) instead.",
                    spec.name,
                    spec.kind.common().growth_algo
                );
            }
        }

        init_libbpf_logging(None);
        let kfuncs_in_syscall = scx_bpf_compat::kfuncs_supported_in_syscall()?;
        if !kfuncs_in_syscall {
            warn!(
                "Using slow path: kfuncs not supported in syscall programs \
                 (kernel lacks commit a8e03b6bbb2c)"
            );
        }

        let debug_level = if opts.log_level.contains("trace") {
            2
        } else if opts.log_level.contains("debug") {
            1
        } else {
            0
        };
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug_level > 1);

        info!(
            "Running scx_layered (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, layered, open_opts)?;

        skel.progs.scx_pmu_switch_tc.set_autoload(membw_tracking);
        skel.progs.scx_pmu_tick_tc.set_autoload(membw_tracking);

        let mut loaded_kprobes = HashSet::new();

        if opts.enable_gpu_support {
            if opts.gpu_kprobe_level >= 1 {
                compat::cond_kprobe_load("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
                loaded_kprobes.insert("nvidia_open");
            }
            if opts.gpu_kprobe_level >= 2 {
                compat::cond_kprobe_load("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
                loaded_kprobes.insert("nvidia_mmap");
            }
            if opts.gpu_kprobe_level >= 3 {
                compat::cond_kprobe_load("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
                loaded_kprobes.insert("nvidia_poll");
            }
        }

        let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
        let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");

        let event = if membw_tracking {
            setup_membw_tracking(&mut skel)?
        } else {
            0
        };

        let rodata = skel.maps.rodata_data.as_mut().unwrap();

        if let (Ok(ext_addr), Ok(idle_addr)) = (ext_sched_class_addr, idle_sched_class_addr) {
            rodata.ext_sched_class_addr = ext_addr;
            rodata.idle_sched_class_addr = idle_addr;
        } else {
            warn!(
                "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
            );
        }

        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;

        skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;

        if !opts.disable_queued_wakeup {
            match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
                0 => info!("Kernel does not support queued wakeup optimization"),
                v => skel.struct_ops.layered_mut().flags |= v,
            }
        }

        rodata.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
        rodata.percpu_kthread_preempt_all =
            !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
        rodata.debug = debug_level as u32;
        rodata.slice_ns = opts.slice_us * 1000;
        rodata.max_exec_ns = if opts.max_exec_us > 0 {
            opts.max_exec_us * 1000
        } else {
            opts.slice_us * 1000 * 20
        };
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;
        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        rodata.smt_enabled = topo.smt_enabled;
        rodata.has_little_cores = topo.has_little_cores();
        rodata.antistall_sec = opts.antistall_sec;
        rodata.monitor_disable = opts.monitor_disable;
        rodata.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
        rodata.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
        rodata.enable_antistall = !opts.disable_antistall;
        rodata.enable_match_debug = opts.enable_match_debug;
        rodata.enable_gpu_support = opts.enable_gpu_support;
        rodata.kfuncs_supported_in_syscall = kfuncs_in_syscall;

        for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
            rodata.__sibling_cpu[cpu] = *sib;
        }
        for cpu in topo.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        rodata.nr_op_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Open { .. } => spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_on_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Open { .. } => !spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_gp_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Grouped { .. } => spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_gn_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_excl_layers = layer_specs
            .iter()
            .filter(|spec| spec.kind.common().exclusive)
            .count() as u32;

        let mut min_open = u64::MAX;
        let mut min_preempt = u64::MAX;

        for spec in layer_specs.iter() {
            if let LayerKind::Open { common, .. } = &spec.kind {
                min_open = min_open.min(common.disallow_open_after_us.unwrap());
                min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
            }
        }

        rodata.min_open_layer_disallow_open_after_ns = match min_open {
            u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
            v => v,
        };
        rodata.min_open_layer_disallow_preempt_after_ns = match min_preempt {
            u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
            v => v,
        };

        let layered_task_hint_map_path = &opts.task_hint_map;
        let hint_map = &mut skel.maps.scx_layered_task_hint_map;
        if !layered_task_hint_map_path.is_empty() {
            hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
            rodata.task_hint_map_enabled = true;
        }

        if !opts.hi_fb_thread_name.is_empty() {
            let bpf_hi_fb_thread_name = &mut rodata.hi_fb_thread_name;
            copy_into_cstr(bpf_hi_fb_thread_name, opts.hi_fb_thread_name.as_str());
            rodata.enable_hi_fb_thread_name_match = true;
        }

        let cgroup_regexes = Self::init_layers(&mut skel, &layer_specs, &topo)?;
        skel.maps.rodata_data.as_mut().unwrap().nr_cgroup_regexes = cgroup_regexes.len() as u32;
        Self::init_nodes(&mut skel, opts, &topo);

        let mut skel = scx_ops_load!(skel, layered, uei)?;

        if !hint_to_layer_map.is_empty() {
            for (k, v) in hint_to_layer_map.iter() {
                let key: u32 = *k as u32;

                let mut info_bytes = vec![0u8; std::mem::size_of::<bpf_intf::hint_layer_info>()];
                let info_ptr = info_bytes.as_mut_ptr() as *mut bpf_intf::hint_layer_info;
                unsafe {
                    (*info_ptr).layer_id = v.layer_id as u32;
                    (*info_ptr).system_cpu_util_below = match v.system_cpu_util_below {
                        Some(threshold) => (threshold * 10000.0) as u64,
                        None => u64::MAX,
                    };
                    (*info_ptr).dsq_insert_below = match v.dsq_insert_below {
                        Some(threshold) => (threshold * 10000.0) as u64,
                        None => u64::MAX,
                    };
                }

                skel.maps.hint_to_layer_id_map.update(
                    &key.to_ne_bytes(),
                    &info_bytes,
                    libbpf_rs::MapFlags::ANY,
                )?;
            }
        }

        if membw_tracking {
            create_perf_fds(&mut skel, event)?;
        }

        let mut layers = vec![];
        let layer_growth_orders =
            LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
        for (idx, spec) in layer_specs.iter().enumerate() {
            let growth_order = layer_growth_orders
                .get(&idx)
                .with_context(|| "layer has no growth order".to_string())?;
            layers.push(Layer::new(spec, &topo, growth_order)?);
        }

        let mut idle_qos_enabled = layers
            .iter()
            .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
        if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
            warn!("idle_resume_us not supported, ignoring");
            idle_qos_enabled = false;
        }

        Self::init_cpus(&skel, &layer_specs, &topo)?;
        Self::init_llc_prox_map(&mut skel, &topo)?;
        Self::init_node_prox_map(&mut skel, &topo)?;
        Self::init_node_ctx(&mut skel, &topo, nr_layers)?;

        let proc_reader = fb_procfs::ProcReader::new();

        let input = ProgramInput {
            ..Default::default()
        };
        let prog = &mut skel.progs.initialize_pid_namespace;

        let _ = prog.test_run(input);

        if !layered_task_hint_map_path.is_empty() {
            let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
            let mode: libc::mode_t = 0o666;
            unsafe {
                if libc::chmod(path.as_ptr(), mode) != 0 {
                    trace!("'chmod' to 666 of task hint map failed, continuing...");
                }
            }
        }

        let struct_ops = scx_ops_attach!(skel, layered)?;

        if opts.enable_gpu_support {
            if loaded_kprobes.contains("nvidia_open") {
                compat::cond_kprobe_attach("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
            }
            if loaded_kprobes.contains("nvidia_mmap") {
                compat::cond_kprobe_attach("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
            }
            if loaded_kprobes.contains("nvidia_poll") {
                compat::cond_kprobe_attach("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
            }
        }

        let stats_server = StatsServer::new(stats::server_data()).launch()?;
        let mut gpu_task_handler =
            GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
        gpu_task_handler.init(topo.clone());

        let sched = Self {
            struct_ops: Some(struct_ops),
            layer_specs,

            sched_intv: Duration::from_secs_f64(opts.interval),
            layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),

            cpu_pool,
            layers,
            idle_qos_enabled,

            sched_stats: Stats::new(&mut skel, &proc_reader, topo.clone(), &gpu_task_handler)?,

            cgroup_regexes: Some(cgroup_regexes),
            nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
            xnuma_mig_src: vec![vec![false; topo.nodes.len()]; nr_layers],
            growth_denied: vec![vec![false; topo.nodes.len()]; nr_layers],
            processing_dur: Default::default(),

            proc_reader,
            skel,

            topo,
            netdevs,
            stats_server,
            gpu_task_handler,
        };

        info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");

        Ok(sched)
    }

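    // Mirror a Cpumask into the packed byte-array form used by the BPF side:
    // bit (cpu % 8) of byte (cpu / 8) represents CPU `cpu`.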
    fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
        for cpu in 0..mask.len() {
            if mask.test_cpu(cpu) {
                bpfmask[cpu / 8] |= 1 << (cpu % 8);
            } else {
                bpfmask[cpu / 8] &= !(1 << (cpu % 8));
            }
        }
    }

    fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
        trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
        Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);

        bpf_layer.nr_cpus = layer.nr_cpus as u32;
        for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
            bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
        }
        for (node_id, &nr_node_cpus) in layer.nr_node_cpus.iter().enumerate() {
            bpf_layer.node[node_id].nr_cpus = nr_node_cpus as u32;
        }

        bpf_layer.refresh_cpus = 1;
    }

    fn update_netdev_cpumasks(&mut self) -> Result<()> {
        let available_cpus = self.cpu_pool.available_cpus();
        if available_cpus.is_empty() {
            return Ok(());
        }

        for (iface, netdev) in self.netdevs.iter_mut() {
            let node = self
                .topo
                .nodes
                .values()
                .find(|n| n.id == netdev.node())
                .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
            let node_cpus = node.span.clone();
            for (irq, irqmask) in netdev.irqs.iter_mut() {
                irqmask.clear_all();
                for cpu in available_cpus.iter() {
                    if !node_cpus.test_cpu(cpu) {
                        continue;
                    }
                    let _ = irqmask.set_cpu(cpu);
                }
                if irqmask.weight() == 0 {
                    for cpu in node_cpus.iter() {
                        let _ = irqmask.set_cpu(cpu);
                    }
                }
                trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
            }
            netdev.apply_cpumasks()?;
            debug!(
                "{iface}: applied affinity override to {} IRQ{}",
                netdev.irqs.len(),
                if netdev.irqs.len() == 1 { "" } else { "s" },
            );
        }

        Ok(())
    }

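    // Clamp a layer's CPU target to honor its memory-bandwidth limit. Per-CPU
    // bandwidth is estimated from the last interval as membw / ncpu, so the
    // largest compliant CPU count is membw_limit / (membw / ncpu): e.g. a
    // layer moving 40 GiB/s on 8 CPUs (5 GiB/s per CPU) with a 20 GiB/s limit
    // is clamped to 4 CPUs. A zero limit or zero measured bandwidth leaves
    // the target unchanged.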
    fn clamp_target_by_membw(
        &self,
        layer: &Layer,
        membw_limit: f64,
        membw: f64,
        curtarget: u64,
    ) -> usize {
        let ncpu: u64 = layer.cpus.weight() as u64;
        let membw = (membw * 1024_f64.powf(3.0)).round() as u64;
        let membw_limit = (membw_limit * 1024_f64.powf(3.0)).round() as u64;
        let last_membw_percpu = if ncpu > 0 { membw / ncpu } else { 0 };

        if membw_limit == 0 || last_membw_percpu == 0 {
            return curtarget as usize;
        }

        (membw_limit / last_membw_percpu) as usize
    }

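    // Translate (target, min) pairs into LayerDemand records for the unified
    // allocator, in allocation units (au): per-node pinned demand is derived
    // from utilization already tied to a node the layer can use, and whatever
    // remains of the target is unpinned demand the allocator may place
    // anywhere. Open layers report zero demand.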
    fn calc_raw_demands(&self, targets: &[(usize, usize)]) -> Vec<LayerDemand> {
        let au = self.cpu_pool.alloc_unit();
        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
        let nr_nodes = self.topo.nodes.len();

        targets
            .iter()
            .enumerate()
            .map(|(idx, &(target, _min))| {
                let layer = &self.layers[idx];
                let weight = layer.kind.common().weight as usize;

                if matches!(layer.kind, LayerKind::Open { .. }) {
                    return LayerDemand {
                        raw_pinned: vec![0; nr_nodes],
                        raw_unpinned: 0,
                        weight,
                        spread: false,
                    };
                }

                let spread = matches!(
                    layer.growth_algo,
                    LayerGrowthAlgo::NodeSpread
                        | LayerGrowthAlgo::NodeSpreadReverse
                        | LayerGrowthAlgo::NodeSpreadRandom
                        | LayerGrowthAlgo::RoundRobin
                );

                let util_high = match &layer.kind {
                    LayerKind::Confined { util_range, .. }
                    | LayerKind::Grouped { util_range, .. } => util_range.1,
                    _ => 1.0,
                };

                let mut raw_pinned = vec![0usize; nr_nodes];
                for n in 0..nr_nodes {
                    let pu = pinned_utils[idx][n];
                    if pu < 0.01 {
                        continue;
                    }
                    let node_span = &self.topo.nodes[&n].span;
                    if layer.allowed_cpus.and(node_span).is_empty() {
                        continue;
                    }
                    let cpus = (pu / util_high).ceil() as usize;
                    let units = cpus.div_ceil(au);
                    raw_pinned[n] = units;
                }

                let target_units = target.div_ceil(au);
                let pinned_units: usize = raw_pinned.iter().sum();
                let raw_unpinned = target_units.saturating_sub(pinned_units);

                LayerDemand {
                    raw_pinned,
                    raw_unpinned,
                    weight,
                    spread,
                }
            })
            .collect()
    }

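    // Compute per-layer (target, min) CPU counts from utilization:
    // low = ceil(util / util_range.hi) keeps utilization under the high
    // watermark, high = floor(util / util_range.lo) keeps it above the low
    // one. The current CPU count is clamped into [low, high], then into
    // cpus_range, then reduced if the memory-bandwidth limit requires it.
    // Open layers consume leftover CPUs and get no target of their own.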
    fn calc_target_nr_cpus(&self) -> Vec<(usize, usize)> {
        let nr_cpus = self.cpu_pool.topo.all_cpus.len();
        let utils = &self.sched_stats.layer_utils;
        let membws = &self.sched_stats.layer_membws;

        let mut records: Vec<(u64, u64, u64, usize, usize, usize)> = vec![];
        let mut targets: Vec<(usize, usize)> = vec![];

        for (idx, layer) in self.layers.iter().enumerate() {
            targets.push(match &layer.kind {
                LayerKind::Confined {
                    util_range,
                    cpus_range,
                    cpus_range_frac,
                    membw_gb,
                    ..
                }
                | LayerKind::Grouped {
                    util_range,
                    cpus_range,
                    cpus_range_frac,
                    membw_gb,
                    ..
                } => {
                    let cpus_range =
                        resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();

                    let owned = utils[idx][LAYER_USAGE_OWNED];
                    let open = utils[idx][LAYER_USAGE_OPEN];

                    let membw_owned = membws[idx][LAYER_USAGE_OWNED];
                    let membw_open = membws[idx][LAYER_USAGE_OPEN];

                    let mut util = owned;
                    let mut membw = membw_owned;
                    if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
                        util += open;
                        membw += membw_open;
                    }

                    let util = if util < 0.01 { 0.0 } else { util };
                    let low = (util / util_range.1).ceil() as usize;
                    let high = ((util / util_range.0).floor() as usize).max(low);

                    let membw_limit = match membw_gb {
                        Some(membw_limit) => *membw_limit,
                        None => 0.0,
                    };

                    trace!(
                        "layer {0} (membw, membw_limit): ({membw} GiB, {membw_limit} GiB)",
                        layer.name
                    );

                    let target = layer.cpus.weight().clamp(low, high);

                    records.push((
                        (owned * 100.0) as u64,
                        (open * 100.0) as u64,
                        (util * 100.0) as u64,
                        low,
                        high,
                        target,
                    ));

                    let target = target.clamp(cpus_range.0, cpus_range.1);
                    let membw_target =
                        self.clamp_target_by_membw(layer, membw_limit, membw, target as u64);

                    trace!("CPU target pre- and post-membw adjustment: {target} -> {membw_target}");

                    if membw_target < cpus_range.0 {
                        warn!("cannot satisfy memory bw limit for layer {}", layer.name);
                        warn!("membw_target {membw_target} low {}", cpus_range.0);
                    }

                    let target = membw_target.clamp(cpus_range.0, target);

                    (target, cpus_range.0)
                }
                LayerKind::Open { .. } => (0, 0),
            });
        }

        trace!("(owned, open, util, low, high, target): {:?}", &records);
        targets
    }

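    // Split a CPU count into (nr of fully used LLCs, leftover cores): e.g.
    // with 8 CPUs per LLC and 2 CPUs per core, a target of 20 CPUs yields 2
    // full LLCs plus ceil(4 / 2) = 2 extra cores on a partial LLC.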
    fn compute_target_llcs(target: usize, topo: &Topology) -> (usize, usize) {
        let cores_per_llc = topo.all_cores.len() / topo.all_llcs.len();
        let cpus_per_core = topo.all_cores.first_key_value().unwrap().1.cpus.len();
        let cpus_per_llc = cores_per_llc * cpus_per_core;

        let full = target / cpus_per_llc;
        let extra = target % cpus_per_llc;

        (full, extra.div_ceil(cpus_per_core))
    }

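    // Rebuild core orders for StickyDynamic layers from whole-LLC assignments:
    // return LLCs above a node's new target to the pool, claim free LLCs up to
    // the target, top up the remainder with cores from partially-used LLCs,
    // then apply the resulting per-node cpumask deltas. Returns true if any
    // layer's CPUs changed.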
    fn recompute_layer_core_order(
        &mut self,
        layer_targets: &[(usize, usize)],
        layer_allocs: &[LayerAlloc],
        au: usize,
    ) -> Result<bool> {
        let nr_nodes = self.topo.nodes.len();

        debug!(
            " free: before pass: free_llcs={:?}",
            self.cpu_pool.free_llcs
        );
        for &(idx, _) in layer_targets.iter().rev() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                let assigned_on_n = layer.assigned_llcs[n].len();
                let target_full_n =
                    Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
                let mut to_free = assigned_on_n.saturating_sub(target_full_n);

                debug!(
                    " free: layer={} node={} assigned={} target_full={} to_free={}",
                    layer.name, n, assigned_on_n, target_full_n, to_free,
                );

                while to_free > 0 {
                    if let Some(llc) = layer.assigned_llcs[n].pop() {
                        self.cpu_pool.return_llc(llc);
                        to_free -= 1;
                        debug!(" layer={} freed_llc={} from node={}", layer.name, llc, n);
                    } else {
                        break;
                    }
                }
            }
        }
        debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);

        for &(idx, _) in layer_targets.iter().rev() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                let cur_on_n = layer.assigned_llcs[n].len();
                let target_full_n =
                    Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
                let mut to_alloc = target_full_n.saturating_sub(cur_on_n);

                debug!(
                    " alloc: layer={} node={} cur={} target_full={} to_alloc={} free={}",
                    layer.name,
                    n,
                    cur_on_n,
                    target_full_n,
                    to_alloc,
                    self.cpu_pool.free_llcs.get(&n).map_or(0, |v| v.len()),
                );

                while to_alloc > 0 {
                    if let Some(llc) = self.cpu_pool.take_llc_from_node(n) {
                        layer.assigned_llcs[n].push(llc);
                        to_alloc -= 1;
                        debug!(" layer={} alloc_llc={} on node={}", layer.name, llc, n);
                    } else {
                        break;
                    }
                }
            }

            debug!(
                " alloc: layer={} assigned_llcs={:?}",
                layer.name, layer.assigned_llcs
            );
        }

        let cores_per_llc = self.topo.all_cores.len() / self.topo.all_llcs.len();
        let cpus_per_core = self.topo.all_cores.first_key_value().unwrap().1.cpus.len();
        let cpus_per_llc = cores_per_llc * cpus_per_core;

        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            layer.core_order = vec![Vec::new(); nr_nodes];
            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                let mut extra = Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).1;

                if let Some(node_llcs) = self.cpu_pool.free_llcs.get_mut(&n) {
                    for entry in node_llcs.iter_mut() {
                        if extra == 0 {
                            break;
                        }
                        let avail = cpus_per_llc - entry.1;
                        let mut used = extra.min(avail);
                        let cores_to_add = used;

                        let shift = entry.1;
                        entry.1 += used;

                        let llc_id = entry.0;
                        let llc = self.topo.all_llcs.get(&llc_id).unwrap();

                        for core in llc.cores.iter().skip(shift) {
                            if used == 0 {
                                break;
                            }
                            layer.core_order[n].push(core.1.id);
                            used -= 1;
                        }

                        extra -= cores_to_add;
                    }
                }
            }

            for node_cores in &mut layer.core_order {
                node_cores.reverse();
            }
        }

        for node_llcs in self.cpu_pool.free_llcs.values_mut() {
            for entry in node_llcs.iter_mut() {
                entry.1 = 0;
            }
        }

        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            let all_assigned: HashSet<usize> =
                layer.assigned_llcs.iter().flatten().copied().collect();

            for core in self.topo.all_cores.iter() {
                let llc_id = core.1.llc_id;
                if all_assigned.contains(&llc_id) {
                    let nid = core.1.node_id;
                    layer.core_order[nid].push(core.1.id);
                }
            }
            for node_cores in &mut layer.core_order {
                node_cores.reverse();
            }

            debug!(
                " alloc: layer={} core_order={:?}",
                layer.name, layer.core_order
            );
        }

        let mut updated = false;

        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            for n in 0..nr_nodes {
                let mut node_target = Cpumask::new();
                for &core_id in &layer.core_order[n] {
                    if let Some(core) = self.topo.all_cores.get(&core_id) {
                        node_target |= &core.span;
                    }
                }
                node_target &= &layer.allowed_cpus;

                let node_span = &self.topo.nodes[&n].span;
                let node_cur = layer.cpus.and(node_span);
                let cpus_to_free = node_cur.and(&node_target.not());

                if cpus_to_free.weight() > 0 {
                    debug!(
                        " apply: layer={} freeing CPUs on node {}: {}",
                        layer.name, n, cpus_to_free
                    );
                    layer.cpus &= &cpus_to_free.not();
                    layer.nr_cpus -= cpus_to_free.weight();
                    for cpu in cpus_to_free.iter() {
                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                        layer.nr_node_cpus[n] -= 1;
                    }
                    self.cpu_pool.free(&cpus_to_free)?;
                    updated = true;
                }
            }
        }

        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            for n in 0..nr_nodes {
                let mut node_target = Cpumask::new();
                for &core_id in &layer.core_order[n] {
                    if let Some(core) = self.topo.all_cores.get(&core_id) {
                        node_target |= &core.span;
                    }
                }
                node_target &= &layer.allowed_cpus;

                let available_cpus = self.cpu_pool.available_cpus();
                let desired_to_alloc = node_target.and(&layer.cpus.clone().not());
                let cpus_to_alloc = desired_to_alloc.clone().and(&available_cpus);

                if desired_to_alloc.weight() > cpus_to_alloc.weight() {
                    debug!(
                        " apply: layer={} node {} wanted to alloc {} CPUs but only {} available",
                        layer.name,
                        n,
                        desired_to_alloc.weight(),
                        cpus_to_alloc.weight()
                    );
                }

                if cpus_to_alloc.weight() > 0 {
                    debug!(
                        " apply: layer={} allocating CPUs on node {}: {}",
                        layer.name, n, cpus_to_alloc
                    );
                    layer.cpus |= &cpus_to_alloc;
                    layer.nr_cpus += cpus_to_alloc.weight();
                    for cpu in cpus_to_alloc.iter() {
                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                        layer.nr_node_cpus[n] += 1;
                    }
                    self.cpu_pool.mark_allocated(&cpus_to_alloc)?;
                    updated = true;
                }
            }

            debug!(
                " apply: layer={} final cpus.weight()={} nr_cpus={}",
                layer.name,
                layer.cpus.weight(),
                layer.nr_cpus
            );
        }

        Ok(updated)
    }

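    // Push per-node state into BPF by test_run()ing the refresh_node_ctx
    // program once per node: which layers currently own no CPUs on that node
    // and, on init only, the node's LLC list.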
    fn refresh_node_ctx(
        skel: &mut BpfSkel,
        topo: &Topology,
        node_empty_layers: &[Vec<u32>],
        init: bool,
    ) {
        for &nid in topo.nodes.keys() {
            let mut arg: bpf_intf::refresh_node_ctx_arg =
                unsafe { MaybeUninit::zeroed().assume_init() };
            arg.node_id = nid as u32;
            arg.init = init as u32;

            let empty = &node_empty_layers[nid];
            arg.nr_empty_layer_ids = empty.len() as u32;
            for (i, &lid) in empty.iter().enumerate() {
                arg.empty_layer_ids[i] = lid;
            }
            for i in empty.len()..MAX_LAYERS {
                arg.empty_layer_ids[i] = MAX_LAYERS as u32;
            }

            if init {
                let node = &topo.nodes[&nid];
                let llcs: Vec<u32> = node.llcs.keys().map(|&id| id as u32).collect();
                arg.nr_llcs = llcs.len() as u32;
                for (i, &llc_id) in llcs.iter().enumerate() {
                    arg.llcs[i] = llc_id;
                }
            }

            let input = ProgramInput {
                context_in: Some(unsafe { plain::as_mut_bytes(&mut arg) }),
                ..Default::default()
            };
            let _ = skel.progs.refresh_node_ctx.test_run(input);
        }
    }

    fn refresh_cpumasks(&mut self) -> Result<()> {
        let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });

        let mut updated = false;
        let raw_targets = self.calc_target_nr_cpus();
        let au = self.cpu_pool.alloc_unit();
        let total_cpus = self.cpu_pool.topo.all_cpus.len();

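        // Dampen shrinking: when the new target is below the current CPU
        // count, step down by only half the gap (rounded up, but never below
        // the layer's minimum) so a transient utilization dip doesn't strip a
        // layer of its CPUs all at once.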
        let targets: Vec<(usize, usize)> = raw_targets
            .iter()
            .enumerate()
            .map(|(idx, &(target, min))| {
                let cur = self.layers[idx].nr_cpus;
                if target < cur {
                    let dampened = cur - (cur - target).div_ceil(2);
                    (dampened.max(min), min)
                } else {
                    (target, min)
                }
            })
            .collect();

        let demands = self.calc_raw_demands(&targets);
        let nr_nodes = self.topo.nodes.len();
        let node_caps: Vec<usize> = self
            .topo
            .nodes
            .values()
            .map(|n| n.span.weight() / au)
            .collect();
        let all_layer_nodes: Vec<&[usize]> = self
            .layer_specs
            .iter()
            .map(|s| s.nodes().as_slice())
            .collect();
        let norders: Vec<Vec<usize>> = (0..self.layers.len())
            .map(|idx| {
                layer_core_growth::node_order(
                    self.layer_specs[idx].nodes(),
                    &self.topo,
                    idx,
                    &all_layer_nodes,
                )
            })
            .collect();
        let cur_node_cpus: Vec<Vec<usize>> = self
            .layers
            .iter()
            .map(|layer| (0..nr_nodes).map(|n| layer.nr_node_cpus[n] / au).collect())
            .collect();
        let layer_allocs = unified_alloc(
            total_cpus / au,
            &node_caps,
            &demands,
            &cur_node_cpus,
            &norders,
        );

        let cpu_targets: Vec<usize> = layer_allocs.iter().map(|a| a.total() * au).collect();

        let prev_nr_cpus: Vec<usize> = self.layers.iter().map(|l| l.nr_cpus).collect();

        let mut ascending: Vec<(usize, usize)> = cpu_targets.iter().copied().enumerate().collect();
        ascending.sort_by(|a, b| a.1.cmp(&b.1));

        let prev_node_cpus: Vec<Vec<usize>> =
            self.layers.iter().map(|l| l.nr_node_cpus.clone()).collect();

        let use_sd_alloc = self.topo.all_llcs.len() > 1;
        let sticky_dynamic_updated = if use_sd_alloc {
            self.recompute_layer_core_order(&ascending, &layer_allocs, au)?
        } else {
            false
        };
        updated |= sticky_dynamic_updated;

        if sticky_dynamic_updated {
            for (idx, layer) in self.layers.iter().enumerate() {
                if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
                    Self::update_bpf_layer_cpumask(
                        layer,
                        &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
                    );
                }
            }
        }

        for &(idx, _target) in ascending.iter().rev() {
            let layer = &mut self.layers[idx];
            if layer_is_open(layer) {
                continue;
            }

            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
                continue;
            }

            let alloc = &layer_allocs[idx];
            let mut freed = false;

            for n in 0..nr_nodes {
                let desired = alloc.node_target(n) * au;
                let mut to_free = layer.nr_node_cpus[n].saturating_sub(desired);
                let node_span = &self.topo.nodes[&n].span;

                while to_free > 0 {
                    let node_cands = layer.cpus.and(node_span);
                    let cpus_to_free = match self
                        .cpu_pool
                        .next_to_free(&node_cands, layer.core_order[n].iter().rev())?
                    {
                        Some(ret) => ret,
                        None => break,
                    };
                    let nr = cpus_to_free.weight();
                    trace!(
                        "[{}] freeing CPUs on node {}: {}",
                        layer.name,
                        n,
                        &cpus_to_free
                    );
                    layer.cpus &= &cpus_to_free.not();
                    layer.nr_cpus -= nr;
                    for cpu in cpus_to_free.iter() {
                        let node_id = self.cpu_pool.topo.all_cpus[&cpu].node_id;
                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                        layer.nr_node_cpus[node_id] -= 1;
                        layer.nr_pinned_cpus[node_id] =
                            layer.nr_pinned_cpus[node_id].min(layer.nr_node_cpus[node_id]);
                    }
                    self.cpu_pool.free(&cpus_to_free)?;
                    to_free = to_free.saturating_sub(nr);
                    freed = true;
                }
            }

            if freed {
                Self::update_bpf_layer_cpumask(
                    layer,
                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
                );
                updated = true;
            }
        }

        for &(idx, _target) in &ascending {
            let layer = &mut self.layers[idx];

            if layer_is_open(layer) {
                continue;
            }

            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
                continue;
            }

            let alloc = &layer_allocs[idx];
            let norder = &norders[idx];
            let mut alloced = false;

            for &node_id in norder.iter() {
                let node_target = alloc.node_target(node_id) * au;
                let cur_node = layer.nr_node_cpus[node_id];
                if node_target <= cur_node {
                    continue;
                }
                let pinned_target = alloc.pinned[node_id] * au;
                let mut nr_to_alloc = node_target - cur_node;
                let node_span = &self.topo.nodes[&node_id].span;
                let node_allowed = layer.allowed_cpus.and(node_span);

                while nr_to_alloc > 0 {
                    let nr_alloced = match self.cpu_pool.alloc_cpus(
                        &node_allowed,
                        &layer.core_order[node_id],
                        nr_to_alloc,
                    ) {
                        Some(new_cpus) => {
                            let nr = new_cpus.weight();
                            layer.cpus |= &new_cpus;
                            layer.nr_cpus += nr;
                            for cpu in new_cpus.iter() {
                                layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                                let nid = self.cpu_pool.topo.all_cpus[&cpu].node_id;
                                layer.nr_node_cpus[nid] += 1;
                                if layer.nr_pinned_cpus[nid] < pinned_target {
                                    layer.nr_pinned_cpus[nid] += 1;
                                }
                            }
                            nr
                        }
                        None => 0,
                    };
                    if nr_alloced == 0 {
                        break;
                    }
                    alloced = true;
                    nr_to_alloc -= nr_alloced.min(nr_to_alloc);
                }
            }

            if alloced {
                Self::update_bpf_layer_cpumask(
                    layer,
                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
                );
                updated = true;
            }
        }

        let node_utils = &self.sched_stats.layer_node_utils;
        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
        for (idx, layer) in self.layers.iter().enumerate() {
            self.growth_denied[idx].fill(false);
            let util_high = match layer.kind.util_range() {
                Some((_, high)) => high,
                None => continue,
            };
            for n in 0..nr_nodes {
                let unpinned_util = (node_utils[idx][n] - pinned_utils[idx][n]).max(0.0);
                let unpinned_cpus_needed = unpinned_util / util_high;
                let unpinned_cpus_have =
                    layer.nr_node_cpus[n].saturating_sub(layer.nr_pinned_cpus[n]) as f64;
                let wanted = unpinned_cpus_needed > unpinned_cpus_have;
                let got = layer.nr_node_cpus[n] > prev_node_cpus[idx][n];
                if wanted && !got {
                    self.growth_denied[idx][n] = true;
                }
            }
        }

        if updated {
            for (idx, layer) in self.layers.iter().enumerate() {
                if layer_is_open(layer) {
                    continue;
                }
                let prev = prev_nr_cpus[idx];
                let cur = layer.nr_cpus;
                if prev != cur {
                    debug!(
                        "ALLOC {} algo={:?} cpus:{}→{} mask={:x}",
                        layer.name, layer.growth_algo, prev, cur, layer.cpus,
                    );
                }
            }
            debug!(
                "ALLOC pool_available={}",
                self.cpu_pool.available_cpus().weight()
            );
        }

        if updated {
            let nr_nodes = self.topo.nodes.len();
            for (idx, layer) in self.layers.iter().enumerate() {
                if layer_is_open(layer) {
                    continue;
                }
                let prev = &prev_node_cpus[idx];
                let cur = &layer.nr_node_cpus;
                if prev == cur {
                    continue;
                }
                let per_node: String = (0..nr_nodes)
                    .map(|n| format!("n{}:{}→{}", n, prev[n], cur[n]))
                    .collect::<Vec<_>>()
                    .join(" ");
                let prev_total: usize = prev.iter().sum();
                let cur_total: usize = cur[..nr_nodes].iter().sum();
                let target: String = (0..nr_nodes)
                    .map(|n| format!("n{}:{}", n, layer_allocs[idx].node_target(n) * au))
                    .collect::<Vec<_>>()
                    .join(" ");
                debug!(
                    "ALLOC {} algo={:?} {} total:{}→{} target:[{}] mask={:x}",
                    layer.name,
                    layer.growth_algo,
                    per_node,
                    prev_total,
                    cur_total,
                    target,
                    layer.cpus,
                );
            }
            debug!(
                "ALLOC pool_available={}",
                self.cpu_pool.available_cpus().weight()
            );
        }

        if updated {
            for (idx, layer) in self.layers.iter_mut().enumerate() {
                if !layer_is_open(layer) {
                    continue;
                }

                let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx];
                let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
                let nr_available_cpus = available_cpus.weight();

                layer.cpus = available_cpus;
                layer.nr_cpus = nr_available_cpus;
                for llc in self.cpu_pool.topo.all_llcs.values() {
                    layer.nr_llc_cpus[llc.id] = layer.cpus.and(&llc.span).weight();
                }
                for node in self.cpu_pool.topo.nodes.values() {
                    layer.nr_node_cpus[node.id] = layer.cpus.and(&node.span).weight();
                    layer.nr_pinned_cpus[node.id] = 0;
                }
                Self::update_bpf_layer_cpumask(layer, bpf_layer);
            }

            for (&node_id, &cpu) in &self.cpu_pool.fallback_cpus {
                self.skel.maps.bss_data.as_mut().unwrap().fallback_cpus[node_id] = cpu as u32;
            }

            for (lidx, layer) in self.layers.iter().enumerate() {
                self.nr_layer_cpus_ranges[lidx] = (
                    self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
                    self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
                );
            }

            let input = ProgramInput {
                ..Default::default()
            };
            let prog = &mut self.skel.progs.refresh_layer_cpumasks;
            let _ = prog.test_run(input);

            let nr_nodes = self.topo.nodes.len();
            let node_empty_layers: Vec<Vec<u32>> = (0..nr_nodes)
                .map(|nid| {
                    self.layers
                        .iter()
                        .enumerate()
                        .filter(|(_lidx, layer)| layer.nr_node_cpus[nid] == 0)
                        .map(|(lidx, _)| lidx as u32)
                        .collect()
                })
                .collect();
            Self::refresh_node_ctx(&mut self.skel, &self.topo, &node_empty_layers, false);
        }

        if let Err(e) = self.update_netdev_cpumasks() {
            warn!("Failed to update netdev IRQ cpumasks: {:#}", e);
        }
        Ok(())
    }

    fn refresh_idle_qos(&mut self) -> Result<()> {
        if !self.idle_qos_enabled {
            return Ok(());
        }

        let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
        for layer in self.layers.iter() {
            let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
            for cpu in layer.cpus.iter() {
                cpu_idle_qos[cpu] = idle_resume_us;
            }
        }

        for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
            update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
        }

        Ok(())
    }

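    // Refresh cross-NUMA migration state for each layer: duty sums decide
    // which nodes are migration sources and at what per (src, dst) rate,
    // written into the BPF layer state. A non-positive threshold pair
    // disables gating entirely (all sources, unlimited rate).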
    fn refresh_xnuma(&mut self) {
        let nr_nodes = self.topo.nodes.len();
        if nr_nodes <= 1 {
            return;
        }

        let duty_sums = &self.sched_stats.layer_node_duty_sums;

        for (layer_idx, spec) in self.layer_specs.iter().enumerate() {
            let common = spec.kind.common();
            let threshold = common.xnuma_threshold;
            let threshold_delta = common.xnuma_threshold_delta;
            let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[layer_idx];

            if threshold.0 <= 0.0 && threshold.1 <= 0.0 {
                for src in 0..nr_nodes {
                    bpf_layer.node[src].xnuma_is_mig_src.write(true);
                    for dst in 0..nr_nodes {
                        bpf_layer.node[src].xnuma[dst].rate = u64::MAX;
                    }
                }
                self.xnuma_mig_src[layer_idx].fill(false);
                continue;
            }

            let layer = &self.layers[layer_idx];
            let is_mig_src = xnuma_check_active(
                &duty_sums[layer_idx],
                &layer.nr_node_cpus,
                threshold,
                threshold_delta,
                &self.growth_denied[layer_idx],
                &self.xnuma_mig_src[layer_idx],
            );

            self.xnuma_mig_src[layer_idx] = is_mig_src.clone();

            let result = xnuma_compute_rates(&duty_sums[layer_idx], &layer.nr_node_cpus);

            for src in 0..nr_nodes {
                for dst in 0..nr_nodes {
                    bpf_layer.node[src].xnuma[dst].rate = result.rates[src][dst];
                }
            }
            for (nid, is_src) in is_mig_src.iter().enumerate().take(nr_nodes) {
                bpf_layer.node[nid].xnuma_is_mig_src.write(*is_src);
            }
        }
    }

    fn step(&mut self) -> Result<()> {
        let started_at = Instant::now();
        self.sched_stats.refresh(
            &mut self.skel,
            &self.proc_reader,
            started_at,
            self.processing_dur,
            &self.gpu_task_handler,
        )?;

        self.skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .system_cpu_util_ewma = (self.sched_stats.system_cpu_util_ewma * 10000.0) as u64;

        for layer_id in 0..self.sched_stats.nr_layers {
            self.skel
                .maps
                .bss_data
                .as_mut()
                .unwrap()
                .layer_dsq_insert_ewma[layer_id] =
                (self.sched_stats.layer_dsq_insert_ewma[layer_id] * 10000.0) as u64;
        }

        self.refresh_cpumasks()?;
        self.refresh_xnuma();
        self.refresh_idle_qos()?;
        self.gpu_task_handler.maybe_affinitize();
        self.processing_dur += Instant::now().duration_since(started_at);
        Ok(())
    }

    fn generate_sys_stats(
        &mut self,
        stats: &Stats,
        cpus_ranges: &mut [(usize, usize)],
    ) -> Result<SysStats> {
        let bstats = &stats.bpf_stats;
        let mut sys_stats = SysStats::new(stats, bstats, &self.cpu_pool.fallback_cpus)?;

        for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
            let layer_stats = LayerStats::new(
                lidx,
                layer,
                stats,
                bstats,
                cpus_ranges[lidx],
                self.xnuma_mig_src[lidx].iter().any(|&a| a),
            );
            sys_stats.layers.insert(spec.name.to_string(), layer_stats);
            cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
        }

        Ok(sys_stats)
    }

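    // Resolve a new cgroup directory to its id (the inode number on cgroupfs,
    // 0 if the path vanished before it could be stat'd), compute the bitmap of
    // matching cgroup regex rules, and forward both to the scheduler thread as
    // a CgroupEvent::Created.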
    fn process_cgroup_creation(
        path: &Path,
        cgroup_regexes: &HashMap<u32, Regex>,
        cgroup_path_to_id: &mut HashMap<String, u64>,
        sender: &crossbeam::channel::Sender<CgroupEvent>,
    ) {
        let path_str = path.to_string_lossy().to_string();

        let cgroup_id = std::fs::metadata(path)
            .map(|metadata| {
                use std::os::unix::fs::MetadataExt;
                metadata.ino()
            })
            .unwrap_or(0);

        let mut match_bitmap = 0u64;
        for (rule_id, regex) in cgroup_regexes {
            if regex.is_match(&path_str) {
                match_bitmap |= 1u64 << rule_id;
            }
        }

        cgroup_path_to_id.insert(path_str.clone(), cgroup_id);

        if let Err(e) = sender.send(CgroupEvent::Created {
            path: path_str,
            cgroup_id,
            match_bitmap,
        }) {
            error!("Failed to send cgroup creation event: {}", e);
        }
    }

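    // Spawn the cgroup watcher thread: emit Created events for every existing
    // cgroup directory, then track CREATE/DELETE inotify events under
    // /sys/fs/cgroup, polling select() with a 100ms timeout so the shutdown
    // flag is checked regularly, and keep the watch set in sync as
    // directories come and go.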
    fn start_cgroup_watcher(
        shutdown: Arc<AtomicBool>,
        cgroup_regexes: HashMap<u32, Regex>,
    ) -> Result<Receiver<CgroupEvent>> {
        let mut inotify = Inotify::init().context("Failed to initialize inotify")?;
        let mut wd_to_path = HashMap::new();

        let (sender, receiver) = crossbeam::channel::bounded::<CgroupEvent>(1024);

        let root_wd = inotify
            .watches()
            .add("/sys/fs/cgroup", WatchMask::CREATE | WatchMask::DELETE)
            .context("Failed to add watch for /sys/fs/cgroup")?;
        wd_to_path.insert(root_wd, PathBuf::from("/sys/fs/cgroup"));

        Self::add_recursive_watches(&mut inotify, &mut wd_to_path, Path::new("/sys/fs/cgroup"))?;

        std::thread::spawn(move || {
            let mut buffer = [0; 4096];
            let inotify_fd = inotify.as_raw_fd();
            let mut cgroup_path_to_id = HashMap::<String, u64>::new();

            for entry in WalkDir::new("/sys/fs/cgroup")
                .into_iter()
                .filter_map(|e| e.ok())
                .filter(|e| e.file_type().is_dir())
            {
                let path = entry.path();
                Self::process_cgroup_creation(
                    path,
                    &cgroup_regexes,
                    &mut cgroup_path_to_id,
                    &sender,
                );
            }

            while !shutdown.load(Ordering::Relaxed) {
                let ready = unsafe {
                    let mut read_fds: libc::fd_set = std::mem::zeroed();
                    libc::FD_ZERO(&mut read_fds);
                    libc::FD_SET(inotify_fd, &mut read_fds);

                    let mut timeout = libc::timeval {
                        tv_sec: 0,
                        tv_usec: 100_000,
                    };

                    libc::select(
                        inotify_fd + 1,
                        &mut read_fds,
                        std::ptr::null_mut(),
                        std::ptr::null_mut(),
                        &mut timeout,
                    )
                };

                if ready <= 0 {
                    continue;
                }

                let events = match inotify.read_events(&mut buffer) {
                    Ok(events) => events,
                    Err(e) => {
                        error!("Error reading inotify events: {}", e);
                        break;
                    }
                };

                for event in events {
                    if !event.mask.contains(inotify::EventMask::CREATE)
                        && !event.mask.contains(inotify::EventMask::DELETE)
                    {
                        continue;
                    }

                    let name = match event.name {
                        Some(name) => name,
                        None => continue,
                    };

                    let parent_path = match wd_to_path.get(&event.wd) {
                        Some(parent) => parent,
                        None => {
                            warn!("Unknown watch descriptor: {:?}", event.wd);
                            continue;
                        }
                    };

                    let path = parent_path.join(name.to_string_lossy().as_ref());

                    if event.mask.contains(inotify::EventMask::CREATE) {
                        if !path.is_dir() {
                            continue;
                        }

                        Self::process_cgroup_creation(
                            &path,
                            &cgroup_regexes,
                            &mut cgroup_path_to_id,
                            &sender,
                        );

                        match inotify
                            .watches()
                            .add(&path, WatchMask::CREATE | WatchMask::DELETE)
                        {
                            Ok(wd) => {
                                wd_to_path.insert(wd, path.clone());
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to add watch for new cgroup {}: {}",
                                    path.display(),
                                    e
                                );
                            }
                        }
                    } else if event.mask.contains(inotify::EventMask::DELETE) {
                        let path_str = path.to_string_lossy().to_string();

                        let cgroup_id = cgroup_path_to_id.remove(&path_str).unwrap_or(0);

                        if let Err(e) = sender.send(CgroupEvent::Removed {
                            path: path_str,
                            cgroup_id,
                        }) {
                            error!("Failed to send cgroup removal event: {}", e);
                        }

                        let wd_to_remove = wd_to_path.iter().find_map(|(wd, watched_path)| {
                            if watched_path == &path {
                                Some(wd.clone())
                            } else {
                                None
                            }
                        });
                        if let Some(wd) = wd_to_remove {
                            wd_to_path.remove(&wd);
                        }
                    }
                }
            }
        });

        Ok(receiver)
    }

    fn add_recursive_watches(
        inotify: &mut Inotify,
        wd_to_path: &mut HashMap<inotify::WatchDescriptor, PathBuf>,
        path: &Path,
    ) -> Result<()> {
        for entry in WalkDir::new(path)
            .into_iter()
            .filter_map(|e| e.ok())
            .filter(|e| e.file_type().is_dir())
            .skip(1)
        {
            let entry_path = entry.path();
            match inotify
                .watches()
                .add(entry_path, WatchMask::CREATE | WatchMask::DELETE)
            {
                Ok(wd) => {
                    wd_to_path.insert(wd, entry_path.to_path_buf());
                }
                Err(e) => {
                    debug!("Failed to add watch for {}: {}", entry_path.display(), e);
                }
            }
        }
        Ok(())
    }

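    // Main loop: run step() on every scheduling interval, bump the BPF-side
    // avgruntime refresh sequence on its own interval, and in between block on
    // stats server requests and cgroup watcher events (or a timeout) until
    // shutdown is requested or the BPF scheduler exits.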
4269 fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
4270 let (res_ch, req_ch) = self.stats_server.channels();
4271 let mut next_sched_at = Instant::now() + self.sched_intv;
4272 let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
4273 let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
4274 let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();
4275
4276 let cgroup_regexes = self.cgroup_regexes.take().unwrap();
4278 let cgroup_event_rx = if !cgroup_regexes.is_empty() {
4279 Some(Self::start_cgroup_watcher(
4280 shutdown.clone(),
4281 cgroup_regexes,
4282 )?)
4283 } else {
4284 None
4285 };
4286
4287 while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
4288 let now = Instant::now();
4289
4290 if now >= next_sched_at {
4291 self.step()?;
4292 while next_sched_at < now {
4293 next_sched_at += self.sched_intv;
4294 }
4295 }
4296
4297 if enable_layer_refresh && now >= next_layer_refresh_at {
4298 self.skel
4299 .maps
4300 .bss_data
4301 .as_mut()
4302 .unwrap()
4303 .layer_refresh_seq_avgruntime += 1;
4304 while next_layer_refresh_at < now {
4305 next_layer_refresh_at += self.layer_refresh_intv;
4306 }
4307 }
4308
4309 let timeout_duration = next_sched_at.saturating_duration_since(Instant::now());
4311 let never_rx = crossbeam::channel::never();
4312 let cgroup_rx = cgroup_event_rx.as_ref().unwrap_or(&never_rx);
4313
4314 select! {
4315 recv(req_ch) -> msg => match msg {
4316 Ok(StatsReq::Hello(tid)) => {
4317 cpus_ranges.insert(
4318 tid,
4319 self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
4320 );
4321 let stats =
4322 Stats::new(&mut self.skel, &self.proc_reader, self.topo.clone(), &self.gpu_task_handler)?;
4323 res_ch.send(StatsRes::Hello(Box::new(stats)))?;
4324 }
4325 Ok(StatsReq::Refresh(tid, mut stats)) => {
4326 for i in 0..self.nr_layer_cpus_ranges.len() {
4328 for (_, ranges) in cpus_ranges.iter_mut() {
4329 ranges[i] = (
4330 ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
4331 ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
4332 );
4333 }
4334 self.nr_layer_cpus_ranges[i] =
4335 (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
4336 }
4337
4338 stats.refresh(
4339 &mut self.skel,
4340 &self.proc_reader,
4341 now,
4342 self.processing_dur,
4343 &self.gpu_task_handler,
4344 )?;
4345 let sys_stats =
4346 self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
4347 res_ch.send(StatsRes::Refreshed(Box::new((*stats, sys_stats))))?;
4348 }
4349 Ok(StatsReq::Bye(tid)) => {
4350 cpus_ranges.remove(&tid);
4351 res_ch.send(StatsRes::Bye)?;
4352 }
4353 Err(e) => Err(e)?,
4354 },
4355
4356 recv(cgroup_rx) -> event => match event {
4357 Ok(CgroupEvent::Created { path, cgroup_id, match_bitmap }) => {
4358 self.skel.maps.cgroup_match_bitmap.update(
4360 &cgroup_id.to_ne_bytes(),
4361 &match_bitmap.to_ne_bytes(),
4362 libbpf_rs::MapFlags::ANY,
4363 ).with_context(|| format!(
4364 "Failed to insert cgroup {}({}) into BPF map. Cgroup map may be full \
4365 (max 16384 entries). Aborting.",
4366 cgroup_id, path
4367 ))?;
4368
4369 debug!("Added cgroup {} to BPF map with bitmap 0x{:x}", cgroup_id, match_bitmap);
4370 }
4371 Ok(CgroupEvent::Removed { path, cgroup_id }) => {
4372 if let Err(e) = self.skel.maps.cgroup_match_bitmap.delete(&cgroup_id.to_ne_bytes()) {
4374 warn!("Failed to delete cgroup {} from BPF map: {}", cgroup_id, e);
4375 } else {
4376 debug!("Removed cgroup {}({}) from BPF map", cgroup_id, path);
4377 }
4378 }
4379 Err(e) => {
4380 error!("Error receiving cgroup event: {}", e);
4381 }
4382 },
4383
4384 recv(crossbeam::channel::after(timeout_duration)) -> _ => {
4385 }
4387 }
4388 }
4389
4390 let _ = self.struct_ops.take();
4391 uei_report!(&self.skel, uei)
4392 }
4393}
4394
4395impl Drop for Scheduler<'_> {
4396 fn drop(&mut self) {
4397 info!("Unregister {SCHEDULER_NAME} scheduler");
4398
4399 if !self.netdevs.is_empty() {
4400 for (iface, netdev) in &self.netdevs {
4401 if let Err(e) = netdev.restore_cpumasks() {
4402 warn!("Failed to restore {iface} IRQ affinity: {e}");
4403 }
4404 }
4405 info!("Restored original netdev IRQ affinity");
4406 }
4407
4408 if let Some(struct_ops) = self.struct_ops.take() {
4409 drop(struct_ops);
4410 }
4411 }
4412}
4413
4414fn write_example_file(path: &str) -> Result<()> {
4415 let mut f = fs::OpenOptions::new()
4416 .create_new(true)
4417 .write(true)
4418 .open(path)?;
4419 Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
4420}
4421
struct HintLayerInfo {
    layer_id: usize,
    system_cpu_util_below: Option<f64>,
    dsq_insert_below: Option<f64>,
}

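/// Sanity-check the layer specs and build the hint-value -> layer map.
/// Enforces, per AND block: at most one HintEquals, and high-frequency
/// matchers (SystemCpuUtilBelow, DsqInsertBelow) only alongside exactly one
/// HintEquals with no other matchers.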
fn verify_layer_specs(specs: &[LayerSpec]) -> Result<HashMap<u64, HintLayerInfo>> {
    let mut hint_to_layer_map = HashMap::<u64, (usize, String, Option<f64>, Option<f64>)>::new();

    let nr_specs = specs.len();
    if nr_specs == 0 {
        bail!("No layer spec");
    }
    if nr_specs > MAX_LAYERS {
        bail!("Too many layer specs");
    }

    for (idx, spec) in specs.iter().enumerate() {
        if idx < nr_specs - 1 {
            if spec.matches.is_empty() {
                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
            }
        } else if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
            bail!("Terminal spec {:?} must have an empty match", spec.name);
        }

        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
            bail!(
                "Spec {:?} has too many ({}) OR match blocks",
                spec.name,
                spec.matches.len()
            );
        }

        for (ands_idx, ands) in spec.matches.iter().enumerate() {
            if ands.len() > NR_LAYER_MATCH_KINDS {
                bail!(
                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
                    spec.name,
                    ands_idx,
                    ands.len()
                );
            }
            let mut hint_equals_cnt = 0;
            let mut system_cpu_util_below_cnt = 0;
            let mut dsq_insert_below_cnt = 0;
            let mut hint_value: Option<u64> = None;
            let mut system_cpu_util_threshold: Option<f64> = None;
            let mut dsq_insert_threshold: Option<f64> = None;
            for one in ands.iter() {
                match one {
                    LayerMatch::CgroupPrefix(prefix) => {
                        if prefix.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
                        }
                    }
                    LayerMatch::CgroupSuffix(suffix) => {
                        if suffix.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
                        }
                    }
                    LayerMatch::CgroupContains(substr) => {
                        if substr.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
                        }
                    }
                    LayerMatch::CommPrefix(prefix) => {
                        if prefix.len() > MAX_COMM {
                            bail!("Spec {:?} has too long a comm prefix", spec.name);
                        }
                    }
                    LayerMatch::PcommPrefix(prefix) => {
                        if prefix.len() > MAX_COMM {
                            bail!("Spec {:?} has too long a process name prefix", spec.name);
                        }
                    }
                    LayerMatch::SystemCpuUtilBelow(threshold) => {
                        if *threshold < 0.0 || *threshold > 1.0 {
                            bail!(
                                "Spec {:?} has SystemCpuUtilBelow threshold outside the range [0.0, 1.0]",
                                spec.name
                            );
                        }
                        system_cpu_util_threshold = Some(*threshold);
                        system_cpu_util_below_cnt += 1;
                    }
                    LayerMatch::DsqInsertBelow(threshold) => {
                        if *threshold < 0.0 || *threshold > 1.0 {
                            bail!(
                                "Spec {:?} has DsqInsertBelow threshold outside the range [0.0, 1.0]",
                                spec.name
                            );
                        }
                        dsq_insert_threshold = Some(*threshold);
                        dsq_insert_below_cnt += 1;
                    }
                    LayerMatch::HintEquals(hint) => {
                        if *hint > 1024 {
                            bail!(
                                "Spec {:?} has hint value outside the range [0, 1024]",
                                spec.name
                            );
                        }
                        hint_value = Some(*hint);
                        hint_equals_cnt += 1;
                    }
                    _ => {}
                }
            }
            if hint_equals_cnt > 1 {
                bail!("Only 1 HintEquals match permitted per AND block");
            }
            let high_freq_matcher_cnt = system_cpu_util_below_cnt + dsq_insert_below_cnt;
            if high_freq_matcher_cnt > 0 {
                if hint_equals_cnt != 1 {
                    bail!("High-frequency matchers (SystemCpuUtilBelow, DsqInsertBelow) must be used with one HintEquals");
                }
                if system_cpu_util_below_cnt > 1 {
                    bail!("Only 1 SystemCpuUtilBelow match permitted per AND block");
                }
                if dsq_insert_below_cnt > 1 {
                    bail!("Only 1 DsqInsertBelow match permitted per AND block");
                }
                if ands.len() != hint_equals_cnt + system_cpu_util_below_cnt + dsq_insert_below_cnt
                {
                    bail!("High-frequency matchers must be used only with HintEquals (no other matchers)");
                }
            } else if hint_equals_cnt == 1 && ands.len() != 1 {
                bail!("HintEquals match cannot be in conjunction with other matches");
            }

            if let Some(hint) = hint_value {
                if let Some((layer_id, name, _, _)) = hint_to_layer_map.get(&hint) {
                    if *layer_id != idx {
                        bail!(
                            "Spec {:?} has hint value ({}) that is already mapped to Spec {:?}",
                            spec.name,
                            hint,
                            name
                        );
                    }
                } else {
                    hint_to_layer_map.insert(
                        hint,
                        (
                            idx,
                            spec.name.clone(),
                            system_cpu_util_threshold,
                            dsq_insert_threshold,
                        ),
                    );
                }
            }
        }

        match spec.kind {
            LayerKind::Confined {
                cpus_range,
                util_range,
                ..
            }
            | LayerKind::Grouped {
                cpus_range,
                util_range,
                ..
            } => {
                if let Some((cpus_min, cpus_max)) = cpus_range {
                    if cpus_min > cpus_max {
                        bail!(
                            "Spec {:?} has invalid cpus_range({}, {})",
                            spec.name,
                            cpus_min,
                            cpus_max
                        );
                    }
                }
                if util_range.0 >= util_range.1 {
                    bail!(
                        "Spec {:?} has invalid util_range ({}, {})",
                        spec.name,
                        util_range.0,
                        util_range.1
                    );
                }
            }
            _ => {}
        }
    }

    Ok(hint_to_layer_map
        .into_iter()
        .map(|(k, v)| {
            (
                k,
                HintLayerInfo {
                    layer_id: v.0,
                    system_cpu_util_below: v.2,
                    dsq_insert_below: v.3,
                },
            )
        })
        .collect())
}

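/// Return the last `len` characters of `cgroup` (the whole string if it is
/// shorter).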
fn name_suffix(cgroup: &str, len: usize) -> String {
    let nr_chars = cgroup.chars().count();
    cgroup.chars().skip(nr_chars.saturating_sub(len)).collect()
}

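/// Recursively collect all directories under `dir`; used to enumerate cgroups
/// below /sys/fs/cgroup.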
fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
    let mut paths = vec![];

    if !dir.is_dir() {
        bail!("path {:?} does not correspond to a directory", dir);
    }

    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_dir() {
            paths.append(&mut traverse_sysfs(&path)?);
            paths.push(path);
        }
    }

    Ok(paths)
}

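/// Read a cgroup's cpuset.cpus.effective file and parse it into a Cpumask.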
fn find_cpumask(cgroup: &str) -> Cpumask {
    let mut path = String::from(cgroup);
    path.push_str("/cpuset.cpus.effective");

    let description = fs::read_to_string(&path).unwrap();

    Cpumask::from_cpulist(&description).unwrap()
}

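/// Expand a template match rule into one concrete (match, cpumask) pair per
/// matching cgroup so that a single templated spec can fan out into
/// per-cgroup layers.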
fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
    match rule {
        LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
            .into_iter()
            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
            .filter(|cgroup| cgroup.ends_with(suffix))
            .map(|cgroup| {
                (
                    {
                        let mut slashterminated = cgroup.clone();
                        slashterminated.push('/');
                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
                    },
                    find_cpumask(&cgroup),
                )
            })
            .collect()),
        LayerMatch::CgroupRegex(expr) => {
            // Compile the regex once instead of once per visited cgroup.
            let re =
                Regex::new(expr).with_context(|| format!("invalid cgroup regex {:?}", expr))?;
            Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
                .into_iter()
                .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
                .filter(|cgroup| re.is_match(cgroup))
                .map(|cgroup| {
                    (
                        {
                            let mut slashterminated = cgroup.clone();
                            slashterminated.push('/');
                            LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
                        },
                        find_cpumask(&cgroup),
                    )
                })
                .collect())
        }
        _ => bail!("Unimplemented template enum {:?}", rule),
    }
}

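/// Open a raw perf counter for `event` on every possible CPU and record the
/// resulting fds in the scx_pmu_map BPF map so the BPF side can read them.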
fn create_perf_fds(skel: &mut BpfSkel, event: u64) -> Result<()> {
    // Counting (non-sampling) event: zero sample period, enabled on open.
    let mut attr = perf::bindings::perf_event_attr {
        size: std::mem::size_of::<perf::bindings::perf_event_attr>() as u32,
        type_: perf::bindings::PERF_TYPE_RAW,
        config: event,
        sample_type: 0u64,
        ..Default::default()
    };
    attr.__bindgen_anon_1.sample_period = 0u64;
    attr.set_disabled(0);

    let perf_events_map = &skel.maps.scx_pmu_map;
    let map_fd = unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };

    let mut failures = 0u64;

    for cpu in 0..*NR_CPUS_POSSIBLE {
        let fd = unsafe { perf::perf_event_open(&mut attr as *mut _, -1, cpu as i32, -1, 0) };
        if fd < 0 {
            failures += 1;
            trace!(
                "perf_event_open failed cpu={cpu} errno={}",
                std::io::Error::last_os_error()
            );
            continue;
        }

        let key = cpu as u32;
        let val = fd as u32;
        let ret = unsafe {
            libbpf_sys::bpf_map_update_elem(
                map_fd,
                &key as *const _ as *const _,
                &val as *const _ as *const _,
                0,
            )
        };
        if ret != 0 {
            trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
        } else {
            trace!("mapped cpu={cpu} -> fd={fd}");
        }
    }

    if failures > 0 {
        warn!("membw tracking: failed to install {failures} counters");
    }

    Ok(())
}

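/// Pick a per-vendor PMU event to approximate memory bandwidth consumption
/// and write its raw config into the skeleton's rodata.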
fn setup_membw_tracking(skel: &mut OpenBpfSkel) -> Result<u64> {
    let pmumanager = PMUManager::new()?;
    let codename = &pmumanager.codename as &str;

    let pmuspec = match codename {
        "amdzen1" | "amdzen2" | "amdzen3" => {
            trace!("found AMD codename {codename}");
            pmumanager.pmus.get("ls_any_fills_from_sys.mem_io_local")
        }
        "amdzen4" | "amdzen5" => {
            trace!("found AMD codename {codename}");
            pmumanager.pmus.get("ls_any_fills_from_sys.dram_io_all")
        }

        "haswell" | "broadwell" | "broadwellde" | "broadwellx" | "skylake" | "skylakex"
        | "cascadelakex" | "arrowlake" | "meteorlake" | "sapphirerapids" | "emeraldrapids"
        | "graniterapids" => {
            trace!("found Intel codename {codename}");
            pmumanager.pmus.get("LONGEST_LAT_CACHE.MISS")
        }

        _ => {
            trace!("found unknown codename {codename}");
            None
        }
    };

    let spec = pmuspec
        .ok_or_else(|| anyhow!("no memory bandwidth PMU event known for codename {codename}"))?;
    // Raw perf config: umask in bits 8-15, event select in bits 0-7.
    let config = (spec.umask << 8) | spec.event[0];

    skel.maps.rodata_data.as_mut().unwrap().membw_event = config;

    Ok(config)
}

#[clap_main::clap_main]
fn main(opts: Opts) -> Result<()> {
    if opts.version {
        println!(
            "scx_layered {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let env_filter = EnvFilter::try_from_default_env()
        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
            Ok(filter) => Ok(filter),
            Err(e) => {
                eprintln!(
                    "invalid log level {:?}, falling back to info: {}",
                    opts.log_level, e
                );
                EnvFilter::try_new("info")
            }
        })
        .unwrap_or_else(|_| EnvFilter::new("info"));

    match tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(true)
        .with_thread_ids(true)
        .with_file(true)
        .with_line_number(true)
        .try_init()
    {
        Ok(()) => {}
        Err(e) => eprintln!("failed to init logger: {}", e),
    }

    if opts.verbose > 0 {
        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
    }

    if opts.no_load_frac_limit {
        warn!("--no-load-frac-limit is deprecated and is a no-op");
    }
    if opts.layer_preempt_weight_disable != 0.0 {
        warn!("--layer-preempt-weight-disable is deprecated and is a no-op");
    }
    if opts.layer_growth_weight_disable != 0.0 {
        warn!("--layer-growth-weight-disable is deprecated and is a no-op");
    }
    if opts.local_llc_iteration {
        warn!("--local_llc_iteration is deprecated and is a no-op");
    }

    debug!("opts={:?}", &opts);

    if let Some(run_id) = opts.run_id {
        info!("scx_layered run_id: {}", run_id);
    }

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let stats_columns = opts.stats_columns;
        let stats_no_llc = opts.stats_no_llc;
        let jh = std::thread::spawn(move || {
            match stats::monitor(
                Duration::from_secs_f64(intv),
                shutdown_copy,
                stats_columns,
                stats_no_llc,
            ) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    if let Some(path) = &opts.example {
        write_example_file(path)?;
        return Ok(());
    }

    let mut layer_config = match opts.run_example {
        true => EXAMPLE_CONFIG.clone(),
        false => LayerConfig { specs: vec![] },
    };

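    // Parse each --spec argument. A spec carrying a template rule fans out
    // into one generated spec per matching cgroup; plain specs are used
    // verbatim.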
    for (idx, input) in opts.specs.iter().enumerate() {
        let specs = LayerSpec::parse(input)
            .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;

        for spec in specs {
            match spec.template {
                Some(ref rule) => {
                    let matches = expand_template(rule)?;
                    if matches.is_empty() {
                        // Nothing matched the template; keep the spec as-is.
                        layer_config.specs.push(spec);
                    } else {
                        for (mt, mask) in matches {
                            let mut genspec = spec.clone();

                            genspec.cpuset = Some(mask);

                            for orterm in &mut genspec.matches {
                                // AND the generated cgroup match into every OR term.
                                orterm.push(mt.clone());
                            }

                            // Suffix the layer name so generated layers stay unique.
                            match &mt {
                                LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
                                _ => bail!("Template match has unexpected type"),
                            }

                            layer_config.specs.push(genspec);
                        }
                    }
                }

                None => {
                    layer_config.specs.push(spec);
                }
            }
        }
    }

    for spec in layer_config.specs.iter_mut() {
        let common = spec.kind.common_mut();

        if common.slice_us == 0 {
            common.slice_us = opts.slice_us;
        }

        if common.weight == 0 {
            common.weight = DEFAULT_LAYER_WEIGHT;
        }
        common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);

        if common.preempt {
            if common.disallow_open_after_us.is_some() {
                warn!(
                    "Preempt layer {} has non-null disallow_open_after_us, ignored",
                    &spec.name
                );
            }
            if common.disallow_preempt_after_us.is_some() {
                warn!(
                    "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
                    &spec.name
                );
            }
            common.disallow_open_after_us = Some(u64::MAX);
            common.disallow_preempt_after_us = Some(u64::MAX);
        } else {
            if common.disallow_open_after_us.is_none() {
                common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
            }

            if common.disallow_preempt_after_us.is_none() {
                common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
            }
        }

        if common.idle_smt.is_some() {
            warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
        }

        if common.allow_node_aligned.is_some() {
            warn!(
                "Layer {} has deprecated flag \"allow_node_aligned\", node-aligned tasks \
                 are now always dispatched on layer DSQs",
                &spec.name
            );
        }
    }

    let membw_required = layer_config.specs.iter().any(|spec| match spec.kind {
        LayerKind::Confined { membw_gb, .. } | LayerKind::Grouped { membw_gb, .. } => {
            membw_gb.is_some()
        }
        LayerKind::Open { .. } => false,
    });

    if opts.print_and_exit {
        println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
        return Ok(());
    }

    debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
    let hint_to_layer_map = verify_layer_specs(&layer_config.specs)?;

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(
            &opts,
            &layer_config.specs,
            &mut open_object,
            &hint_to_layer_map,
            membw_required,
        )?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}

#[cfg(test)]
mod xnuma_tests {
    use super::*;

    const THRESH: (f64, f64) = (0.6, 0.7);
    const DELTA: (f64, f64) = (0.2, 0.3);

    #[test]
    fn test_activation_below_threshold() {
        let duty = vec![40.0, 40.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    #[test]
    fn test_activation_above_all_thresholds() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
    }

    #[test]
    fn test_activation_requires_growth_denied() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![false, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    #[test]
    fn test_symmetric_high_load_stays_closed() {
        let duty = vec![80.0, 80.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    #[test]
    fn test_hysteresis_stays_active() {
        let duty = vec![75.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
    }

    #[test]
    fn test_hysteresis_stays_inactive() {
        let duty = vec![75.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    #[test]
    fn test_deactivation_load_drops() {
        let duty = vec![50.0, 50.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    #[test]
    fn test_deactivation_growth_succeeds() {
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![false, false];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
    }

    #[test]
    fn test_zero_alloc_with_duty_and_growth_denied() {
        let duty = vec![50.0, 0.0];
        let allocs = vec![0, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
    }

    #[test]
    fn test_all_zero_alloc() {
        let duty = vec![0.0, 0.0];
        let allocs = vec![0, 0];
        let gd = vec![true, true];
        let cur = vec![true, true];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    #[test]
    fn test_three_nodes_mixed() {
        let duty = vec![90.0, 40.0, 10.0];
        let allocs = vec![96, 96, 96];
        let gd = vec![true, true, false];
        let cur = vec![false, false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
        assert!(!result[2]);
    }

    #[test]
    fn test_per_node_independence() {
        let duty = vec![90.0, 50.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![true, true];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]);
        assert!(!result[1]);
    }

    #[test]
    fn test_rates_balanced_load() {
        let duty = vec![48.0, 48.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    #[test]
    fn test_rates_one_overloaded() {
        let duty = vec![80.0, 40.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][1] > 0);
        assert_eq!(result.rates[1][0], 0);
        assert_eq!(result.rates[0][0], 0);
        assert_eq!(result.rates[1][1], 0);

        // Equilibrium duty is 60.0 per node, so node 0's surplus is 20.0.
        let expected_rate = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
    }

    #[test]
    fn test_rates_asymmetric_allocation() {
        let duty = vec![48.0, 48.0];
        let allocs = vec![48, 144];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected_rate = (24.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
        assert_eq!(result.rates[1][0], 0);
    }

    #[test]
    fn test_rates_three_nodes_one_source() {
        let duty = vec![120.0, 30.0, 30.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_01 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_02 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], rate_01);
        assert_eq!(result.rates[0][2], rate_02);

        assert_eq!(result.rates[1][0], 0);
        assert_eq!(result.rates[2][0], 0);
        assert_eq!(result.rates[1][2], 0);
        assert_eq!(result.rates[2][1], 0);
    }

    #[test]
    fn test_rates_three_nodes_unequal_deficit() {
        let duty = vec![120.0, 50.0, 10.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_01 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_02 = (50.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], rate_01);
        assert_eq!(result.rates[0][2], rate_02);
    }

    #[test]
    fn test_rates_two_sources_one_sink() {
        let duty = vec![80.0, 70.0, 30.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let rate_02 = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_12 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][2], rate_02);
        assert_eq!(result.rates[1][2], rate_12);

        assert_eq!(result.rates[0][1], 0);
        assert_eq!(result.rates[1][0], 0);
    }

    #[test]
    fn test_conservation_per_source_outbound() {
        let duty = vec![100.0, 30.0, 50.0, 20.0];
        let allocs = vec![96, 96, 96, 96];
        let nr = 4;
        let result = xnuma_compute_rates(&duty, &allocs);

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        for src in 0..nr {
            let expected = eq_ratio * allocs[src] as f64;
            let surplus = (duty[src] - expected).max(0.0);
            let total_outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
            let expected_rate = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
            assert_eq!(
                total_outbound, expected_rate,
                "node {} outbound mismatch",
                src
            );
        }
    }

    #[test]
    fn test_conservation_surplus_equals_deficit() {
        let duty = vec![100.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96];

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        let mut total_surplus = 0.0f64;
        let mut total_deficit = 0.0f64;
        for i in 0..3 {
            let expected = eq_ratio * allocs[i] as f64;
            let delta = duty[i] - expected;
            if delta > 0.0 {
                total_surplus += delta;
            } else {
                total_deficit += -delta;
            }
        }
        assert!((total_surplus - total_deficit).abs() < 1e-10);
    }

    #[test]
    fn test_self_rates_always_zero() {
        let duty = vec![100.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for nid in 0..3 {
            assert_eq!(result.rates[nid][nid], 0);
        }
    }

    #[test]
    fn test_deficit_nodes_have_zero_outbound() {
        let duty = vec![100.0, 20.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for nid in 0..4 {
            let outbound: u64 = (0..4).map(|dst| result.rates[nid][dst]).sum();
            if outbound == 0 {
                for dst in 0..4 {
                    assert_eq!(
                        result.rates[nid][dst], 0,
                        "deficit node {} has non-zero rate to {}",
                        nid, dst
                    );
                }
            }
        }
    }

    #[test]
    fn test_rates_zero_duty_everywhere() {
        let duty = vec![0.0, 0.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    #[test]
    fn test_rates_zero_alloc() {
        let duty = vec![50.0, 50.0];
        let allocs = vec![0, 0];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    #[test]
    fn test_rates_all_load_one_node() {
        let duty = vec![96.0, 0.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected_rate = (48.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
    }

    #[test]
    fn test_rates_single_node() {
        let duty = vec![96.0];
        let allocs = vec![96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(result.rates[0][0], 0);
    }

    #[test]
    fn test_rates_one_node_zero_alloc() {
        let duty = vec![80.0, 0.0];
        let allocs = vec![96, 0];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(result.rates[0][1], 0);
        assert_eq!(result.rates[1][0], 0);
    }

    #[test]
    fn test_rate_scaling() {
        let duty = vec![80.0, 40.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected);
        assert_eq!(expected, 10 * (1 << 20));
    }

    #[test]
    fn test_rates_tiny_imbalance() {
        let duty = vec![48.001, 47.999];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][1] > 0);
        assert!(result.rates[0][1] < (1 << 20));
    }

    #[test]
    fn test_rates_large_values() {
        let duty = vec![300.0, 100.0, 50.0, 50.0, 50.0, 50.0, 50.0, 50.0];
        let allocs = vec![96, 96, 96, 96, 96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert!(result.rates[0][2] > 0);
        assert!(result.rates[1][2] > 0);

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;
        let nr = 8;
        for src in 0..nr {
            let surplus = (duty[src] - eq_ratio * allocs[src] as f64).max(0.0);
            let outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
            let expected = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
            // Allow one unit of rounding error per destination.
            let tolerance = nr as u64;
            assert!(
                outbound.abs_diff(expected) <= tolerance,
                "node {} outbound {} vs expected {}, diff {}",
                src,
                outbound,
                expected,
                outbound.abs_diff(expected)
            );
        }
    }

    #[test]
    fn test_hysteresis_cycle() {
        let allocs = vec![96, 96];
        let gd = vec![true, false];

        // Balanced and below threshold: stays inactive.
        let active =
            xnuma_check_active(&[40.0, 40.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
        assert!(!active[0]);

        // Imbalanced above the activation threshold: activates.
        let active =
            xnuma_check_active(&[90.0, 20.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
        assert!(active[0]);

        // Load drops into the hysteresis band: stays active.
        let active = xnuma_check_active(&[75.0, 20.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
        assert!(active[0]);

        // Load balances out: deactivates.
        let active = xnuma_check_active(&[50.0, 50.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
        assert!(!active[0]);
    }

    #[test]
    fn test_hysteresis_growth_toggle() {
        let allocs = vec![96, 96];
        let gd_denied = vec![true, false];
        let gd_ok = vec![false, false];

        // Growth denied: the overloaded node activates.
        let active = xnuma_check_active(
            &[90.0, 20.0],
            &allocs,
            THRESH,
            DELTA,
            &gd_denied,
            &[false, false],
        );
        assert!(active[0]);

        // Growth succeeds again: the node deactivates even though the load
        // is unchanged.
        let active = xnuma_check_active(
            &[90.0, 20.0],
            &allocs,
            THRESH,
            DELTA,
            &gd_ok,
            &[true, false],
        );
        assert!(!active[0]);
    }

    #[test]
    fn test_proportional_sink_distribution() {
        let duty = vec![180.0, 20.0, 40.0, 0.0];
        let allocs = vec![96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // Node 0's surplus of 120 is split across the sinks in proportion to
        // their deficits: 40 to node 1, 20 to node 2, and 60 to node 3.
        assert_eq!(
            result.rates[0][1],
            (40.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
        assert_eq!(
            result.rates[0][2],
            (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
        assert_eq!(
            result.rates[0][3],
            (60.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );

        let total_from_n0: u64 = (0..4).map(|dst| result.rates[0][dst]).sum();
        assert_eq!(
            total_from_n0,
            (120.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
    }
}