// scx_layered/main.rs
1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6mod stats;
7
8use std::collections::BTreeMap;
9use std::collections::BTreeSet;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::ffi::CString;
13use std::fs;
14use std::io::Write;
15use std::mem::MaybeUninit;
16use std::ops::Sub;
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::thread::ThreadId;
23use std::time::Duration;
24use std::time::Instant;
25
26use inotify::{Inotify, WatchMask};
27use std::os::unix::io::AsRawFd;
28
29use anyhow::anyhow;
30use anyhow::bail;
31use anyhow::Context;
32use anyhow::Result;
33pub use bpf_skel::*;
34use clap::Parser;
35use crossbeam::channel::Receiver;
36use crossbeam::select;
37use lazy_static::lazy_static;
38use libbpf_rs::libbpf_sys;
39use libbpf_rs::AsRawLibbpf;
40use libbpf_rs::MapCore as _;
41use libbpf_rs::OpenObject;
42use libbpf_rs::ProgramInput;
43use nix::sched::CpuSet;
44use nvml_wrapper::error::NvmlError;
45use nvml_wrapper::Nvml;
46use once_cell::sync::OnceCell;
47use regex::Regex;
48use scx_layered::alloc::{unified_alloc, LayerAlloc, LayerDemand};
49use scx_layered::*;
50use scx_raw_pmu::PMUManager;
51use scx_stats::prelude::*;
52use scx_utils::build_id;
53use scx_utils::compat;
54use scx_utils::init_libbpf_logging;
55use scx_utils::libbpf_clap_opts::LibbpfOpts;
56use scx_utils::perf;
57use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
58use scx_utils::read_netdevs;
59use scx_utils::scx_enums;
60use scx_utils::scx_ops_attach;
61use scx_utils::scx_ops_load;
62use scx_utils::scx_ops_open;
63use scx_utils::uei_exited;
64use scx_utils::uei_report;
65use scx_utils::CoreType;
66use scx_utils::Cpumask;
67use scx_utils::NetDev;
68use scx_utils::Topology;
69use scx_utils::TopologyArgs;
70use scx_utils::UserExitInfo;
71use scx_utils::NR_CPUS_POSSIBLE;
72use scx_utils::NR_CPU_IDS;
73use stats::LayerStats;
74use stats::StatsReq;
75use stats::StatsRes;
76use stats::SysStats;
77use std::collections::VecDeque;
78use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
79use tracing::{debug, error, info, trace, warn};
80use tracing_subscriber::filter::EnvFilter;
81use walkdir::WalkDir;
82
// Scheduler identity and limits mirrored from the BPF side (bpf_intf) so
// user space and BPF agree on buffer sizes and bounds.
const SCHEDULER_NAME: &str = "scx_layered";
const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
// Usage-averaging half-life; the F64 variant converts the nanosecond value
// to seconds (divide by 1e9) for floating point math.
const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;

// Indices into the per-layer usage array maintained by the BPF side.
const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
const LAYER_USAGE_PROTECTED_PREEMPT: usize =
    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;

// Stat array sizes, mirrored from the BPF-side enum definitions.
const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;

// Number of distinct layer match-rule kinds understood by the BPF side.
const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;
108
// Process-wide NVML (NVIDIA Management Library) handle, initialized lazily
// on first use.
static NVML: OnceCell<Nvml> = OnceCell::new();

/// Return the shared NVML handle, initializing it on the first call.
/// On initialization failure the error is returned and nothing is cached,
/// so later calls retry the initialization.
fn nvml() -> Result<&'static Nvml, NvmlError> {
    NVML.get_or_try_init(Nvml::init)
}
114
lazy_static! {
    // Per-second decay factor chosen so that usage halves every
    // USAGE_HALF_LIFE_F64 seconds: 0.5^(1/half_life).
    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
    // Defaults for --disallow-open/preempt-after-us, derived from the
    // kernel's default slice (converted ns -> us).
    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
    // Example layer configuration written out by --example and used by
    // --run-example. Parsed at startup; .unwrap() is safe as the JSON is
    // a compile-time literal.
    static ref EXAMPLE_CONFIG: LayerConfig = serde_json::from_str(
        r#"[
          {
            "name": "batch",
            "comment": "tasks under system.slice or tasks with nice value > 0",
            "matches": [[{"CgroupPrefix": "system.slice/"}], [{"NiceAbove": 0}]],
            "kind": {"Confined": {
              "util_range": [0.8, 0.9], "cpus_range": [0, 16],
              "min_exec_us": 1000, "slice_us": 20000, "weight": 100,
              "xllc_mig_min_us": 1000.0, "perf": 1024
            }}
          },
          {
            "name": "immediate",
            "comment": "tasks under workload.slice with nice value < 0",
            "matches": [[{"CgroupPrefix": "workload.slice/"}, {"NiceBelow": 0}]],
            "kind": {"Open": {
              "min_exec_us": 100, "yield_ignore": 0.25, "slice_us": 20000,
              "preempt": true, "exclusive": true,
              "prev_over_idle_core": true,
              "weight": 100, "perf": 1024
            }}
          },
          {
            "name": "stress-ng",
            "comment": "stress-ng test layer",
            "matches": [[{"CommPrefix": "stress-ng"}], [{"PcommPrefix": "stress-ng"}]],
            "kind": {"Confined": {
              "util_range": [0.2, 0.8],
              "min_exec_us": 800, "preempt": true, "slice_us": 800,
              "weight": 100, "growth_algo": "Topo", "perf": 1024
            }}
          },
          {
            "name": "normal",
            "comment": "the rest",
            "matches": [[]],
            "kind": {"Grouped": {
              "util_range": [0.5, 0.6], "util_includes_open_cputime": true,
              "min_exec_us": 200, "slice_us": 20000, "weight": 100,
              "xllc_mig_min_us": 100.0, "growth_algo": "Linear", "perf": 1024
            }}
          }
        ]"#,
    )
    .unwrap();
}
166
167/// scx_layered: A highly configurable multi-layer sched_ext scheduler
168///
169/// scx_layered allows classifying tasks into multiple layers and applying
170/// different scheduling policies to them. The configuration is specified in
171/// json and composed of two parts - matches and policies.
172///
173/// Matches
174/// =======
175///
176/// Whenever a task is forked or its attributes are changed, the task goes
177/// through a series of matches to determine the layer it belongs to. A
178/// match set is composed of OR groups of AND blocks. An example:
179///
180///   "matches": [
181///     [
182///       {
183///         "CgroupPrefix": "system.slice/"
184///       }
185///     ],
186///     [
187///       {
188///         "CommPrefix": "fbagent"
189///       },
190///       {
191///         "NiceAbove": 0
192///       }
193///     ]
194///   ],
195///
196/// The outer array contains the OR groups and the inner AND blocks, so the
197/// above matches:
198///
199/// - Tasks which are in the cgroup sub-hierarchy under "system.slice".
200///
201/// - Or tasks whose comm starts with "fbagent" and have a nice value > 0.
202///
203/// Currently, the following matches are supported:
204///
205/// - CgroupPrefix: Matches the prefix of the cgroup that the task belongs
206///   to. As this is a string match, whether the pattern has the trailing
207///   '/' makes a difference. For example, "TOP/CHILD/" only matches tasks
208///   which are under that particular cgroup while "TOP/CHILD" also matches
209///   tasks under "TOP/CHILD0/" or "TOP/CHILD1/".
210///
211/// - CommPrefix: Matches the task's comm prefix.
212///
213/// - PcommPrefix: Matches the task's thread group leader's comm prefix.
214///
215/// - NiceAbove: Matches if the task's nice value is greater than the
216///   pattern.
217///
218/// - NiceBelow: Matches if the task's nice value is smaller than the
219///   pattern.
220///
221/// - NiceEquals: Matches if the task's nice value is exactly equal to
222///   the pattern.
223///
224/// - UIDEquals: Matches if the task's effective user id matches the value
225///
226/// - GIDEquals: Matches if the task's effective group id matches the value.
227///
228/// - PIDEquals: Matches if the task's pid matches the value.
229///
230/// - PPIDEquals: Matches if the task's ppid matches the value.
231///
232/// - TGIDEquals: Matches if the task's tgid matches the value.
233///
234/// - NSPIDEquals: Matches if the task's namespace id and pid matches the values.
235///
236/// - NSEquals: Matches if the task's namespace id matches the values.
237///
238/// - IsGroupLeader: Bool. When true, matches if the task is group leader
239///   (i.e. PID == TGID), aka the thread from which other threads are made.
240///   When false, matches if the task is *not* the group leader (i.e. the rest).
241///
242/// - CmdJoin: Matches when the task uses pthread_setname_np to send a join/leave
243///   command to the scheduler. See examples/cmdjoin.c for more details.
244///
245/// - UsedGpuTid: Bool. When true, matches if the tasks which have used
246///   gpus by tid.
247///
248/// - UsedGpuPid: Bool. When true, matches if the tasks which have used gpu
249///   by tgid/pid.
250///
251/// - [EXPERIMENTAL] AvgRuntime: (u64, u64). Match tasks whose average runtime
252///   is within the provided values [min, max).
253///
254/// - HintEquals: u64. Match tasks whose hint value equals this value.
255///   The value must be in the range [0, 1024].
256///
257/// - SystemCpuUtilBelow: f64. Match when the system CPU utilization fraction
258///   is below the specified threshold (a value in the range [0.0, 1.0]). This
259///   option can only be used in conjunction with HintEquals.
260///
261/// - DsqInsertBelow: f64. Match when the layer DSQ insertion fraction is below
262///   the specified threshold (a value in the range [0.0, 1.0]). This option can
263///   only be used in conjunction with HintEquals.
264///
265/// While there are complexity limitations as the matches are performed in
266/// BPF, it is straightforward to add more types of matches.
267///
268/// Templates
269/// ---------
270///
271/// Templates let us create a variable number of layers dynamically at initialization
272/// time out of a cgroup name suffix/prefix. Sometimes we know there are multiple
273/// applications running on a machine, each with their own cgroup but do not know the
274/// exact names of the applications or cgroups, e.g., in cloud computing contexts where
275/// workloads are placed on machines dynamically and run under cgroups whose name is
276/// autogenerated. In that case, we cannot hardcode the cgroup match rules when writing
277/// the configuration. We thus cannot easily prevent tasks from different cgroups from
278/// falling into the same layer and affecting each other's performance.
279///
280///
281/// Templates offer a solution to this problem by generating one layer for each such cgroup,
282/// provided these cgroups share a suffix, and that the suffix is unique to them. Templates
283/// have a cgroup suffix rule that we use to find the relevant cgroups in the system. For each
284/// such cgroup, we copy the layer config and add a matching rule that matches just this cgroup.
285///
286///
287/// Policies
288/// ========
289///
290/// The following is an example policy configuration for a layer.
291///
292///   "kind": {
293///     "Confined": {
294///       "cpus_range": [1, 8],
295///       "util_range": [0.8, 0.9]
296///     }
297///   }
298///
299/// It's of "Confined" kind, which tries to concentrate the layer's tasks
300/// into a limited number of CPUs. In the above case, the number of CPUs
301/// assigned to the layer is scaled between 1 and 8 so that the per-cpu
302/// utilization is kept between 80% and 90%. If the CPUs are loaded higher
303/// than 90%, more CPUs are allocated to the layer. If the utilization drops
304/// below 80%, the layer loses CPUs.
305///
306/// Currently, the following policy kinds are supported:
307///
308/// - Confined: Tasks are restricted to the allocated CPUs. The number of
309///   CPUs allocated is modulated to keep the per-CPU utilization in
310///   "util_range". The range can optionally be restricted with the
311///   "cpus_range" property.
312///
313/// - Grouped: Similar to Confined but tasks may spill outside if there are
314///   idle CPUs outside the allocated ones. The range can optionally be
315///   restricted with the "cpus_range" property.
316///
317/// - Open: Prefer the CPUs which are not occupied by Confined or Grouped
318///   layers. Tasks in this group will spill into occupied CPUs if there are
319///   no unoccupied idle CPUs.
320///
321/// All layers take the following options:
322///
323/// - min_exec_us: Minimum execution time in microseconds. Whenever a task
324///   is scheduled in, this is the minimum CPU time that it's charged no
325///   matter how short the actual execution time may be.
326///
327/// - yield_ignore: Yield ignore ratio. If 0.0, yield(2) forfeits a whole
328///   execution slice. 0.25 yields three quarters of an execution slice and
329///   so on. If 1.0, yield is completely ignored.
330///
331/// - slice_us: Scheduling slice duration in microseconds.
332///
333/// - fifo: Use FIFO queues within the layer instead of the default vtime.
334///
335/// - preempt: If true, tasks in the layer will preempt tasks which belong
336///   to other non-preempting layers when no idle CPUs are available.
337///
338/// - preempt_first: If true, tasks in the layer will try to preempt tasks
339///   in their previous CPUs before trying to find idle CPUs.
340///
341/// - exclusive: If true, tasks in the layer will occupy the whole core. The
342///   other logical CPUs sharing the same core will be kept idle. This isn't
343///   a hard guarantee, so don't depend on it for security purposes.
344///
345/// - allow_node_aligned: DEPRECATED. Node-aligned tasks are now always
346///   dispatched on layer DSQs. This field is ignored if specified.
347///
348/// - prev_over_idle_core: On SMT enabled systems, prefer using the same CPU
349///   when picking a CPU for tasks on this layer, even if that CPUs SMT
350///   sibling is processing a task.
351///
352/// - weight: Weight of the layer, which is a range from 1 to 10000 with a
353///   default of 100. Layer weights are used during contention to prevent
354///   starvation across layers. Weights are used in combination with
355///   utilization to determine the infeasible adjusted weight with higher
356///   weights having a larger adjustment in adjusted utilization.
357///
358/// - disallow_open_after_us: Duration to wait after machine reaches saturation
359///   before confining tasks in Open layers.
360///
361/// - cpus_range_frac: Array of 2 floats between 0 and 1.0. Lower and upper
362///   bound fractions of all CPUs to give to a layer. Mutually exclusive
363///   with cpus_range.
364///
365/// - disallow_preempt_after_us: Duration to wait after machine reaches saturation
366///   before confining tasks to preempt.
367///
368/// - xllc_mig_min_us: Skip cross-LLC migrations if they are likely to run on
369///   their existing LLC sooner than this.
370///
/// - idle_smt: ***DEPRECATED***
372///
373/// - growth_algo: Determines the order in which CPUs are allocated to the
374///   layer as it grows. All algorithms are NUMA-aware and produce per-node
375///   core orderings. Most are locality algorithms that prefer the layer's
376///   home node and spill to remote nodes only when local capacity is
377///   exhausted. NUMA-spread algorithms (RoundRobin, NodeSpread*) instead
378///   enforce equal CPU counts across all NUMA nodes. Default: Sticky.
379///
380/// - perf: CPU performance target. 0 means no configuration. A value
381///   between 1 and 1024 indicates the performance level CPUs running tasks
382///   in this layer are configured to using scx_bpf_cpuperf_set().
383///
384/// - idle_resume_us: Sets the idle resume QoS value. CPU idle time governors are expected to
385///   regard the minimum of the global (effective) CPU latency limit and the effective resume
386///   latency constraint for the given CPU as the upper limit for the exit latency of the idle
387///   states. See the latest kernel docs for more details:
388///   https://www.kernel.org/doc/html/latest/admin-guide/pm/cpuidle.html
389///
390/// - nodes: If set the layer will use the set of NUMA nodes for scheduling
391///   decisions. If unset then all available NUMA nodes will be used. If the
392///   llcs value is set the cpuset of NUMA nodes will be or'ed with the LLC
393///   config.
394///
395/// - llcs: If set the layer will use the set of LLCs (last level caches)
396///   for scheduling decisions. If unset then all LLCs will be used. If
397///   the nodes value is set the cpuset of LLCs will be or'ed with the nodes
398///   config.
399///
400///
401/// Similar to matches, adding new policies and extending existing ones
402/// should be relatively straightforward.
403///
404/// Configuration example and running scx_layered
405/// =============================================
406///
407/// An scx_layered config is composed of layer configs. A layer config is
408/// composed of a name, a set of matches, and a policy block. Running the
409/// following will write an example configuration into example.json.
410///
411///   $ scx_layered -e example.json
412///
413/// Note that the last layer in the configuration must have an empty match set
414/// as a catch-all for tasks which haven't been matched into previous layers.
415///
416/// The configuration can be specified in multiple json files and
417/// command line arguments, which are concatenated in the specified
418/// order. Each must contain valid layer configurations.
419///
420/// By default, an argument to scx_layered is interpreted as a JSON string. If
421/// the argument is a pointer to a JSON file, it should be prefixed with file:
422/// or f: as follows:
423///
424///   $ scx_layered file:example.json
425///   ...
426///   $ scx_layered f:example.json
427///
428/// Monitoring Statistics
429/// =====================
430///
431/// Run with `--stats INTERVAL` to enable stats monitoring. There is
432/// also an scx_stat server listening on /var/run/scx/root/stat that can
433/// be monitored by running `scx_layered --monitor INTERVAL` separately.
434///
435///   ```bash
436///   $ scx_layered --monitor 1
437///   tot= 117909 local=86.20 open_idle= 0.21 affn_viol= 1.37 proc=6ms
438///   busy= 34.2 util= 1733.6 load=  21744.1 fb_cpus=[n0:1]
439///     batch    : util/frac=   11.8/  0.7 load/frac=     29.7:  0.1 tasks=  2597
440///                tot=   3478 local=67.80 open_idle= 0.00 preempt= 0.00 affn_viol= 0.00
441///                cpus=  2 [  2,  2] 04000001 00000000
442///     immediate: util/frac= 1218.8/ 70.3 load/frac=  21399.9: 98.4 tasks=  1107
443///                tot=  68997 local=90.57 open_idle= 0.26 preempt= 9.36 affn_viol= 0.00
444///                cpus= 50 [ 50, 50] fbfffffe 000fffff
445///     normal   : util/frac=  502.9/ 29.0 load/frac=    314.5:  1.4 tasks=  3512
446///                tot=  45434 local=80.97 open_idle= 0.16 preempt= 0.00 affn_viol= 3.56
447///                cpus= 50 [ 50, 50] fbfffffe 000fffff
448///   ```
449///
450/// Global statistics: see [`SysStats`]
451///
452/// Per-layer statistics: see [`LayerStats`]
453///
#[derive(Debug, Parser)]
#[command(verbatim_doc_comment)]
struct Opts {
    /// Deprecated, noop, use RUST_LOG or --log-level instead.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    /// Maximum consecutive execution time in microseconds. A task may be
    /// allowed to keep executing on a CPU for this long. Note that this is
    /// the upper limit and a task may have to be moved off the CPU earlier. 0
    /// indicates default - 20 * slice_us.
    #[clap(short = 'M', long, default_value = "0")]
    max_exec_us: u64,

    /// Scheduling interval in seconds.
    #[clap(short = 'i', long, default_value = "0.1")]
    interval: f64,

    /// ***DEPRECATED*** Disable load-fraction based max layer CPU limit.
    #[clap(short = 'n', long, default_value = "false")]
    no_load_frac_limit: bool,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Specify the logging level. Accepts rust's envfilter syntax for modular
    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
    #[clap(long, default_value = "info")]
    log_level: String,

    /// Disable topology awareness. When enabled, the "nodes" and "llcs" settings on
    /// a layer are ignored. Defaults to false on topologies with multiple NUMA nodes
    /// or LLCs, and true otherwise.
    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
    disable_topology: Option<bool>,

    /// Disable monitor
    #[clap(long)]
    monitor_disable: bool,

    /// Write example layer specifications into the file and exit.
    #[clap(short = 'e', long)]
    example: Option<String>,

    /// ***DEPRECATED*** Disables preemption if the weighted load fraction
    /// of a layer (load_frac_adj) exceeds the threshold. The default is
    /// disabled (0.0).
    #[clap(long, default_value = "0.0")]
    layer_preempt_weight_disable: f64,

    /// ***DEPRECATED*** Disables layer growth if the weighted load fraction
    /// of a layer (load_frac_adj) exceeds the threshold. The default is
    /// disabled (0.0).
    #[clap(long, default_value = "0.0")]
    layer_growth_weight_disable: f64,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Column limit for stats monitor output.
    #[clap(long, default_value = "95")]
    stats_columns: usize,

    /// Disable per-LLC stats in monitor output.
    #[clap(long)]
    stats_no_llc: bool,

    /// Run with example layer specifications (useful for e.g. CI pipelines)
    #[clap(long)]
    run_example: bool,

    /// Allocate CPUs at an SMT granularity (not core)
    #[clap(long)]
    allow_partial_core: bool,

    /// ***DEPRECATED*** Enables iteration over local LLCs first for
    /// dispatch.
    #[clap(long, default_value = "false")]
    local_llc_iteration: bool,

    /// Low priority fallback DSQs are used to execute tasks with custom CPU
    /// affinities. These DSQs are immediately executed iff a CPU is
    /// otherwise idle. However, after the specified wait, they are
    /// guaranteed up to --lo-fb-share fraction of each CPU.
    #[clap(long, default_value = "10000")]
    lo_fb_wait_us: u64,

    /// The fraction of CPU time guaranteed to low priority fallback DSQs.
    /// See --lo-fb-wait-us.
    #[clap(long, default_value = ".05")]
    lo_fb_share: f64,

    /// Disable antistall
    #[clap(long, default_value = "false")]
    disable_antistall: bool,

    /// Enable numa topology based gpu task affinitization.
    #[clap(long, default_value = "false")]
    enable_gpu_affinitize: bool,

    /// Interval at which to reaffinitize gpu tasks to numa nodes.
    /// Defaults to 900s
    #[clap(long, default_value = "900")]
    gpu_affinitize_secs: u64,

    /// Enable match debug
    /// This stores a mapping of task tid
    /// to layer id such that bpftool map dump
    /// can be used to debug layer matches.
    #[clap(long, default_value = "false")]
    enable_match_debug: bool,

    /// Maximum task runnable_at delay (in seconds) before antistall turns on
    #[clap(long, default_value = "3")]
    antistall_sec: u64,

    /// Enable gpu support
    #[clap(long, default_value = "false")]
    enable_gpu_support: bool,

    /// Gpu Kprobe Level
    /// The value set here determines how aggressive
    /// the kprobes enabled on gpu driver functions are.
    /// Higher values are more aggressive, incurring more system overhead
    /// and more accurately identifying PIDs using GPUs in a more timely manner.
    /// Lower values incur less system overhead, at the cost of less accurately
    /// identifying GPU pids and taking longer to do so.
    #[clap(long, default_value = "3")]
    gpu_kprobe_level: u64,

    /// Enable netdev IRQ balancing. This is experimental and should be used with caution.
    #[clap(long, default_value = "false")]
    netdev_irq_balance: bool,

    /// Disable queued wakeup optimization.
    #[clap(long, default_value = "false")]
    disable_queued_wakeup: bool,

    /// Per-cpu kthreads are preempting by default. Make it not so.
    #[clap(long, default_value = "false")]
    disable_percpu_kthread_preempt: bool,

    /// Only highpri (nice < 0) per-cpu kthreads are preempting by default.
    /// Make every per-cpu kthread preempting. Meaningful only if
    /// --disable-percpu-kthread-preempt is not set.
    #[clap(long, default_value = "false")]
    percpu_kthread_preempt_all: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Optional run ID for tracking scheduler instances.
    #[clap(long)]
    run_id: Option<u64>,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    /// Layer specification. See --help.
    specs: Vec<String>,

    /// Periodically force tasks in layers using the AvgRuntime match rule to
    /// reevaluate which layer they belong to. Default period of 2s. A value
    /// of 0 turns this off.
    #[clap(long, default_value = "2000")]
    layer_refresh_ms_avgruntime: u64,

    /// Set the path for pinning the task hint map.
    #[clap(long, default_value = "")]
    task_hint_map: String,

    /// Print the config (after template expansion) and exit.
    #[clap(long, default_value = "false")]
    print_and_exit: bool,

    /// Enable affinitized task to use hi fallback queue to get more CPU time.
    #[clap(long, default_value = "")]
    hi_fb_thread_name: String,

    #[clap(flatten, next_help_heading = "Topology Options")]
    topology: TopologyArgs,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
652
/// Cgroup event types for inter-thread communication between the cgroup
/// watcher and the scheduler.
#[derive(Debug, Clone)]
enum CgroupEvent {
    /// A new cgroup appeared.
    Created {
        path: String,
        cgroup_id: u64,    // inode number
        match_bitmap: u64, // bitmap of matched regex rules
    },
    /// An existing cgroup went away.
    Removed {
        path: String,
        cgroup_id: u64, // inode number
    },
}
666
667fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
668    reader
669        .read_stat()
670        .context("Failed to read procfs")?
671        .total_cpu
672        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
673}
674
675fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
676    match (curr, prev) {
677        (
678            fb_procfs::CpuStat {
679                user_usec: Some(curr_user),
680                nice_usec: Some(curr_nice),
681                system_usec: Some(curr_system),
682                idle_usec: Some(curr_idle),
683                iowait_usec: Some(curr_iowait),
684                irq_usec: Some(curr_irq),
685                softirq_usec: Some(curr_softirq),
686                stolen_usec: Some(curr_stolen),
687                ..
688            },
689            fb_procfs::CpuStat {
690                user_usec: Some(prev_user),
691                nice_usec: Some(prev_nice),
692                system_usec: Some(prev_system),
693                idle_usec: Some(prev_idle),
694                iowait_usec: Some(prev_iowait),
695                irq_usec: Some(prev_irq),
696                softirq_usec: Some(prev_softirq),
697                stolen_usec: Some(prev_stolen),
698                ..
699            },
700        ) => {
701            let idle_usec = curr_idle.saturating_sub(*prev_idle);
702            let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
703            let user_usec = curr_user.saturating_sub(*prev_user);
704            let system_usec = curr_system.saturating_sub(*prev_system);
705            let nice_usec = curr_nice.saturating_sub(*prev_nice);
706            let irq_usec = curr_irq.saturating_sub(*prev_irq);
707            let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
708            let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);
709
710            let busy_usec =
711                user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
712            let total_usec = idle_usec + busy_usec + iowait_usec;
713            if total_usec > 0 {
714                Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
715            } else {
716                Ok(1.0)
717            }
718        }
719        _ => bail!("Missing stats in cpustat"),
720    }
721}
722
/// Copy `src` into `dst` as a NUL-terminated C string of `i8` bytes.
/// Bytes of `dst` past the terminator are left untouched.
///
/// Panics if `src` contains an interior NUL byte or if `dst` is too small
/// to hold `src` plus the terminating NUL.
fn copy_into_cstr(dst: &mut [i8], src: &str) {
    let cstr = CString::new(src).unwrap();
    let bytes = cstr.as_bytes_with_nul();
    // Byte-wise u8 -> i8 cast; avoids the previous unsafe slice transmute.
    // Slicing dst preserves the original out-of-bounds panic behavior.
    for (dst_byte, &src_byte) in dst[..bytes.len()].iter_mut().zip(bytes) {
        *dst_byte = src_byte as i8;
    }
}
728
729#[allow(clippy::needless_range_loop)]
730fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
731    let mut cpu_ctxs = vec![];
732    let cpu_ctxs_vec = skel
733        .maps
734        .cpu_ctxs
735        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
736        .context("Failed to lookup cpu_ctx")?
737        .unwrap();
738    for cpu in 0..*NR_CPUS_POSSIBLE {
739        cpu_ctxs.push(
740            *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
741                .expect("cpu_ctx: short or misaligned buffer"),
742        );
743    }
744    Ok(cpu_ctxs)
745}
746
/// Aggregated scheduler statistics read out of the BPF maps.
#[derive(Clone, Debug)]
struct BpfStats {
    // Global stats summed across all possible CPUs, indexed by stat id.
    gstats: Vec<u64>,
    // Per-layer stats summed across all possible CPUs: [layer][stat].
    lstats: Vec<Vec<u64>>,
    // Per-stat totals of `lstats` across all layers.
    lstats_sums: Vec<u64>,
    llc_lstats: Vec<Vec<Vec<u64>>>, // [layer][llc][stat]
}
754
impl BpfStats {
    /// Read the current stat counters from BPF and aggregate them.
    ///
    /// Global and per-layer stats are kept per-CPU in `cpu_ctxs` and are
    /// summed here across all possible CPUs. Per-LLC layer stats are read
    /// from the `llc_data` map, one entry per LLC.
    #[allow(clippy::needless_range_loop)]
    fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
        let nr_llcs = skel.maps.rodata_data.as_ref().unwrap().nr_llcs as usize;
        let mut gstats = vec![0u64; NR_GSTATS];
        let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
        let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];

        // Sum the per-CPU global and per-layer counters across every
        // possible CPU.
        for cpu in 0..*NR_CPUS_POSSIBLE {
            for (stat, value) in gstats.iter_mut().enumerate() {
                *value += cpu_ctxs[cpu].gstats[stat];
            }
            for (layer, layer_stats) in lstats.iter_mut().enumerate() {
                for (stat, value) in layer_stats.iter_mut().enumerate() {
                    *value += cpu_ctxs[cpu].lstats[layer][stat];
                }
            }
        }

        // Fold the per-layer stats into per-stat totals across all layers.
        let mut lstats_sums = vec![0u64; NR_LSTATS];
        for layer_stats in lstats.iter() {
            for (stat, value) in lstats_sums.iter_mut().enumerate() {
                *value += layer_stats[stat];
            }
        }

        for llc_id in 0..nr_llcs {
            // XXX - This would be a lot easier if llc_ctx were in
            // the bss. Unfortunately, kernel < v6.12 crashes and
            // kernel >= v6.12 fails verification after such
            // conversion due to seemingly verifier bugs. Convert to
            // bss maps later.
            let v = skel
                .maps
                .llc_data
                .lookup(&(llc_id as u32).to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .unwrap()
                .unwrap();
            let llcc: &bpf_intf::llc_ctx =
                plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");

            // LLC stats are not per-CPU; copy each layer's row directly.
            for (layer_id, layer_llc_stats) in llc_lstats.iter_mut().enumerate() {
                for (stat_id, stat_value) in layer_llc_stats[llc_id].iter_mut().enumerate() {
                    *stat_value = llcc.lstats[layer_id][stat_id];
                }
            }
        }

        Self {
            gstats,
            lstats,
            lstats_sums,
            llc_lstats,
        }
    }
}
812
813impl<'b> Sub<&'b BpfStats> for &BpfStats {
814    type Output = BpfStats;
815
816    fn sub(self, rhs: &'b BpfStats) -> BpfStats {
817        let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
818        BpfStats {
819            gstats: vec_sub(&self.gstats, &rhs.gstats),
820            lstats: self
821                .lstats
822                .iter()
823                .zip(rhs.lstats.iter())
824                .map(|(l, r)| vec_sub(l, r))
825                .collect(),
826            lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
827            llc_lstats: self
828                .llc_lstats
829                .iter()
830                .zip(rhs.llc_lstats.iter())
831                .map(|(l_layer, r_layer)| {
832                    l_layer
833                        .iter()
834                        .zip(r_layer.iter())
835                        .map(|(l_llc, r_llc)| {
836                            let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
837                            // Lat is not subtractable, take L side.
838                            r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
839                            vec_sub(&l_llc, &r_llc)
840                        })
841                        .collect()
842                })
843                .collect(),
844        }
845    }
846}
847
/// Periodically refreshed userspace view of scheduler state.
///
/// Most metrics come in pairs: a derived value (utilization, EWMA, delta) and
/// a `prev_*` raw counter captured at the last refresh, used as the baseline
/// for the next delta computation.
#[derive(Clone, Debug)]
struct Stats {
    at: Instant,       // when this snapshot was taken
    elapsed: Duration, // time since the previous snapshot
    topo: Arc<Topology>,
    nr_layers: usize,
    nr_layer_tasks: Vec<usize>, // per-layer task counts from BPF
    layer_nr_node_pinned_tasks: Vec<Vec<u64>>, // [layer][node]

    total_util: f64, // Running AVG of sum of layer_utils
    layer_utils: Vec<Vec<f64>>, // [layer][usage], decayed
    prev_layer_usages: Vec<Vec<u64>>, // raw ns counters, baseline for deltas
    layer_node_pinned_utils: Vec<Vec<f64>>, // [layer][node], decayed
    prev_layer_node_pinned_usages: Vec<Vec<u64>>,
    layer_node_utils: Vec<Vec<f64>>, // [layer][node], decayed
    prev_layer_node_usages: Vec<Vec<u64>>,
    layer_node_duty_sums: Vec<Vec<f64>>, // Per-node per-layer duty in CPU units (EWMA)
    prev_layer_node_duty_raw: Vec<Vec<u64>>, // Raw accumulated layer_duty_sum values

    layer_membws: Vec<Vec<f64>>, // Estimated memory bandsidth consumption
    prev_layer_membw_agg: Vec<Vec<u64>>, // Estimated aggregate membw consumption

    cpu_busy: f64, // Read from /proc, maybe higher than total_util
    prev_total_cpu: fb_procfs::CpuStat,
    prev_pmu_resctrl_membw: (u64, u64), // (PMU-reported membw, resctrl-reported membw)

    system_cpu_util_ewma: f64,       // 10s EWMA of system CPU utilization
    layer_dsq_insert_ewma: Vec<f64>, // 10s EWMA of per-layer DSQ insertion ratio

    bpf_stats: BpfStats,      // delta since the previous refresh
    prev_bpf_stats: BpfStats, // absolute snapshot, baseline for next delta

    processing_dur: Duration, // stats-processing time spent this interval
    prev_processing_dur: Duration,

    layer_slice_us: Vec<u64>, // per-layer slice length from BPF

    gpu_tasks_affinitized: u64,
    gpu_task_affinitization_ms: u64,
}
888
889impl Stats {
890    #[allow(clippy::needless_range_loop)]
891    fn read_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
892        let mut layer_usages = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
893
894        for cpu in 0..*NR_CPUS_POSSIBLE {
895            for (layer, layer_usage) in layer_usages.iter_mut().enumerate() {
896                for (usage, value) in layer_usage.iter_mut().enumerate() {
897                    *value += cpu_ctxs[cpu].layer_usages[layer][usage];
898                }
899            }
900        }
901
902        layer_usages
903    }
904
905    // Same as above, but for aggregate memory bandwidth consumption.
906    #[allow(clippy::needless_range_loop)]
907    fn read_layer_membw_agg(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
908        let mut layer_membw_agg = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
909
910        for cpu in 0..*NR_CPUS_POSSIBLE {
911            for (layer, layer_membw) in layer_membw_agg.iter_mut().enumerate() {
912                for (usage, value) in layer_membw.iter_mut().enumerate() {
913                    *value += cpu_ctxs[cpu].layer_membw_agg[layer][usage];
914                }
915            }
916        }
917
918        layer_membw_agg
919    }
920
921    #[allow(clippy::needless_range_loop)]
922    fn read_layer_node_pinned_usages(
923        cpu_ctxs: &[bpf_intf::cpu_ctx],
924        topo: &Topology,
925        nr_layers: usize,
926        nr_nodes: usize,
927    ) -> Vec<Vec<u64>> {
928        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
929
930        for cpu in 0..*NR_CPUS_POSSIBLE {
931            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
932            for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
933                layer_usages[node] += cpu_ctxs[cpu].node_pinned_usage[layer];
934            }
935        }
936
937        usages
938    }
939
940    #[allow(clippy::needless_range_loop)]
941    fn read_layer_node_usages(
942        cpu_ctxs: &[bpf_intf::cpu_ctx],
943        topo: &Topology,
944        nr_layers: usize,
945        nr_nodes: usize,
946    ) -> Vec<Vec<u64>> {
947        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
948
949        for cpu in 0..*NR_CPUS_POSSIBLE {
950            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
951            for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
952                for usage in 0..=LAYER_USAGE_SUM_UPTO {
953                    layer_usages[node] += cpu_ctxs[cpu].layer_usages[layer][usage];
954                }
955            }
956        }
957
958        usages
959    }
960
961    #[allow(clippy::needless_range_loop)]
962    fn read_layer_node_duty_raw(
963        cpu_ctxs: &[bpf_intf::cpu_ctx],
964        topo: &Topology,
965        nr_layers: usize,
966        nr_nodes: usize,
967    ) -> Vec<Vec<u64>> {
968        let mut sums = vec![vec![0u64; nr_nodes]; nr_layers];
969
970        for cpu in 0..*NR_CPUS_POSSIBLE {
971            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
972            for (layer, layer_sums) in sums.iter_mut().enumerate().take(nr_layers) {
973                layer_sums[node] += cpu_ctxs[cpu].layer_duty_sum[layer];
974            }
975        }
976
977        sums
978    }
979
980    /// Use the membw reported by resctrl to normalize the values reported by hw counters.
981    /// We have the following problem:
982    /// 1) We want per-task memory bandwidth reporting. We cannot do this with resctrl, much
983    ///    less transparently, since we would require different RMID for each task.
984    /// 2) We want to directly use perf counters for tracking per-task memory bandwidth, but
985    ///    we can't: Non-resctrl counters do not measure the right thing (e.g., they only measure
986    ///    proxies like load operations),
987    /// 3) Resctrl counters are not accessible directly so we cannot read them from the BPF side.
988    ///
989    /// Approximate per-task memory bandwidth using perf counters to measure _relative_ memory
990    /// bandwidth usage.
991    fn resctrl_read_total_membw() -> Result<u64> {
992        let mut total_membw = 0u64;
993        for entry in WalkDir::new("/sys/fs/resctrl/mon_data")
994            .min_depth(1)
995            .into_iter()
996            .filter_map(Result::ok)
997            .filter(|x| x.path().is_dir())
998        {
999            let mut path = entry.path().to_path_buf();
1000            path.push("mbm_total_bytes");
1001            total_membw += fs::read_to_string(path)?.trim().parse::<u64>()?;
1002        }
1003
1004        Ok(total_membw)
1005    }
1006
    /// Build the initial snapshot: derived metrics start at zero and the
    /// current raw counters are captured as the `prev_*` baselines so the
    /// first `refresh()` can compute meaningful deltas.
    fn new(
        skel: &mut BpfSkel,
        proc_reader: &fb_procfs::ProcReader,
        topo: Arc<Topology>,
        gpu_task_affinitizer: &GpuTaskAffinitizer,
    ) -> Result<Self> {
        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
        let nr_nodes = topo.nodes.len();
        let cpu_ctxs = read_cpu_ctxs(skel)?;
        let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
        let pmu_membw = Self::read_layer_membw_agg(&cpu_ctxs, nr_layers);

        Ok(Self {
            at: Instant::now(),
            elapsed: Default::default(),

            topo: topo.clone(),
            nr_layers,
            nr_layer_tasks: vec![0; nr_layers],
            layer_nr_node_pinned_tasks: vec![vec![0; nr_nodes]; nr_layers],

            total_util: 0.0,
            layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
            prev_layer_usages: Self::read_layer_usages(&cpu_ctxs, nr_layers),
            layer_node_pinned_utils: vec![vec![0.0; nr_nodes]; nr_layers],
            prev_layer_node_pinned_usages: Self::read_layer_node_pinned_usages(
                &cpu_ctxs, &topo, nr_layers, nr_nodes,
            ),
            layer_node_utils: vec![vec![0.0; nr_nodes]; nr_layers],
            prev_layer_node_usages: Self::read_layer_node_usages(
                &cpu_ctxs, &topo, nr_layers, nr_nodes,
            ),
            layer_node_duty_sums: vec![vec![0.0; nr_nodes]; nr_layers],
            prev_layer_node_duty_raw: Self::read_layer_node_duty_raw(
                &cpu_ctxs, &topo, nr_layers, nr_nodes,
            ),
            layer_membws: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
            // This is not normalized because we don't have enough history to do so.
            // It should not matter too much, since the value is dropped on the first
            // iteration.
            prev_layer_membw_agg: pmu_membw,
            prev_pmu_resctrl_membw: (0, 0),

            cpu_busy: 0.0,
            prev_total_cpu: read_total_cpu(proc_reader)?,
            system_cpu_util_ewma: 0.0,
            layer_dsq_insert_ewma: vec![0.0; nr_layers],

            // Both snapshots start identical; the first refresh yields a
            // zero delta for bpf_stats.
            bpf_stats: bpf_stats.clone(),
            prev_bpf_stats: bpf_stats,

            processing_dur: Default::default(),
            prev_processing_dur: Default::default(),

            layer_slice_us: vec![0; nr_layers],
            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
        })
    }
1066
    /// Recompute all derived statistics against the previous snapshot and
    /// replace `self` wholesale with the new snapshot.
    ///
    /// Reads fresh per-CPU contexts and per-layer BPF state, computes deltas
    /// against the `prev_*` baselines, applies EWMA smoothing, and stores the
    /// current raw counters back as the baselines for the next call.
    fn refresh(
        &mut self,
        skel: &mut BpfSkel,
        proc_reader: &fb_procfs::ProcReader,
        now: Instant,
        cur_processing_dur: Duration,
        gpu_task_affinitizer: &GpuTaskAffinitizer,
    ) -> Result<()> {
        let elapsed = now.duration_since(self.at);
        let elapsed_f64 = elapsed.as_secs_f64();
        let cpu_ctxs = read_cpu_ctxs(skel)?;

        // Per-layer task counts, pinned-task counts and slice lengths come
        // straight from the BPF-side layer structs.
        let layers = &skel.maps.bss_data.as_ref().unwrap().layers;
        let nr_layer_tasks: Vec<usize> = layers
            .iter()
            .take(self.nr_layers)
            .map(|layer| layer.nr_tasks as usize)
            .collect();
        let layer_nr_node_pinned_tasks: Vec<Vec<u64>> = layers
            .iter()
            .take(self.nr_layers)
            .map(|layer| {
                layer.node[..self.topo.nodes.len()]
                    .iter()
                    .map(|n| n.nr_pinned_tasks)
                    .collect()
            })
            .collect();
        let layer_slice_us: Vec<u64> = layers
            .iter()
            .take(self.nr_layers)
            .map(|layer| layer.slice_ns / 1000_u64)
            .collect();

        let cur_layer_usages = Self::read_layer_usages(&cpu_ctxs, self.nr_layers);
        let cur_layer_node_pinned_usages = Self::read_layer_node_pinned_usages(
            &cpu_ctxs,
            &self.topo,
            self.nr_layers,
            self.topo.nodes.len(),
        );
        let cur_layer_node_usages = Self::read_layer_node_usages(
            &cpu_ctxs,
            &self.topo,
            self.nr_layers,
            self.topo.nodes.len(),
        );
        let cur_layer_node_duty_raw = Self::read_layer_node_duty_raw(
            &cpu_ctxs,
            &self.topo,
            self.nr_layers,
            self.topo.nodes.len(),
        );
        let cur_layer_membw_agg = Self::read_layer_membw_agg(&cpu_ctxs, self.nr_layers);

        // Memory BW normalization. It requires finding the delta according to perf, the delta
        // according to resctl, and finding the factor between them. This also helps in
        // determining whether this delta is stable.
        //
        // We scale only the raw PMC delta - the part of the counter incremented between two
        // periods. This is because the scaling factor is only valid for that time period.
        // Non-scaled samples are not comparable, while scaled ones are.
        let (pmu_prev, resctrl_prev) = self.prev_pmu_resctrl_membw;
        let pmu_cur: u64 = cur_layer_membw_agg
            .iter()
            .map(|membw_agg| membw_agg[LAYER_USAGE_OPEN] + membw_agg[LAYER_USAGE_OWNED])
            .sum();
        let resctrl_cur = Self::resctrl_read_total_membw()?;
        // NOTE(review): if the PMU delta is zero this divides by zero and
        // `factor` becomes inf/NaN; the u64 subtractions also panic in debug
        // builds if either counter goes backwards. Presumably both counters
        // are monotonic and always advance between refreshes - TODO confirm.
        let factor = (resctrl_cur - resctrl_prev) as f64 / (pmu_cur - pmu_prev) as f64;

        // Computes the runtime deltas and converts them from ns to s.
        let compute_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
            cur_agg
                .iter()
                .zip(prev_agg.iter())
                .map(|(cur, prev)| {
                    cur.iter()
                        .zip(prev.iter())
                        .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
                        .collect()
                })
                .collect()
        };

        // Computes the total memory traffic done since last computation and normalizes to GBs.
        // We derive the rate of consumption elsewhere.
        let compute_mem_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
            cur_agg
                .iter()
                .zip(prev_agg.iter())
                .map(|(cur, prev)| {
                    cur.iter()
                        .zip(prev.iter())
                        .map(|(c, p)| (*c as i64 - *p as i64) as f64 / 1024_f64.powf(3.0))
                        .collect()
                })
                .collect()
        };

        let cur_layer_utils: Vec<Vec<f64>> =
            compute_diff(&cur_layer_usages, &self.prev_layer_usages);

        // Scale the raw value delta by the resctrl/pmc computed factor.
        let cur_layer_membw: Vec<Vec<f64>> =
            compute_mem_diff(&cur_layer_membw_agg, &self.prev_layer_membw_agg);

        let cur_layer_membw: Vec<Vec<f64>> = cur_layer_membw
            .iter()
            .map(|x| x.iter().map(|x| *x * factor).collect())
            .collect();

        // Exponential decay blending current samples into the running values;
        // decay_rate^elapsed weights the previous value.
        let metric_decay =
            |cur_metric: Vec<Vec<f64>>, prev_metric: &Vec<Vec<f64>>, decay_rate: f64| {
                cur_metric
                    .iter()
                    .zip(prev_metric.iter())
                    .map(|(cur, prev)| {
                        cur.iter()
                            .zip(prev.iter())
                            .map(|(c, p)| {
                                let decay = decay_rate.powf(elapsed_f64);
                                p * decay + c * (1.0 - decay)
                            })
                            .collect()
                    })
                    .collect()
            };

        let layer_utils: Vec<Vec<f64>> =
            metric_decay(cur_layer_utils, &self.layer_utils, *USAGE_DECAY);
        let cur_node_pinned_utils: Vec<Vec<f64>> = compute_diff(
            &cur_layer_node_pinned_usages,
            &self.prev_layer_node_pinned_usages,
        );
        let layer_node_pinned_utils: Vec<Vec<f64>> = metric_decay(
            cur_node_pinned_utils,
            &self.layer_node_pinned_utils,
            *USAGE_DECAY,
        );
        let cur_node_utils: Vec<Vec<f64>> =
            compute_diff(&cur_layer_node_usages, &self.prev_layer_node_usages);
        let layer_node_utils: Vec<Vec<f64>> =
            metric_decay(cur_node_utils, &self.layer_node_utils, *USAGE_DECAY);
        let cur_node_duty: Vec<Vec<f64>> =
            compute_diff(&cur_layer_node_duty_raw, &self.prev_layer_node_duty_raw);
        let layer_node_duty_sums: Vec<Vec<f64>> =
            metric_decay(cur_node_duty, &self.layer_node_duty_sums, *USAGE_DECAY);

        // Decay rate 0.0 means no smoothing: the new sample replaces the old.
        let layer_membws: Vec<Vec<f64>> = metric_decay(cur_layer_membw, &self.layer_membws, 0.0);

        let cur_total_cpu = read_total_cpu(proc_reader)?;
        let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;

        // Calculate system CPU utilization EWMA (10 second window)
        const SYS_CPU_UTIL_EWMA_SECS: f64 = 10.0;
        // Re-binds the same value as the earlier elapsed_f64 (harmless shadow).
        let elapsed_f64 = elapsed.as_secs_f64();
        let alpha = elapsed_f64 / SYS_CPU_UTIL_EWMA_SECS.max(elapsed_f64);
        let system_cpu_util_ewma = alpha * cpu_busy + (1.0 - alpha) * self.system_cpu_util_ewma;

        let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
        let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;

        // Calculate per-layer DSQ insertion EWMA (10 second window)
        const DSQ_INSERT_EWMA_SECS: f64 = 10.0;
        let dsq_alpha = elapsed_f64 / DSQ_INSERT_EWMA_SECS.max(elapsed_f64);
        let layer_dsq_insert_ewma: Vec<f64> = (0..self.nr_layers)
            .map(|layer_id| {
                let sel_local = bpf_stats.lstats[layer_id]
                    [bpf_intf::layer_stat_id_LSTAT_SEL_LOCAL as usize]
                    as f64;
                let enq_local = bpf_stats.lstats[layer_id]
                    [bpf_intf::layer_stat_id_LSTAT_ENQ_LOCAL as usize]
                    as f64;
                let enq_dsq = bpf_stats.lstats[layer_id]
                    [bpf_intf::layer_stat_id_LSTAT_ENQ_DSQ as usize]
                    as f64;
                let total_dispatches = sel_local + enq_local + enq_dsq;

                // Fraction of dispatches that went through a DSQ insert.
                let cur_ratio = if total_dispatches > 0.0 {
                    enq_dsq / total_dispatches
                } else {
                    0.0
                };

                dsq_alpha * cur_ratio + (1.0 - dsq_alpha) * self.layer_dsq_insert_ewma[layer_id]
            })
            .collect();

        // NOTE(review): unwrap assumes cur_processing_dur is cumulative and
        // never less than the previous value - TODO confirm at the call site.
        let processing_dur = cur_processing_dur
            .checked_sub(self.prev_processing_dur)
            .unwrap();

        // Replace the whole snapshot; current raw counters become the new
        // prev_* baselines.
        *self = Self {
            at: now,
            elapsed,
            topo: self.topo.clone(),
            nr_layers: self.nr_layers,
            nr_layer_tasks,
            layer_nr_node_pinned_tasks,

            total_util: layer_utils
                .iter()
                .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
                .sum(),
            layer_utils,
            prev_layer_usages: cur_layer_usages,
            layer_node_pinned_utils,
            prev_layer_node_pinned_usages: cur_layer_node_pinned_usages,
            layer_node_utils,
            prev_layer_node_usages: cur_layer_node_usages,
            layer_node_duty_sums,
            prev_layer_node_duty_raw: cur_layer_node_duty_raw,

            layer_membws,
            prev_layer_membw_agg: cur_layer_membw_agg,
            // Was updated during normalization.
            prev_pmu_resctrl_membw: (pmu_cur, resctrl_cur),

            cpu_busy,
            prev_total_cpu: cur_total_cpu,
            system_cpu_util_ewma,
            layer_dsq_insert_ewma,

            bpf_stats,
            prev_bpf_stats: cur_bpf_stats,

            processing_dur,
            prev_processing_dur: cur_processing_dur,

            layer_slice_us,
            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
        };
        Ok(())
    }
1302}
1303
/// Userspace bookkeeping for one scheduling layer.
#[derive(Debug)]
struct Layer {
    name: String,
    kind: LayerKind, // Confined / Grouped / Open, from the layer spec
    growth_algo: LayerGrowthAlgo,
    core_order: Vec<Vec<usize>>,

    // Per-node lists of LLC ids assigned to this layer.
    assigned_llcs: Vec<Vec<usize>>,

    nr_cpus: usize,            // CPUs currently owned by the layer
    nr_llc_cpus: Vec<usize>,   // owned CPUs per LLC
    nr_node_cpus: Vec<usize>,  // owned CPUs per NUMA node
    cpus: Cpumask,             // CPUs currently owned by the layer
    // CPUs the layer is permitted to use, derived from its node/LLC
    // constraints (all CPUs when unconstrained).
    allowed_cpus: Cpumask,

    /// Per-node count of CPUs allocated for pinned demand.
    nr_pinned_cpus: Vec<usize>,
}
1322
1323fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
1324    fs::read_to_string("/proc/kallsyms")?
1325        .lines()
1326        .find(|line| line.contains(sym_name))
1327        .and_then(|line| line.split_whitespace().next())
1328        .and_then(|addr| u64::from_str_radix(addr, 16).ok())
1329        .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
1330}
1331
1332fn resolve_cpus_pct_range(
1333    cpus_range: &Option<(usize, usize)>,
1334    cpus_range_frac: &Option<(f64, f64)>,
1335    max_cpus: usize,
1336) -> Result<(usize, usize)> {
1337    match (cpus_range, cpus_range_frac) {
1338        (Some(_x), Some(_y)) => {
1339            bail!("cpus_range cannot be used with cpus_pct.");
1340        }
1341        (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
1342        (None, Some((cpus_frac_min, cpus_frac_max))) => {
1343            if *cpus_frac_min < 0_f64
1344                || *cpus_frac_min > 1_f64
1345                || *cpus_frac_max < 0_f64
1346                || *cpus_frac_max > 1_f64
1347            {
1348                bail!("cpus_range_frac values must be between 0.0 and 1.0");
1349            }
1350            let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
1351            let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
1352            Ok((
1353                std::cmp::max(cpus_min_count, 1),
1354                std::cmp::min(std::cmp::max(cpus_max_count, 1), max_cpus),
1355            ))
1356        }
1357        (None, None) => Ok((0, max_cpus)),
1358    }
1359}
1360
1361impl Layer {
1362    fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<Vec<usize>>) -> Result<Self> {
1363        let name = &spec.name;
1364        let kind = spec.kind.clone();
1365        let mut allowed_cpus = Cpumask::new();
1366        match &kind {
1367            LayerKind::Confined {
1368                cpus_range,
1369                cpus_range_frac,
1370                common: LayerCommon { nodes, llcs, .. },
1371                ..
1372            } => {
1373                let cpus_range =
1374                    resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
1375                if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
1376                    bail!("invalid cpus_range {:?}", cpus_range);
1377                }
1378                if nodes.is_empty() && llcs.is_empty() {
1379                    allowed_cpus.set_all();
1380                } else {
1381                    // build up the cpus bitset
1382                    for (node_id, node) in &topo.nodes {
1383                        // first do the matching for nodes
1384                        if nodes.contains(node_id) {
1385                            for &id in node.all_cpus.keys() {
1386                                allowed_cpus.set_cpu(id)?;
1387                            }
1388                        }
1389                        // next match on any LLCs
1390                        for (llc_id, llc) in &node.llcs {
1391                            if llcs.contains(llc_id) {
1392                                for &id in llc.all_cpus.keys() {
1393                                    allowed_cpus.set_cpu(id)?;
1394                                }
1395                            }
1396                        }
1397                    }
1398                }
1399            }
1400            LayerKind::Grouped {
1401                common: LayerCommon { nodes, llcs, .. },
1402                ..
1403            }
1404            | LayerKind::Open {
1405                common: LayerCommon { nodes, llcs, .. },
1406                ..
1407            } => {
1408                if nodes.is_empty() && llcs.is_empty() {
1409                    allowed_cpus.set_all();
1410                } else {
1411                    // build up the cpus bitset
1412                    for (node_id, node) in &topo.nodes {
1413                        // first do the matching for nodes
1414                        if nodes.contains(node_id) {
1415                            for &id in node.all_cpus.keys() {
1416                                allowed_cpus.set_cpu(id)?;
1417                            }
1418                        }
1419                        // next match on any LLCs
1420                        for (llc_id, llc) in &node.llcs {
1421                            if llcs.contains(llc_id) {
1422                                for &id in llc.all_cpus.keys() {
1423                                    allowed_cpus.set_cpu(id)?;
1424                                }
1425                            }
1426                        }
1427                    }
1428                }
1429            }
1430        }
1431
1432        // Util can be above 1.0 for grouped layers if
1433        // util_includes_open_cputime is set.
1434        if let Some(util_range) = kind.util_range() {
1435            if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
1436                bail!("invalid util_range {:?}", util_range);
1437            }
1438        }
1439
1440        let layer_growth_algo = kind.common().growth_algo.clone();
1441
1442        debug!(
1443            "layer: {} algo: {:?} core order: {:?}",
1444            name, &layer_growth_algo, core_order
1445        );
1446
1447        Ok(Self {
1448            name: name.into(),
1449            kind,
1450            growth_algo: layer_growth_algo,
1451            core_order: core_order.clone(),
1452
1453            assigned_llcs: vec![vec![]; topo.nodes.len()],
1454
1455            nr_cpus: 0,
1456            nr_llc_cpus: vec![0; topo.all_llcs.len()],
1457            nr_node_cpus: vec![0; topo.nodes.len()],
1458            cpus: Cpumask::new(),
1459            allowed_cpus,
1460
1461            nr_pinned_cpus: vec![0; topo.nodes.len()],
1462        })
1463    }
1464}
/// Per-GPU-device NUMA node information, built in init_dev_node_map.
#[derive(Debug, Clone)]
struct NodeInfo {
    // CPU set covering every CPU of the node the GPU is attached to;
    // presumably passed to sched_setaffinity when affinitizing - confirm in
    // affinitize_gpu_pids.
    node_mask: nix::sched::CpuSet,
    // Kept for debugging; underscore prefix silences dead-code warnings.
    _node_id: usize,
}
1470
#[derive(Debug)]
struct GpuTaskAffinitizer {
    // This struct tracks information necessary to numa affinitize
    // gpu tasks periodically when needed.
    // NVML device index -> NUMA node info (filled by init_dev_node_map).
    gpu_devs_to_node_info: HashMap<u32, NodeInfo>,
    // PID with a GPU context -> NVML device index (filled by update_gpu_pids).
    gpu_pids_to_devs: HashMap<Pid, u32>,
    // When the last affinitization pass ran; None until the first pass.
    last_process_time: Option<Instant>,
    // Cached sysinfo process table, refreshed by update_process_info.
    sys: System,
    // Parent PID -> child PIDs, rebuilt on every update_process_info call.
    pid_map: HashMap<Pid, Vec<Pid>>,
    // Minimum time between affinitization passes.
    poll_interval: Duration,
    // Feature switch; when false the affinitizer does no work.
    enable: bool,
    // Cumulative count of tasks affinitized, exported via Stats.
    tasks_affinitized: u64,
    // Duration of the most recent pass in ms, exported via Stats.
    last_task_affinitization_ms: u64,
}
1485
1486impl GpuTaskAffinitizer {
1487    pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
1488        GpuTaskAffinitizer {
1489            gpu_devs_to_node_info: HashMap::new(),
1490            gpu_pids_to_devs: HashMap::new(),
1491            last_process_time: None,
1492            sys: System::default(),
1493            pid_map: HashMap::new(),
1494            poll_interval: Duration::from_secs(poll_interval),
1495            enable,
1496            tasks_affinitized: 0,
1497            last_task_affinitization_ms: 0,
1498        }
1499    }
1500
1501    fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
1502        for (chunk, &mask) in affinity.iter().enumerate() {
1503            let mut inner_offset: u64 = 1;
1504            for _ in 0..64 {
1505                if (mask & inner_offset) != 0 {
1506                    return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
1507                }
1508                inner_offset <<= 1;
1509            }
1510        }
1511        anyhow::bail!("unable to get CPU from NVML bitmask");
1512    }
1513
1514    fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
1515        let mut cpuset = CpuSet::new();
1516        for cpu_id in node.all_cpus.keys() {
1517            cpuset.set(*cpu_id)?;
1518        }
1519        Ok(cpuset)
1520    }
1521
    /// Populate `gpu_devs_to_node_info`: for each NVML device, resolve the
    /// NUMA node of its first affine CPU and record that node's CPU set.
    fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
        let nvml = nvml()?;
        let device_count = nvml.device_count()?;

        for idx in 0..device_count {
            let dev = nvml.device_by_index(idx)?;
            // For machines w/ up to 1024 CPUs.
            // (cpu_affinity(16) requests 16 x 64-bit chunks = 1024 CPU bits.)
            let cpu = dev.cpu_affinity(16)?;
            let ideal_cpu = self.find_one_cpu(cpu)?;
            // Devices whose ideal CPU is unknown to the topology are skipped.
            if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
                self.gpu_devs_to_node_info.insert(
                    idx,
                    NodeInfo {
                        node_mask: self.node_to_cpuset(
                            topo.nodes.get(&cpu.node_id).expect("topo missing node"),
                        )?,
                        _node_id: cpu.node_id,
                    },
                );
            }
        }
        Ok(())
    }
1545
1546    fn update_gpu_pids(&mut self) -> Result<()> {
1547        let nvml = nvml()?;
1548        for i in 0..nvml.device_count()? {
1549            let device = nvml.device_by_index(i)?;
1550            for proc in device
1551                .running_compute_processes()?
1552                .into_iter()
1553                .chain(device.running_graphics_processes()?.into_iter())
1554            {
1555                self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
1556            }
1557        }
1558        Ok(())
1559    }
1560
1561    fn update_process_info(&mut self) -> Result<()> {
1562        self.sys.refresh_processes_specifics(
1563            ProcessesToUpdate::All,
1564            true,
1565            ProcessRefreshKind::nothing(),
1566        );
1567        self.pid_map.clear();
1568        for (pid, proc_) in self.sys.processes() {
1569            if let Some(ppid) = proc_.parent() {
1570                self.pid_map.entry(ppid).or_default().push(*pid);
1571            }
1572        }
1573        Ok(())
1574    }
1575
1576    fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
1577        let mut work = VecDeque::from([root_pid]);
1578        let mut pids_and_tids: HashSet<Pid> = HashSet::new();
1579
1580        while let Some(pid) = work.pop_front() {
1581            if pids_and_tids.insert(pid) {
1582                if let Some(kids) = self.pid_map.get(&pid) {
1583                    work.extend(kids);
1584                }
1585                if let Some(proc_) = self.sys.process(pid) {
1586                    if let Some(tasks) = proc_.tasks() {
1587                        pids_and_tids.extend(tasks.iter().copied());
1588                    }
1589                }
1590            }
1591        }
1592        pids_and_tids
1593    }
1594
1595    fn affinitize_gpu_pids(&mut self) -> Result<()> {
1596        if !self.enable {
1597            return Ok(());
1598        }
1599        for (pid, dev) in &self.gpu_pids_to_devs {
1600            let node_info = self
1601                .gpu_devs_to_node_info
1602                .get(dev)
1603                .expect("Unable to get gpu pid node mask");
1604            for child in self.get_child_pids_and_tids(*pid) {
1605                match nix::sched::sched_setaffinity(
1606                    nix::unistd::Pid::from_raw(child.as_u32() as i32),
1607                    &node_info.node_mask,
1608                ) {
1609                    Ok(_) => {
1610                        // Increment the global counter for successful affinitization
1611                        self.tasks_affinitized += 1;
1612                    }
1613                    Err(_) => {
1614                        debug!(
1615                            "Error affinitizing gpu pid {} to node {:#?}",
1616                            child.as_u32(),
1617                            node_info
1618                        );
1619                    }
1620                };
1621            }
1622        }
1623        Ok(())
1624    }
1625
1626    pub fn maybe_affinitize(&mut self) {
1627        if !self.enable {
1628            return;
1629        }
1630        let now = Instant::now();
1631
1632        if let Some(last_process_time) = self.last_process_time {
1633            if (now - last_process_time) < self.poll_interval {
1634                return;
1635            }
1636        }
1637
1638        match self.update_gpu_pids() {
1639            Ok(_) => {}
1640            Err(e) => {
1641                error!("Error updating GPU PIDs: {}", e);
1642            }
1643        };
1644        match self.update_process_info() {
1645            Ok(_) => {}
1646            Err(e) => {
1647                error!("Error updating process info to affinitize GPU PIDs: {}", e);
1648            }
1649        };
1650        match self.affinitize_gpu_pids() {
1651            Ok(_) => {}
1652            Err(e) => {
1653                error!("Error updating GPU PIDs: {}", e);
1654            }
1655        };
1656        self.last_process_time = Some(now);
1657        self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
1658    }
1659
1660    pub fn init(&mut self, topo: Arc<Topology>) {
1661        if !self.enable || self.last_process_time.is_some() {
1662            return;
1663        }
1664
1665        match self.init_dev_node_map(topo) {
1666            Ok(_) => {}
1667            Err(e) => {
1668                error!("Error initializing gpu node dev map: {}", e);
1669            }
1670        };
1671        self.sys = System::new_all();
1672    }
1673}
1674
/// Userspace half of the scx_layered scheduler: owns the loaded BPF
/// skeleton plus all state driven by the periodic scheduling loop.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    // Link keeping the struct_ops scheduler attached; presumably None once
    // detached -- confirm against the attach/shutdown paths.
    struct_ops: Option<libbpf_rs::Link>,
    // Parsed layer configuration this scheduler was started with.
    layer_specs: Vec<LayerSpec>,

    // Cadence of the main scheduling step and of layer config refresh.
    sched_intv: Duration,
    layer_refresh_intv: Duration,

    cpu_pool: CpuPool,
    layers: Vec<Layer>,
    idle_qos_enabled: bool,

    proc_reader: fb_procfs::ProcReader,
    sched_stats: Stats,

    // Compiled CgroupRegex match rules (rule id -> Regex) produced by
    // init_layers(); these are evaluated in userspace, not in BPF.
    cgroup_regexes: Option<HashMap<u32, Regex>>,

    // Per-layer bookkeeping; the Vec<Vec<bool>> fields look to be indexed
    // [layer][node] (cf. the xnuma water-fill helpers) -- verify at the
    // update sites.
    nr_layer_cpus_ranges: Vec<(usize, usize)>,
    xnuma_mig_src: Vec<Vec<bool>>,
    growth_denied: Vec<Vec<bool>>,
    processing_dur: Duration,

    topo: Arc<Topology>,
    netdevs: BTreeMap<String, NetDev>,
    stats_server: StatsServer<StatsReq, StatsRes>,
    gpu_task_handler: GpuTaskAffinitizer,
}
1702
/// Fixed-point scale applied to migration rates: 1.0 of duty cycle is
/// represented as 1 << 20 in the u64 rates.
const DUTY_CYCLE_SCALE: f64 = (1u64 << 20) as f64;
/// Fraction of a node's surplus transferred per cycle by the xnuma
/// water-fill, so rebalancing converges gradually instead of overshooting.
const XNUMA_RATE_DAMPEN: f64 = 0.5;

/// Result of xnuma water-fill computation for a single layer.
struct XnumaRates {
    /// rates[src][dst]: migration rate in duty-cycle-scaled units.
    rates: Vec<Vec<u64>>,
}
1711
/// Decide, per NUMA node, whether the node is a cross-NUMA migration
/// source, using two-threshold hysteresis.
///
/// A node opens (becomes a source) only when all three hold:
///   1. load/alloc > threshold.1 (significant load)
///   2. surplus/alloc > threshold_delta.1 (significant imbalance)
///   3. growth_denied (allocation can't solve it)
///
/// It closes again as soon as any one holds:
///   1. load/alloc < threshold.0 (load dropped)
///   2. surplus/alloc < threshold_delta.0 (imbalance resolved)
///   3. !growth_denied (growth succeeded)
///
/// Between the two thresholds, the previous state (`currently_active`)
/// is retained.
fn xnuma_check_active(
    duty_sums: &[f64],
    allocs: &[usize],
    threshold: (f64, f64),
    threshold_delta: (f64, f64),
    growth_denied: &[bool],
    currently_active: &[bool],
) -> Vec<bool> {
    let total_duty: f64 = duty_sums.iter().sum();
    let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
    // Water line: the load/alloc ratio every node would have if perfectly
    // balanced.
    let eq_ratio = if total_alloc > 0.0 {
        total_duty / total_alloc
    } else {
        0.0
    };

    let (thresh_lo, thresh_hi) = threshold;
    let (delta_lo, delta_hi) = threshold_delta;

    duty_sums
        .iter()
        .enumerate()
        .map(|(nid, &duty)| {
            let alloc = allocs[nid] as f64;
            if alloc <= 0.0 {
                // No CPUs allocated: a source only if there is load that
                // growth can't absorb.
                return duty > 0.0 && growth_denied[nid];
            }

            let load_ratio = duty / alloc;
            let surplus_ratio = (duty - eq_ratio * alloc) / alloc;

            if load_ratio > thresh_hi && surplus_ratio > delta_hi && growth_denied[nid] {
                true
            } else if load_ratio < thresh_lo || surplus_ratio < delta_lo || !growth_denied[nid] {
                false
            } else {
                currently_active[nid]
            }
        })
        .collect()
}
1773
1774/// Compute water-fill migration rates for a single layer.
1775///
1776/// Finds the equalization ratio (water line) across all nodes, then
1777/// computes per-(src, dst) migration rates proportional to each source's
1778/// surplus and each destination's share of total deficit.
1779fn xnuma_compute_rates(duty_sums: &[f64], allocs: &[usize]) -> XnumaRates {
1780    let nr_nodes = duty_sums.len();
1781    let total_duty: f64 = duty_sums.iter().sum();
1782    let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
1783
1784    if total_alloc <= 0.0 {
1785        return XnumaRates {
1786            rates: vec![vec![0u64; nr_nodes]; nr_nodes],
1787        };
1788    }
1789
1790    let eq_ratio = total_duty / total_alloc;
1791
1792    let mut surpluses = vec![0.0f64; nr_nodes];
1793    let mut deficits = vec![0.0f64; nr_nodes];
1794    for nid in 0..nr_nodes {
1795        let expected = eq_ratio * allocs[nid] as f64;
1796        let delta = duty_sums[nid] - expected;
1797        if delta > 0.0 {
1798            surpluses[nid] = delta;
1799        } else {
1800            deficits[nid] = -delta;
1801        }
1802    }
1803
1804    let total_deficit: f64 = deficits.iter().sum();
1805
1806    let mut rates = vec![vec![0u64; nr_nodes]; nr_nodes];
1807    for src in 0..nr_nodes {
1808        for dst in 0..nr_nodes {
1809            if src == dst || total_deficit <= 0.0 || surpluses[src] <= 0.0 {
1810                continue;
1811            }
1812            // Dampen: transfer half the surplus per cycle so convergence
1813            // is gradual rather than a single-step overcorrection.
1814            let migration = surpluses[src] * deficits[dst] / total_deficit * XNUMA_RATE_DAMPEN;
1815            rates[src][dst] = (migration * DUTY_CYCLE_SCALE) as u64;
1816        }
1817    }
1818
1819    XnumaRates { rates }
1820}
1821
1822impl<'a> Scheduler<'a> {
    /// Configure the BPF-side per-layer state from the parsed layer specs.
    ///
    /// Copies every spec's match rules and tuning knobs into the open
    /// skeleton's rodata/bss, records the weight-ordered layer iteration
    /// order, and returns the CgroupRegex rules (rule id -> compiled Regex)
    /// whose matching is performed in userspace.
    fn init_layers(
        skel: &mut OpenBpfSkel,
        specs: &[LayerSpec],
        topo: &Topology,
    ) -> Result<HashMap<u32, Regex>> {
        skel.maps.rodata_data.as_mut().unwrap().nr_layers = specs.len() as u32;
        let mut perf_set = false;

        let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
        let mut layer_weights: Vec<usize> = vec![];
        // Regex rule ids are handed out in spec order; the BPF side carries
        // the id so userspace knows which compiled regex to evaluate.
        let mut cgroup_regex_id = 0;
        let mut cgroup_regexes = HashMap::new();

        for (spec_i, spec) in specs.iter().enumerate() {
            let layer = &mut skel.maps.bss_data.as_mut().unwrap().layers[spec_i];

            for (or_i, or) in spec.matches.iter().enumerate() {
                for (and_i, and) in or.iter().enumerate() {
                    let mt = &mut layer.matches[or_i].matches[and_i];

                    // Rules are allowlist-based by default
                    mt.exclude.write(false);

                    match and {
                        LayerMatch::CgroupPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
                            copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
                        }
                        LayerMatch::CgroupSuffix(suffix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
                            copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
                        }
                        LayerMatch::CgroupRegex(regex_str) => {
                            if cgroup_regex_id >= bpf_intf::consts_MAX_CGROUP_REGEXES {
                                bail!(
                                    "Too many cgroup regex rules. Maximum allowed: {}",
                                    bpf_intf::consts_MAX_CGROUP_REGEXES
                                );
                            }

                            // CgroupRegex matching handled in userspace via cgroup watcher
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_REGEX as i32;
                            mt.cgroup_regex_id = cgroup_regex_id;

                            let regex = Regex::new(regex_str).with_context(|| {
                                format!("Invalid regex '{}' in layer '{}'", regex_str, spec.name)
                            })?;
                            cgroup_regexes.insert(cgroup_regex_id, regex);
                            cgroup_regex_id += 1;
                        }
                        LayerMatch::CgroupContains(substr) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
                            copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
                        }
                        LayerMatch::CommPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                        }
                        LayerMatch::CommPrefixExclude(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                            mt.exclude.write(true);
                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                        }
                        LayerMatch::PcommPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                        }
                        LayerMatch::PcommPrefixExclude(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                            mt.exclude.write(true);
                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                        }
                        LayerMatch::NiceAbove(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::NiceBelow(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::NiceEquals(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::UIDEquals(user_id) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
                            mt.user_id = *user_id;
                        }
                        LayerMatch::GIDEquals(group_id) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
                            mt.group_id = *group_id;
                        }
                        LayerMatch::PIDEquals(pid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
                            mt.pid = *pid;
                        }
                        LayerMatch::PPIDEquals(ppid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
                            mt.ppid = *ppid;
                        }
                        LayerMatch::TGIDEquals(tgid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
                            mt.tgid = *tgid;
                        }
                        LayerMatch::NSPIDEquals(nsid, pid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
                            mt.nsid = *nsid;
                            mt.pid = *pid;
                        }
                        LayerMatch::NSEquals(nsid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
                            mt.nsid = *nsid as u64;
                        }
                        LayerMatch::CmdJoin(joincmd) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
                            copy_into_cstr(&mut mt.comm_prefix, joincmd);
                        }
                        LayerMatch::IsGroupLeader(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
                            mt.is_group_leader.write(*polarity);
                        }
                        LayerMatch::IsKthread(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
                            mt.is_kthread.write(*polarity);
                        }
                        LayerMatch::UsedGpuTid(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
                            mt.used_gpu_tid.write(*polarity);
                        }
                        LayerMatch::UsedGpuPid(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
                            mt.used_gpu_pid.write(*polarity);
                        }
                        LayerMatch::AvgRuntime(min, max) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
                            mt.min_avg_runtime_us = *min;
                            mt.max_avg_runtime_us = *max;
                        }
                        LayerMatch::HintEquals(hint) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_HINT_EQUALS as i32;
                            mt.hint = *hint;
                        }
                        // The two *_Below thresholds are fractions scaled by
                        // 10000 on the BPF side (basis-point style fixed point).
                        LayerMatch::SystemCpuUtilBelow(threshold) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_SYSTEM_CPU_UTIL_BELOW as i32;
                            mt.system_cpu_util_below = (*threshold * 10000.0) as u64;
                        }
                        LayerMatch::DsqInsertBelow(threshold) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_DSQ_INSERT_BELOW as i32;
                            mt.dsq_insert_below = (*threshold * 10000.0) as u64;
                        }
                        LayerMatch::NumaNode(node_id) => {
                            if *node_id as usize >= topo.nodes.len() {
                                bail!(
                                    "Spec {:?} has invalid NUMA node ID {} (available nodes: 0-{})",
                                    spec.name,
                                    node_id,
                                    topo.nodes.len() - 1
                                );
                            }
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NUMA_NODE as i32;
                            mt.numa_node_id = *node_id;
                        }
                    }
                }
                layer.matches[or_i].nr_match_ands = or.len() as i32;
            }

            layer.nr_match_ors = spec.matches.len() as u32;
            layer.kind = spec.kind.as_bpf_enum();

            {
                let LayerCommon {
                    min_exec_us,
                    yield_ignore,
                    perf,
                    preempt,
                    preempt_first,
                    exclusive,
                    skip_remote_node,
                    prev_over_idle_core,
                    growth_algo,
                    slice_us,
                    fifo,
                    weight,
                    disallow_open_after_us,
                    disallow_preempt_after_us,
                    xllc_mig_min_us,
                    placement,
                    member_expire_ms,
                    ..
                } = spec.kind.common();

                layer.slice_ns = *slice_us * 1000;
                layer.fifo.write(*fifo);
                layer.min_exec_ns = min_exec_us * 1000;
                // yield_ignore in [0, 1] maps to a yield step of
                // [slice_ns, 0]: ~1.0 ignores yields entirely, ~0.0 gives
                // up the full slice.
                layer.yield_step_ns = if *yield_ignore > 0.999 {
                    0
                } else if *yield_ignore < 0.001 {
                    layer.slice_ns
                } else {
                    (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
                };
                let mut layer_name: String = spec.name.clone();
                layer_name.truncate(MAX_LAYER_NAME);
                copy_into_cstr(&mut layer.name, layer_name.as_str());
                layer.preempt.write(*preempt);
                layer.preempt_first.write(*preempt_first);
                layer.excl.write(*exclusive);
                layer.skip_remote_node.write(*skip_remote_node);
                layer.prev_over_idle_core.write(*prev_over_idle_core);
                layer.growth_algo = growth_algo.as_bpf_enum();
                layer.weight = *weight;
                layer.member_expire_ms = *member_expire_ms;
                // u64::MAX means "never"; anything else is us -> ns.
                layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
                    v if v == u64::MAX => v,
                    v => v * 1000,
                };
                layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
                    v if v == u64::MAX => v,
                    v => v * 1000,
                };
                layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
                layer_weights.push(layer.weight.try_into().unwrap());
                layer.perf = u32::try_from(*perf)?;

                let task_place = |place: u32| crate::types::layer_task_place(place);
                layer.task_place = match placement {
                    LayerPlacement::Standard => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_STD)
                    }
                    LayerPlacement::Sticky => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_STICK)
                    }
                    LayerPlacement::Floating => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT)
                    }
                };
            }

            layer.is_protected.write(match spec.kind {
                LayerKind::Open { .. } => false,
                LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
                    protected
                }
            });

            // No explicit cpuset means "all CPUs allowed" (all-ones mask).
            match &spec.cpuset {
                Some(mask) => {
                    Self::update_cpumask(mask, &mut layer.cpuset);
                    layer.has_cpuset.write(true);
                }
                None => {
                    for i in 0..layer.cpuset.len() {
                        layer.cpuset[i] = u8::MAX;
                    }
                    layer.has_cpuset.write(false);
                }
            };

            perf_set |= layer.perf > 0;
        }

        // BPF iterates layers in ascending weight order.
        layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
        for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
            skel.maps
                .rodata_data
                .as_mut()
                .unwrap()
                .layer_iteration_order[idx] = *layer_idx as u32;
        }

        if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
            warn!("cpufreq support not available, ignoring perf configurations");
        }

        Ok(cgroup_regexes)
    }
2100
2101    fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
2102        skel.maps.rodata_data.as_mut().unwrap().nr_nodes = topo.nodes.len() as u32;
2103        skel.maps.rodata_data.as_mut().unwrap().nr_llcs = 0;
2104
2105        for (&node_id, node) in &topo.nodes {
2106            debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
2107            skel.maps.rodata_data.as_mut().unwrap().nr_llcs += node.llcs.len() as u32;
2108            let raw_numa_slice = node.span.as_raw_slice();
2109            let node_cpumask_slice =
2110                &mut skel.maps.rodata_data.as_mut().unwrap().numa_cpumasks[node_id];
2111            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
2112            left.clone_from_slice(raw_numa_slice);
2113            debug!(
2114                "node {} mask: {:?}",
2115                node_id,
2116                skel.maps.rodata_data.as_ref().unwrap().numa_cpumasks[node_id]
2117            );
2118
2119            for llc in node.llcs.values() {
2120                debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
2121                skel.maps.rodata_data.as_mut().unwrap().llc_numa_id_map[llc.id] = node_id as u32;
2122            }
2123        }
2124
2125        for cpu in topo.all_cpus.values() {
2126            skel.maps.rodata_data.as_mut().unwrap().cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
2127        }
2128    }
2129
    /// Build each CPU's proximity map: an ordering of all CPUs from nearest
    /// (same core) to farthest (other NUMA nodes), with the boundary index
    /// of each topology level recorded so a search can be limited to
    /// core/LLC/node/system scope.
    fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
        // Sort ids by absolute distance from a center id.
        let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
            vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
            vec
        };
        // Same, but prefer core distance first, CPU id distance second.
        let radiate_cpu =
            |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
                vec.sort_by_key(|&id| {
                    (
                        (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
                        (center_cpu as i32 - id as i32).abs(),
                    )
                });
                vec
            };

        for (&cpu_id, cpu) in &topo.all_cpus {
            // Collect the spans.
            let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
            let llc_span = &topo.all_llcs[&cpu.llc_id].span;
            let node_span = &topo.nodes[&cpu.node_id].span;
            let sys_span = &topo.span;

            // Make the spans exclusive.
            let sys_span = sys_span.and(&node_span.not());
            let node_span = node_span.and(&llc_span.not());
            let llc_span = llc_span.and(&core_span.not());
            core_span.clear_cpu(cpu_id).unwrap();

            // Convert them into arrays.
            let mut sys_order: Vec<usize> = sys_span.iter().collect();
            let mut node_order: Vec<usize> = node_span.iter().collect();
            let mut llc_order: Vec<usize> = llc_span.iter().collect();
            let mut core_order: Vec<usize> = core_span.iter().collect();

            // Shuffle them so that different CPUs follow different orders.
            // Each CPU radiates in both directions based on the cpu id and
            // radiates out to the closest cores based on core ids.

            sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
            node_order = radiate(node_order, cpu.node_id);
            llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
            core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);

            // Concatenate and record the topology boundaries.
            let mut order: Vec<usize> = vec![];
            let mut idx: usize = 0;

            // The CPU itself always comes first.
            idx += 1;
            order.push(cpu_id);

            idx += core_order.len();
            order.append(&mut core_order);
            let core_end = idx;

            idx += llc_order.len();
            order.append(&mut llc_order);
            let llc_end = idx;

            idx += node_order.len();
            order.append(&mut node_order);
            let node_end = idx;

            idx += sys_order.len();
            order.append(&mut sys_order);
            let sys_end = idx;

            debug!(
                "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
                cpu_id, core_end, llc_end, node_end, sys_end, &order
            );

            // Record in cpu_ctx.
            let pmap = &mut cpu_ctxs[cpu_id].prox_map;
            for (i, &cpu) in order.iter().enumerate() {
                pmap.cpus[i] = cpu as u16;
            }
            pmap.core_end = core_end as u32;
            pmap.llc_end = llc_end as u32;
            pmap.node_end = node_end as u32;
            pmap.sys_end = sys_end as u32;
        }
    }
2213
2214    fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
2215        cpu_ctxs
2216            .iter()
2217            .map(|cpu_ctx| unsafe { plain::as_bytes(cpu_ctx) }.to_vec())
2218            .collect()
2219    }
2220
2221    fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
2222        let key = (0_u32).to_ne_bytes();
2223        let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
2224        let cpu_ctxs_vec = skel
2225            .maps
2226            .cpu_ctxs
2227            .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
2228            .context("Failed to lookup cpu_ctx")?
2229            .unwrap();
2230
2231        let op_layers: Vec<u32> = layer_specs
2232            .iter()
2233            .enumerate()
2234            .filter(|(_idx, spec)| match &spec.kind {
2235                LayerKind::Open { .. } => spec.kind.common().preempt,
2236                _ => false,
2237            })
2238            .map(|(idx, _)| idx as u32)
2239            .collect();
2240        let on_layers: Vec<u32> = layer_specs
2241            .iter()
2242            .enumerate()
2243            .filter(|(_idx, spec)| match &spec.kind {
2244                LayerKind::Open { .. } => !spec.kind.common().preempt,
2245                _ => false,
2246            })
2247            .map(|(idx, _)| idx as u32)
2248            .collect();
2249        let gp_layers: Vec<u32> = layer_specs
2250            .iter()
2251            .enumerate()
2252            .filter(|(_idx, spec)| match &spec.kind {
2253                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2254                _ => false,
2255            })
2256            .map(|(idx, _)| idx as u32)
2257            .collect();
2258        let gn_layers: Vec<u32> = layer_specs
2259            .iter()
2260            .enumerate()
2261            .filter(|(_idx, spec)| match &spec.kind {
2262                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2263                _ => false,
2264            })
2265            .map(|(idx, _)| idx as u32)
2266            .collect();
2267
2268        // FIXME - this incorrectly assumes all possible CPUs are consecutive.
2269        for cpu in 0..*NR_CPUS_POSSIBLE {
2270            cpu_ctxs.push(
2271                *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
2272                    .expect("cpu_ctx: short or misaligned buffer"),
2273            );
2274
2275            let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
2276            let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
2277            cpu_ctxs[cpu].cpu = cpu as i32;
2278            cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
2279            cpu_ctxs[cpu].is_big = is_big;
2280
2281            fastrand::seed(cpu as u64);
2282
2283            let mut ogp_order = op_layers.clone();
2284            ogp_order.append(&mut gp_layers.clone());
2285            fastrand::shuffle(&mut ogp_order);
2286
2287            let mut ogn_order = on_layers.clone();
2288            ogn_order.append(&mut gn_layers.clone());
2289            fastrand::shuffle(&mut ogn_order);
2290
2291            let mut op_order = op_layers.clone();
2292            fastrand::shuffle(&mut op_order);
2293
2294            let mut on_order = on_layers.clone();
2295            fastrand::shuffle(&mut on_order);
2296
2297            let mut gp_order = gp_layers.clone();
2298            fastrand::shuffle(&mut gp_order);
2299
2300            let mut gn_order = gn_layers.clone();
2301            fastrand::shuffle(&mut gn_order);
2302
2303            for i in 0..MAX_LAYERS {
2304                cpu_ctxs[cpu].ogp_layer_order[i] =
2305                    ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2306                cpu_ctxs[cpu].ogn_layer_order[i] =
2307                    ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2308
2309                cpu_ctxs[cpu].op_layer_order[i] =
2310                    op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2311                cpu_ctxs[cpu].on_layer_order[i] =
2312                    on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2313                cpu_ctxs[cpu].gp_layer_order[i] =
2314                    gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2315                cpu_ctxs[cpu].gn_layer_order[i] =
2316                    gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2317            }
2318        }
2319
2320        Self::init_cpu_prox_map(topo, &mut cpu_ctxs);
2321
2322        skel.maps
2323            .cpu_ctxs
2324            .update_percpu(
2325                &key,
2326                &Self::convert_cpu_ctxs(cpu_ctxs),
2327                libbpf_rs::MapFlags::ANY,
2328            )
2329            .context("Failed to update cpu_ctx")?;
2330
2331        Ok(())
2332    }
2333
2334    fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2335        for (&llc_id, llc) in &topo.all_llcs {
2336            // Collect the orders.
2337            let mut node_order: Vec<usize> =
2338                topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
2339            let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();
2340
2341            // Make the orders exclusive.
2342            sys_order.retain(|id| !node_order.contains(id));
2343            node_order.retain(|&id| id != llc_id);
2344
2345            // Shuffle so that different LLCs follow different orders. See
2346            // init_cpu_prox_map().
2347            fastrand::seed(llc_id as u64);
2348            fastrand::shuffle(&mut sys_order);
2349            fastrand::shuffle(&mut node_order);
2350
2351            // Concatenate and record the node boundary.
2352            let mut order: Vec<usize> = vec![];
2353            let mut idx: usize = 0;
2354
2355            idx += 1;
2356            order.push(llc_id);
2357
2358            idx += node_order.len();
2359            order.append(&mut node_order);
2360            let node_end = idx;
2361
2362            idx += sys_order.len();
2363            order.append(&mut sys_order);
2364            let sys_end = idx;
2365
2366            debug!(
2367                "LLC[{}] proximity map[{}/{}]: {:?}",
2368                llc_id, node_end, sys_end, &order
2369            );
2370
2371            // Record in llc_ctx.
2372            //
2373            // XXX - This would be a lot easier if llc_ctx were in the bss.
2374            // See BpfStats::read().
2375            let key = (llc_id as u32).to_ne_bytes();
2376            let v = skel
2377                .maps
2378                .llc_data
2379                .lookup(&key, libbpf_rs::MapFlags::ANY)
2380                .unwrap()
2381                .unwrap();
2382            let mut llcc: bpf_intf::llc_ctx =
2383                *plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
2384
2385            let pmap = &mut llcc.prox_map;
2386            for (i, &llc_id) in order.iter().enumerate() {
2387                pmap.llcs[i] = llc_id as u16;
2388            }
2389            pmap.node_end = node_end as u32;
2390            pmap.sys_end = sys_end as u32;
2391
2392            skel.maps.llc_data.update(
2393                &key,
2394                unsafe { plain::as_bytes(&llcc) },
2395                libbpf_rs::MapFlags::ANY,
2396            )?
2397        }
2398
2399        Ok(())
2400    }
2401
2402    fn init_node_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2403        for (&node_id, node) in &topo.nodes {
2404            let mut order: Vec<(usize, usize)> = node
2405                .distance
2406                .iter()
2407                .enumerate()
2408                .filter(|&(nid, _)| nid != node_id)
2409                .map(|(nid, &dist)| (nid, dist))
2410                .collect();
2411            order.sort_by_key(|&(_, dist)| dist);
2412
2413            let key = (node_id as u32).to_ne_bytes();
2414
2415            // The map entry may not exist yet — create a zeroed one.
2416            let v = skel.maps.node_data.lookup(&key, libbpf_rs::MapFlags::ANY);
2417            let mut nodec: bpf_intf::node_ctx = match v {
2418                Ok(Some(v)) => {
2419                    *plain::from_bytes(v.as_slice()).expect("node_ctx: short or misaligned buffer")
2420                }
2421                _ => unsafe { MaybeUninit::zeroed().assume_init() },
2422            };
2423
2424            let pmap = &mut nodec.prox_map;
2425            for (i, &(nid, _)) in order.iter().enumerate() {
2426                pmap.nodes[i] = nid as u16;
2427            }
2428            pmap.sys_end = order.len() as u32;
2429
2430            debug!(
2431                "NODE[{}] prox_map[{}]: {:?}",
2432                node_id,
2433                pmap.sys_end,
2434                &order.iter().map(|(n, d)| (*n, *d)).collect::<Vec<_>>()
2435            );
2436
2437            skel.maps.node_data.update(
2438                &key,
2439                unsafe { plain::as_bytes(&nodec) },
2440                libbpf_rs::MapFlags::ANY,
2441            )?;
2442        }
2443        Ok(())
2444    }
2445
2446    fn init_node_ctx(skel: &mut BpfSkel, topo: &Topology, nr_layers: usize) -> Result<()> {
2447        let all_layers: Vec<u32> = (0..nr_layers as u32).collect();
2448        let node_empty_layers: Vec<Vec<u32>> =
2449            (0..topo.nodes.len()).map(|_| all_layers.clone()).collect();
2450        Self::refresh_node_ctx(skel, topo, &node_empty_layers, true);
2451        Ok(())
2452    }
2453
    /// Construct and attach the scheduler: build the topology, validate the
    /// layer specs against it, open/configure/load the BPF skeleton,
    /// initialize per-CPU/LLC/node state, attach struct_ops, and launch the
    /// stats server. Statement order matters throughout: rodata and pin
    /// paths must be set between open and load, attach comes last.
    fn init(
        opts: &'a Opts,
        layer_specs: &[LayerSpec],
        open_object: &'a mut MaybeUninit<OpenObject>,
        hint_to_layer_map: &HashMap<u64, HintLayerInfo>,
        membw_tracking: bool,
    ) -> Result<Self> {
        let nr_layers = layer_specs.len();
        let mut disable_topology = opts.disable_topology.unwrap_or(false);

        // Pick the topology source: flattened single-LLC/node view when
        // topology awareness is explicitly off, virtual-LLC args when
        // provided, otherwise detect the real hardware topology.
        let topo = Arc::new(if disable_topology {
            Topology::with_flattened_llc_node()?
        } else if opts.topology.virt_llc.is_some() {
            Topology::with_args(&opts.topology)?
        } else {
            Topology::new()?
        });

        /*
         * FIXME: scx_layered incorrectly assumes that node, LLC and CPU IDs
         * are consecutive. Verify that they are on this system and bail if
         * not. It's lucky that core ID is not used anywhere as core IDs are
         * not consecutive on some Ryzen CPUs.
         */
        if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
        }
        if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
        }
        if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
        }

        // Netdev IRQ balancing: snapshot the network devices (and their
        // IRQs) we will later steer via update_netdev_cpumasks().
        let netdevs = if opts.netdev_irq_balance {
            let devs = read_netdevs()?;
            let total_irqs: usize = devs.values().map(|d| d.irqs.len()).sum();
            let breakdown = devs
                .iter()
                .map(|(iface, d)| format!("{iface}={}", d.irqs.len()))
                .collect::<Vec<_>>()
                .join(", ");
            info!(
                "Netdev IRQ balancing enabled: overriding {total_irqs} IRQ{} \
                 across {} interface{} [{breakdown}]",
                if total_irqs == 1 { "" } else { "s" },
                devs.len(),
                if devs.len() == 1 { "" } else { "s" },
            );
            devs
        } else {
            BTreeMap::new()
        };

        // Auto-disable topology awareness on trivial (1 node, 1 LLC)
        // machines when the user didn't specify either way.
        if !disable_topology {
            if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
                disable_topology = true;
            };
            info!(
                "Topology awareness not specified, selecting {} based on hardware",
                if disable_topology {
                    "disabled"
                } else {
                    "enabled"
                }
            );
        };

        let cpu_pool = CpuPool::new(topo.clone(), opts.allow_partial_core)?;

        // If disabling topology awareness clear out any set NUMA/LLC configs and
        // it will fallback to using all cores.
        let layer_specs: Vec<_> = if disable_topology {
            info!("Disabling topology awareness");
            layer_specs
                .iter()
                .cloned()
                .map(|mut s| {
                    s.kind.common_mut().nodes.clear();
                    s.kind.common_mut().llcs.clear();
                    s
                })
                .collect()
        } else {
            layer_specs.to_vec()
        };

        // Validate that spec node/LLC references exist in the topology.
        for spec in layer_specs.iter() {
            let mut seen = BTreeSet::new();
            for &node_id in spec.nodes().iter() {
                if !topo.nodes.contains_key(&node_id) {
                    bail!(
                        "layer {:?}: nodes references node {} which does not \
                         exist in the topology (available: {:?})",
                        spec.name,
                        node_id,
                        topo.nodes.keys().collect::<Vec<_>>()
                    );
                }
                if !seen.insert(node_id) {
                    bail!(
                        "layer {:?}: nodes contains duplicate node {}",
                        spec.name,
                        node_id
                    );
                }
            }

            // Reuse the same set for LLC duplicate detection.
            seen.clear();
            for &llc_id in spec.llcs().iter() {
                if !topo.all_llcs.contains_key(&llc_id) {
                    bail!(
                        "layer {:?}: llcs references LLC {} which does not \
                         exist in the topology (available: {:?})",
                        spec.name,
                        llc_id,
                        topo.all_llcs.keys().collect::<Vec<_>>()
                    );
                }
                if !seen.insert(llc_id) {
                    bail!(
                        "layer {:?}: llcs contains duplicate LLC {}",
                        spec.name,
                        llc_id
                    );
                }
            }
        }

        // Reject spec combinations that are self-defeating: a NumaNode
        // matcher pins tasks to one node while NodeSpread* growth spreads
        // the layer's CPUs across all nodes.
        for spec in layer_specs.iter() {
            let has_numa_node_match = spec
                .matches
                .iter()
                .flatten()
                .any(|m| matches!(m, LayerMatch::NumaNode(_)));
            let has_node_spread_algo = matches!(
                spec.kind.common().growth_algo,
                LayerGrowthAlgo::NodeSpread
                    | LayerGrowthAlgo::NodeSpreadReverse
                    | LayerGrowthAlgo::NodeSpreadRandom
            );
            if has_numa_node_match && has_node_spread_algo {
                bail!(
                    "layer {:?}: NumaNode matcher cannot be combined with {:?} \
                     growth algorithm. NodeSpread* allocates CPUs equally across \
                     ALL NUMA nodes, but NumaNode restricts tasks to one node's \
                     CPUs — CPUs on other nodes are wasted and utilization \
                     will never exceed 1/numa_nodes. Use a non-spread algorithm \
                     (e.g. Linear, Topo) instead.",
                    spec.name,
                    spec.kind.common().growth_algo
                );
            }
        }

        // Check kernel features
        init_libbpf_logging(None);
        let kfuncs_in_syscall = scx_bpf_compat::kfuncs_supported_in_syscall()?;
        if !kfuncs_in_syscall {
            warn!("Using slow path: kfuncs not supported in syscall programs (a8e03b6bbb2c ∉ ker)");
        }

        // Open the BPF prog first for verification.
        let debug_level = if opts.log_level.contains("trace") {
            2
        } else if opts.log_level.contains("debug") {
            1
        } else {
            0
        };
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug_level > 1);

        info!(
            "Running scx_layered (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, layered, open_opts)?;

        // No memory BW tracking by default
        skel.progs.scx_pmu_switch_tc.set_autoload(membw_tracking);
        skel.progs.scx_pmu_tick_tc.set_autoload(membw_tracking);

        let mut loaded_kprobes = HashSet::new();

        // enable autoloads for conditionally loaded things
        // immediately after creating skel (because this is always before loading)
        if opts.enable_gpu_support {
            // by default, enable open if gpu support is enabled.
            // open has been observed to be relatively cheap to kprobe.
            if opts.gpu_kprobe_level >= 1 {
                compat::cond_kprobe_load("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
                loaded_kprobes.insert("nvidia_open");
            }
            // enable the rest progressively based upon how often they are called
            // for observed workloads
            if opts.gpu_kprobe_level >= 2 {
                compat::cond_kprobe_load("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
                loaded_kprobes.insert("nvidia_mmap");
            }
            if opts.gpu_kprobe_level >= 3 {
                compat::cond_kprobe_load("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
                loaded_kprobes.insert("nvidia_poll");
            }
        }

        let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
        let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");

        let event = if membw_tracking {
            setup_membw_tracking(&mut skel)?
        } else {
            0
        };

        let rodata = skel.maps.rodata_data.as_mut().unwrap();

        // skip_preempt needs the sched_class addresses; without them the
        // feature is left disabled (addresses stay at their defaults).
        if let (Ok(ext_addr), Ok(idle_addr)) = (ext_sched_class_addr, idle_sched_class_addr) {
            rodata.ext_sched_class_addr = ext_addr;
            rodata.idle_sched_class_addr = idle_addr;
        } else {
            warn!(
                "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
            );
        }

        // Defaults; overwritten below from opts.slice_us / opts.max_exec_us.
        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;

        // Initialize skel according to @opts.
        skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;

        if !opts.disable_queued_wakeup {
            match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
                0 => info!("Kernel does not support queued wakeup optimization"),
                v => skel.struct_ops.layered_mut().flags |= v,
            }
        }

        rodata.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
        rodata.percpu_kthread_preempt_all =
            !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
        rodata.debug = debug_level as u32;
        rodata.slice_ns = opts.slice_us * 1000;
        // max_exec defaults to 20 slices when not set explicitly.
        rodata.max_exec_ns = if opts.max_exec_us > 0 {
            opts.max_exec_us * 1000
        } else {
            opts.slice_us * 1000 * 20
        };
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;
        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        rodata.smt_enabled = topo.smt_enabled;
        rodata.has_little_cores = topo.has_little_cores();
        rodata.antistall_sec = opts.antistall_sec;
        rodata.monitor_disable = opts.monitor_disable;
        rodata.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
        // lo_fb_share is a fraction; stored as parts-per-1024, min 1.
        rodata.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
        rodata.enable_antistall = !opts.disable_antistall;
        rodata.enable_match_debug = opts.enable_match_debug;
        rodata.enable_gpu_support = opts.enable_gpu_support;
        rodata.kfuncs_supported_in_syscall = kfuncs_in_syscall;

        for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
            rodata.__sibling_cpu[cpu] = *sib;
        }
        // all_cpus is a byte-array bitmap, one bit per possible CPU.
        for cpu in topo.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        // Per-category layer counts: op/on = open preempt/non-preempt,
        // gp/gn = grouped preempt/non-preempt.
        rodata.nr_op_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Open { .. } => spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_on_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Open { .. } => !spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_gp_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Grouped { .. } => spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_gn_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_excl_layers = layer_specs
            .iter()
            .filter(|spec| spec.kind.common().exclusive)
            .count() as u32;

        // Smallest disallow_{open,preempt}_after_us across all Open layers;
        // u64::MAX means no Open layer configured one.
        // NOTE(review): the unwrap()s assume spec parsing has already filled
        // these Options with defaults — would panic otherwise; confirm.
        let mut min_open = u64::MAX;
        let mut min_preempt = u64::MAX;

        for spec in layer_specs.iter() {
            if let LayerKind::Open { common, .. } = &spec.kind {
                min_open = min_open.min(common.disallow_open_after_us.unwrap());
                min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
            }
        }

        // NOTE(review): the computed values and DFL_* constants are in
        // microseconds but the target fields are named *_ns with no *1000
        // conversion — confirm the BPF side expects this.
        rodata.min_open_layer_disallow_open_after_ns = match min_open {
            u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
            v => v,
        };
        rodata.min_open_layer_disallow_preempt_after_ns = match min_preempt {
            u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
            v => v,
        };

        // We set the pin path before loading the skeleton. This will ensure
        // libbpf creates and pins the map, or reuses the pinned map fd for us,
        // so that we can keep reusing the older map already pinned on scheduler
        // restarts.
        let layered_task_hint_map_path = &opts.task_hint_map;
        let hint_map = &mut skel.maps.scx_layered_task_hint_map;
        // Only set pin path if a path is provided.
        if !layered_task_hint_map_path.is_empty() {
            hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
            rodata.task_hint_map_enabled = true;
        }

        if !opts.hi_fb_thread_name.is_empty() {
            let bpf_hi_fb_thread_name = &mut rodata.hi_fb_thread_name;
            copy_into_cstr(bpf_hi_fb_thread_name, opts.hi_fb_thread_name.as_str());
            rodata.enable_hi_fb_thread_name_match = true;
        }

        let cgroup_regexes = Self::init_layers(&mut skel, &layer_specs, &topo)?;
        skel.maps.rodata_data.as_mut().unwrap().nr_cgroup_regexes = cgroup_regexes.len() as u32;
        Self::init_nodes(&mut skel, opts, &topo);

        // Everything rodata-related must be done before this point; loading
        // freezes the read-only data.
        let mut skel = scx_ops_load!(skel, layered, uei)?;

        // Populate the mapping of hints to layer IDs for faster lookups
        if !hint_to_layer_map.is_empty() {
            for (k, v) in hint_to_layer_map.iter() {
                let key: u32 = *k as u32;

                // Create hint_layer_info struct
                let mut info_bytes = vec![0u8; std::mem::size_of::<bpf_intf::hint_layer_info>()];
                let info_ptr = info_bytes.as_mut_ptr() as *mut bpf_intf::hint_layer_info;
                // SAFETY: info_bytes is exactly size_of::<hint_layer_info>()
                // and the writes stay within that allocation.
                unsafe {
                    (*info_ptr).layer_id = v.layer_id as u32;
                    // Thresholds are fractions scaled to basis-points*100.
                    (*info_ptr).system_cpu_util_below = match v.system_cpu_util_below {
                        Some(threshold) => (threshold * 10000.0) as u64,
                        None => u64::MAX, // disabled sentinel
                    };
                    (*info_ptr).dsq_insert_below = match v.dsq_insert_below {
                        Some(threshold) => (threshold * 10000.0) as u64,
                        None => u64::MAX, // disabled sentinel
                    };
                }

                skel.maps.hint_to_layer_id_map.update(
                    &key.to_ne_bytes(),
                    &info_bytes,
                    libbpf_rs::MapFlags::ANY,
                )?;
            }
        }

        if membw_tracking {
            create_perf_fds(&mut skel, event)?;
        }

        // Build userspace Layer objects, one per spec, each with its CPU
        // growth order.
        let mut layers = vec![];
        let layer_growth_orders =
            LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
        for (idx, spec) in layer_specs.iter().enumerate() {
            let growth_order = layer_growth_orders
                .get(&idx)
                .with_context(|| "layer has no growth order".to_string())?;
            layers.push(Layer::new(spec, &topo, growth_order)?);
        }

        // Idle QoS only works if the kernel exposes the per-CPU idle resume
        // latency knob.
        let mut idle_qos_enabled = layers
            .iter()
            .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
        if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
            warn!("idle_resume_us not supported, ignoring");
            idle_qos_enabled = false;
        }

        Self::init_cpus(&skel, &layer_specs, &topo)?;
        Self::init_llc_prox_map(&mut skel, &topo)?;
        Self::init_node_prox_map(&mut skel, &topo)?;
        Self::init_node_ctx(&mut skel, &topo, nr_layers)?;

        // Other stuff.
        let proc_reader = fb_procfs::ProcReader::new();

        // Handle setup if layered is running in a pid namespace.
        let input = ProgramInput {
            ..Default::default()
        };
        let prog = &mut skel.progs.initialize_pid_namespace;

        // Best-effort: the result is intentionally ignored.
        let _ = prog.test_run(input);

        // XXX If we try to refresh the cpumasks here before attaching, we
        // sometimes (non-deterministically) don't see the updated values in
        // BPF. It would be better to update the cpumasks here before we
        // attach, but the value will quickly converge anyways so it's not a
        // huge problem in the interim until we figure it out.

        // Allow all tasks to open and write to BPF task hint map, now that
        // we should have it pinned at the desired location.
        if !layered_task_hint_map_path.is_empty() {
            let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
            let mode: libc::mode_t = 0o666;
            // SAFETY: path is a valid NUL-terminated CString; chmod does not
            // retain the pointer. Failure is tolerated (best-effort).
            unsafe {
                if libc::chmod(path.as_ptr(), mode) != 0 {
                    trace!("'chmod' to 666 of task hint map failed, continuing...");
                }
            }
        }

        // Attach.
        let struct_ops = scx_ops_attach!(skel, layered)?;

        // Turn on installed kprobes
        if opts.enable_gpu_support {
            if loaded_kprobes.contains("nvidia_open") {
                compat::cond_kprobe_attach("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
            }
            if loaded_kprobes.contains("nvidia_mmap") {
                compat::cond_kprobe_attach("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
            }
            if loaded_kprobes.contains("nvidia_poll") {
                compat::cond_kprobe_attach("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
            }
        }

        let stats_server = StatsServer::new(stats::server_data()).launch()?;
        let mut gpu_task_handler =
            GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
        gpu_task_handler.init(topo.clone());

        let sched = Self {
            struct_ops: Some(struct_ops),
            layer_specs,

            sched_intv: Duration::from_secs_f64(opts.interval),
            layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),

            cpu_pool,
            layers,
            idle_qos_enabled,

            sched_stats: Stats::new(&mut skel, &proc_reader, topo.clone(), &gpu_task_handler)?,

            cgroup_regexes: Some(cgroup_regexes),
            nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
            xnuma_mig_src: vec![vec![false; topo.nodes.len()]; nr_layers],
            growth_denied: vec![vec![false; topo.nodes.len()]; nr_layers],
            processing_dur: Default::default(),

            proc_reader,
            skel,

            topo,
            netdevs,
            stats_server,
            gpu_task_handler,
        };

        info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");

        Ok(sched)
    }
2938
2939    fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
2940        for cpu in 0..mask.len() {
2941            if mask.test_cpu(cpu) {
2942                bpfmask[cpu / 8] |= 1 << (cpu % 8);
2943            } else {
2944                bpfmask[cpu / 8] &= !(1 << (cpu % 8));
2945            }
2946        }
2947    }
2948
2949    fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
2950        trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
2951        Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);
2952
2953        bpf_layer.nr_cpus = layer.nr_cpus as u32;
2954        for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
2955            bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
2956        }
2957        for (node_id, &nr_node_cpus) in layer.nr_node_cpus.iter().enumerate() {
2958            bpf_layer.node[node_id].nr_cpus = nr_node_cpus as u32;
2959        }
2960
2961        bpf_layer.refresh_cpus = 1;
2962    }
2963
2964    fn update_netdev_cpumasks(&mut self) -> Result<()> {
2965        let available_cpus = self.cpu_pool.available_cpus();
2966        if available_cpus.is_empty() {
2967            return Ok(());
2968        }
2969
2970        for (iface, netdev) in self.netdevs.iter_mut() {
2971            let node = self
2972                .topo
2973                .nodes
2974                .values()
2975                .find(|n| n.id == netdev.node())
2976                .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
2977            let node_cpus = node.span.clone();
2978            for (irq, irqmask) in netdev.irqs.iter_mut() {
2979                irqmask.clear_all();
2980                for cpu in available_cpus.iter() {
2981                    if !node_cpus.test_cpu(cpu) {
2982                        continue;
2983                    }
2984                    let _ = irqmask.set_cpu(cpu);
2985                }
2986                // If no CPUs are available in the node then spread the load across the node
2987                if irqmask.weight() == 0 {
2988                    for cpu in node_cpus.iter() {
2989                        let _ = irqmask.set_cpu(cpu);
2990                    }
2991                }
2992                trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
2993            }
2994            netdev.apply_cpumasks()?;
2995            debug!(
2996                "{iface}: applied affinity override to {} IRQ{}",
2997                netdev.irqs.len(),
2998                if netdev.irqs.len() == 1 { "" } else { "s" },
2999            );
3000        }
3001
3002        Ok(())
3003    }
3004
3005    fn clamp_target_by_membw(
3006        &self,
3007        layer: &Layer,
3008        membw_limit: f64,
3009        membw: f64,
3010        curtarget: u64,
3011    ) -> usize {
3012        let ncpu: u64 = layer.cpus.weight() as u64;
3013        let membw = (membw * 1024_f64.powf(3.0)).round() as u64;
3014        let membw_limit = (membw_limit * 1024_f64.powf(3.0)).round() as u64;
3015        let last_membw_percpu = if ncpu > 0 { membw / ncpu } else { 0 };
3016
3017        // Either there is no memory bandwidth limit set, or the counters
3018        // are not fully initialized yet. Just return the current target.
3019        if membw_limit == 0 || last_membw_percpu == 0 {
3020            return curtarget as usize;
3021        }
3022
3023        (membw_limit / last_membw_percpu) as usize
3024    }
3025
3026    /// Decompose per-layer CPU targets into per-node pinned demand and
3027    /// unpinned demand for unified_alloc(). Uses layer_node_pinned_utils
3028    /// to split each layer's target. All outputs are in alloc units.
3029    fn calc_raw_demands(&self, targets: &[(usize, usize)]) -> Vec<LayerDemand> {
3030        let au = self.cpu_pool.alloc_unit();
3031        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
3032        let nr_nodes = self.topo.nodes.len();
3033
3034        targets
3035            .iter()
3036            .enumerate()
3037            .map(|(idx, &(target, _min))| {
3038                let layer = &self.layers[idx];
3039                let weight = layer.kind.common().weight as usize;
3040
3041                // Open layers don't participate in allocation.
3042                if matches!(layer.kind, LayerKind::Open { .. }) {
3043                    return LayerDemand {
3044                        raw_pinned: vec![0; nr_nodes],
3045                        raw_unpinned: 0,
3046                        weight,
3047                        spread: false,
3048                    };
3049                }
3050
3051                let spread = matches!(
3052                    layer.growth_algo,
3053                    LayerGrowthAlgo::NodeSpread
3054                        | LayerGrowthAlgo::NodeSpreadReverse
3055                        | LayerGrowthAlgo::NodeSpreadRandom
3056                        | LayerGrowthAlgo::RoundRobin
3057                );
3058
3059                let util_high = match &layer.kind {
3060                    LayerKind::Confined { util_range, .. }
3061                    | LayerKind::Grouped { util_range, .. } => util_range.1,
3062                    _ => 1.0,
3063                };
3064
3065                // Convert per-node pinned utilization to CPU demand.
3066                let mut raw_pinned = vec![0usize; nr_nodes];
3067                for n in 0..nr_nodes {
3068                    let pu = pinned_utils[idx][n];
3069                    if pu < 0.01 {
3070                        continue;
3071                    }
3072                    // Check this layer has allowed_cpus on this node.
3073                    let node_span = &self.topo.nodes[&n].span;
3074                    if layer.allowed_cpus.and(node_span).is_empty() {
3075                        continue;
3076                    }
3077                    let cpus = (pu / util_high).ceil() as usize;
3078                    // Round up to alloc units.
3079                    let units = cpus.div_ceil(au);
3080                    raw_pinned[n] = units;
3081                }
3082
3083                // Unpinned = remainder of the target.
3084                let target_units = target.div_ceil(au);
3085                let pinned_units: usize = raw_pinned.iter().sum();
3086                let raw_unpinned = target_units.saturating_sub(pinned_units);
3087
3088                LayerDemand {
3089                    raw_pinned,
3090                    raw_unpinned,
3091                    weight,
3092                    spread,
3093                }
3094            })
3095            .collect()
3096    }
3097
    /// Calculate how many CPUs each layer would like to have if there were
    /// no competition. The CPU range is determined by applying the inverse
    /// of util_range and then capping by cpus_range. If the current
    /// allocation is within the acceptable range, no change is made.
    /// Returns (target, min) pair for each layer, where min is the lower
    /// bound of the resolved cpus_range; Open layers get (0, 0).
    fn calc_target_nr_cpus(&self) -> Vec<(usize, usize)> {
        let nr_cpus = self.cpu_pool.topo.all_cpus.len();
        let utils = &self.sched_stats.layer_utils;
        let membws = &self.sched_stats.layer_membws;

        // records is a trace-only snapshot of (owned, open, util, low,
        // high, target) per sized layer; the utilization values are
        // scaled by 100 for integer logging.
        let mut records: Vec<(u64, u64, u64, usize, usize, usize)> = vec![];
        let mut targets: Vec<(usize, usize)> = vec![];

        for (idx, layer) in self.layers.iter().enumerate() {
            targets.push(match &layer.kind {
                LayerKind::Confined {
                    util_range,
                    cpus_range,
                    cpus_range_frac,
                    membw_gb,
                    ..
                }
                | LayerKind::Grouped {
                    util_range,
                    cpus_range,
                    cpus_range_frac,
                    membw_gb,
                    ..
                } => {
                    // Resolve a possibly fractional cpus_range spec into
                    // absolute (low, high) CPU counts.
                    let cpus_range =
                        resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();

                    // A grouped layer can choose to include open cputime
                    // for sizing. Also, as an empty layer can only get CPU
                    // time through fallback (counted as owned) or open
                    // execution, add open cputime for empty layers.
                    let owned = utils[idx][LAYER_USAGE_OWNED];
                    let open = utils[idx][LAYER_USAGE_OPEN];

                    let membw_owned = membws[idx][LAYER_USAGE_OWNED];
                    let membw_open = membws[idx][LAYER_USAGE_OPEN];

                    let mut util = owned;
                    let mut membw = membw_owned;
                    if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
                        util += open;
                        membw += membw_open;
                    }

                    // Treat sub-1% utilization as zero to avoid jitter.
                    let util = if util < 0.01 { 0.0 } else { util };
                    // Inverse of util_range: low is the CPU count at which
                    // utilization would sit at the high watermark, high at
                    // the low watermark.
                    let low = (util / util_range.1).ceil() as usize;
                    let high = ((util / util_range.0).floor() as usize).max(low);

                    // membw_gb of None means no bandwidth limit; 0.0 is
                    // treated as "no limit" downstream.
                    let membw_limit = match membw_gb {
                        Some(membw_limit) => *membw_limit,
                        None => 0.0,
                    };

                    trace!(
                        "layer {0} (membw, membw_limit): ({membw} gi_b, {membw_limit} gi_b)",
                        layer.name
                    );

                    // Keep the current size if it's already within
                    // [low, high]; otherwise move to the nearest bound.
                    let target = layer.cpus.weight().clamp(low, high);

                    records.push((
                        (owned * 100.0) as u64,
                        (open * 100.0) as u64,
                        (util * 100.0) as u64,
                        low,
                        high,
                        target,
                    ));

                    let target = target.clamp(cpus_range.0, cpus_range.1);
                    let membw_target =
                        self.clamp_target_by_membw(layer, membw_limit, membw, target as u64);

                    trace!("CPU target pre- and post-membw adjustment: {target} -> {membw_target}");

                    // If there's no way to drop our memory usage down enough,
                    // pin the target CPUs to low and drop a warning.
                    if membw_target < cpus_range.0 {
                        warn!("cannot satisfy memory bw limit for layer {}", layer.name);
                        warn!("membw_target {membw_target} low {}", cpus_range.0);
                    };

                    // Memory bandwidth target cannot override imposed limits or bump
                    // the target above what CPU usage-based throttling requires.
                    let target = membw_target.clamp(cpus_range.0, target);

                    (target, cpus_range.0)
                }
                LayerKind::Open { .. } => (0, 0),
            });
        }

        trace!("(owned, open, util, low, high, target): {:?}", &records);
        targets
    }
3198
3199    // Figure out a tuple (LLCs, extra_cpus) in terms of the target CPUs
3200    // computed by weighted_target_nr_cpus. Returns the number of full LLCs
3201    // occupied by a layer, and any extra CPUs that don't occupy a full LLC.
3202    fn compute_target_llcs(target: usize, topo: &Topology) -> (usize, usize) {
3203        // TODO(kkd): We assume each LLC has equal number of cores.
3204        let cores_per_llc = topo.all_cores.len() / topo.all_llcs.len();
3205        // TODO(kkd): We assume each core has fixed number of threads.
3206        let cpus_per_core = topo.all_cores.first_key_value().unwrap().1.cpus.len();
3207        let cpus_per_llc = cores_per_llc * cpus_per_core;
3208
3209        let full = target / cpus_per_llc;
3210        let extra = target % cpus_per_llc;
3211
3212        (full, extra.div_ceil(cpus_per_core))
3213    }
3214
    // Recalculate the core order for layers using StickyDynamic growth
    // algorithm. Uses per-node targets from unified_alloc() to decide how
    // many LLCs each layer gets on each node, then builds core_order and
    // applies CPU changes.
    //
    // `layer_targets` is a list of (layer index, target) pairs in the
    // caller-provided order; `layer_allocs` is indexed by layer index and
    // `au` is the alloc-unit size used to scale node targets back to CPUs.
    // Returns Ok(true) when any layer's CPU assignment changed.
    fn recompute_layer_core_order(
        &mut self,
        layer_targets: &[(usize, usize)],
        layer_allocs: &[LayerAlloc],
        au: usize,
    ) -> Result<bool> {
        let nr_nodes = self.topo.nodes.len();

        // Phase 1 — Free per-node: return excess LLCs to cpu_pool.
        // Iterated in reverse of the caller-provided order so that freed
        // LLCs become available before the acquire pass below.
        debug!(
            " free: before pass: free_llcs={:?}",
            self.cpu_pool.free_llcs
        );
        for &(idx, _) in layer_targets.iter().rev() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                let assigned_on_n = layer.assigned_llcs[n].len();
                // Full LLCs this layer should own on node n per the
                // unified_alloc target (converted from alloc units).
                let target_full_n =
                    Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
                let mut to_free = assigned_on_n.saturating_sub(target_full_n);

                debug!(
                    " free: layer={} node={} assigned={} target_full={} to_free={}",
                    layer.name, n, assigned_on_n, target_full_n, to_free,
                );

                // Pop most-recently-assigned LLCs back into the pool.
                while to_free > 0 {
                    if let Some(llc) = layer.assigned_llcs[n].pop() {
                        self.cpu_pool.return_llc(llc);
                        to_free -= 1;
                        debug!(" layer={} freed_llc={} from node={}", layer.name, llc, n);
                    } else {
                        break;
                    }
                }
            }
        }
        debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);

        // Phase 2 — Acquire per-node: claim LLCs from cpu_pool.
        for &(idx, _) in layer_targets.iter().rev() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                let cur_on_n = layer.assigned_llcs[n].len();
                let target_full_n =
                    Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
                let mut to_alloc = target_full_n.saturating_sub(cur_on_n);

                debug!(
                    " alloc: layer={} node={} cur={} target_full={} to_alloc={} free={}",
                    layer.name,
                    n,
                    cur_on_n,
                    target_full_n,
                    to_alloc,
                    self.cpu_pool.free_llcs.get(&n).map_or(0, |v| v.len()),
                );

                // Claim whole LLCs from this node until the target is met
                // or the pool runs dry.
                while to_alloc > 0 {
                    if let Some(llc) = self.cpu_pool.take_llc_from_node(n) {
                        layer.assigned_llcs[n].push(llc);
                        to_alloc -= 1;
                        debug!(" layer={} alloc_llc={} on node={}", layer.name, llc, n);
                    } else {
                        break;
                    }
                }
            }

            debug!(
                " alloc: layer={} assigned_llcs={:?}",
                layer.name, layer.assigned_llcs
            );
        }

        // Phase 3 — Spillover per-node: consume extra cores from free LLCs.
        let cores_per_llc = self.topo.all_cores.len() / self.topo.all_llcs.len();
        let cpus_per_core = self.topo.all_cores.first_key_value().unwrap().1.cpus.len();
        let cpus_per_llc = cores_per_llc * cpus_per_core;

        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            // core_order is rebuilt from scratch each invocation.
            layer.core_order = vec![Vec::new(); nr_nodes];
            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                // `extra` is the partial-LLC remainder in cores.
                let mut extra = Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).1;

                if let Some(node_llcs) = self.cpu_pool.free_llcs.get_mut(&n) {
                    // entry is (llc_id, cores already consumed from this
                    // free LLC during this pass); entry.1 is reset below.
                    for entry in node_llcs.iter_mut() {
                        if extra == 0 {
                            break;
                        }
                        // NOTE(review): `avail` subtracts a core count
                        // (entry.1) from a CPU count (cpus_per_llc) —
                        // looks unit-mixed; confirm whether cores_per_llc
                        // was intended here.
                        let avail = cpus_per_llc - entry.1;
                        let mut used = extra.min(avail);
                        let cores_to_add = used;

                        // Skip cores already handed to other layers.
                        let shift = entry.1;
                        entry.1 += used;

                        let llc_id = entry.0;
                        let llc = self.topo.all_llcs.get(&llc_id).unwrap();

                        for core in llc.cores.iter().skip(shift) {
                            if used == 0 {
                                break;
                            }
                            layer.core_order[n].push(core.1.id);
                            used -= 1;
                        }

                        extra -= cores_to_add;
                    }
                }
            }

            // Reverse so the free/alloc iteration order below favors the
            // most recently appended cores appropriately.
            for node_cores in &mut layer.core_order {
                node_cores.reverse();
            }
        }

        // Reset consumed entries in free LLCs.
        for node_llcs in self.cpu_pool.free_llcs.values_mut() {
            for entry in node_llcs.iter_mut() {
                entry.1 = 0;
            }
        }

        // Phase 4 — Build core_order: append cores from assigned LLCs.
        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            // Flattened set of all LLC ids this layer owns on any node.
            let all_assigned: HashSet<usize> =
                layer.assigned_llcs.iter().flatten().copied().collect();

            for core in self.topo.all_cores.iter() {
                let llc_id = core.1.llc_id;
                if all_assigned.contains(&llc_id) {
                    let nid = core.1.node_id;
                    layer.core_order[nid].push(core.1.id);
                }
            }
            for node_cores in &mut layer.core_order {
                node_cores.reverse();
            }

            debug!(
                " alloc: layer={} core_order={:?}",
                layer.name, layer.core_order
            );
        }

        // Phase 5 — Apply CPU changes for StickyDynamic layers.
        // Two phases: first free per-node, then allocate per-node.
        let mut updated = false;

        // Free excess CPUs per-node.
        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            for n in 0..nr_nodes {
                // node_target: the CPU mask this layer should hold on
                // node n, derived from core_order and allowed_cpus.
                let mut node_target = Cpumask::new();
                for &core_id in &layer.core_order[n] {
                    if let Some(core) = self.topo.all_cores.get(&core_id) {
                        node_target |= &core.span;
                    }
                }
                node_target &= &layer.allowed_cpus;

                let node_span = &self.topo.nodes[&n].span;
                let node_cur = layer.cpus.and(node_span);
                // CPUs currently held on this node but not in the target.
                let cpus_to_free = node_cur.and(&node_target.not());

                if cpus_to_free.weight() > 0 {
                    debug!(
                        " apply: layer={} freeing CPUs on node {}: {}",
                        layer.name, n, cpus_to_free
                    );
                    layer.cpus &= &cpus_to_free.not();
                    layer.nr_cpus -= cpus_to_free.weight();
                    // Keep per-LLC and per-node counters in sync.
                    for cpu in cpus_to_free.iter() {
                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                        layer.nr_node_cpus[n] -= 1;
                    }
                    self.cpu_pool.free(&cpus_to_free)?;
                    updated = true;
                }
            }
        }

        // Allocate needed CPUs per-node.
        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            for n in 0..nr_nodes {
                let mut node_target = Cpumask::new();
                for &core_id in &layer.core_order[n] {
                    if let Some(core) = self.topo.all_cores.get(&core_id) {
                        node_target |= &core.span;
                    }
                }
                node_target &= &layer.allowed_cpus;

                // Only take CPUs that are both wanted and actually free.
                let available_cpus = self.cpu_pool.available_cpus();
                let desired_to_alloc = node_target.and(&layer.cpus.clone().not());
                let cpus_to_alloc = desired_to_alloc.clone().and(&available_cpus);

                if desired_to_alloc.weight() > cpus_to_alloc.weight() {
                    debug!(
                        " apply: layer={} node {} wanted to alloc {} CPUs but only {} available",
                        layer.name,
                        n,
                        desired_to_alloc.weight(),
                        cpus_to_alloc.weight()
                    );
                }

                if cpus_to_alloc.weight() > 0 {
                    debug!(
                        " apply: layer={} allocating CPUs on node {}: {}",
                        layer.name, n, cpus_to_alloc
                    );
                    layer.cpus |= &cpus_to_alloc;
                    layer.nr_cpus += cpus_to_alloc.weight();
                    // Keep per-LLC and per-node counters in sync.
                    for cpu in cpus_to_alloc.iter() {
                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                        layer.nr_node_cpus[n] += 1;
                    }
                    self.cpu_pool.mark_allocated(&cpus_to_alloc)?;
                    updated = true;
                }
            }

            debug!(
                " apply: layer={} final cpus.weight()={} nr_cpus={}",
                layer.name,
                layer.cpus.weight(),
                layer.nr_cpus
            );
        }

        Ok(updated)
    }
3493
    /// Push per-node context into the BPF side by test-running the
    /// `refresh_node_ctx` program once per NUMA node.
    /// `node_empty_layers[nid]` lists the layer ids considered empty on
    /// that node; when `init` is set, the node's static LLC list is sent
    /// as well.
    fn refresh_node_ctx(
        skel: &mut BpfSkel,
        topo: &Topology,
        node_empty_layers: &[Vec<u32>],
        init: bool,
    ) {
        for &nid in topo.nodes.keys() {
            // SAFETY: refresh_node_ctx_arg is a plain C-layout arg struct
            // for which all-zero bytes is assumed to be a valid value
            // (it's fully overwritten field-by-field below).
            let mut arg: bpf_intf::refresh_node_ctx_arg =
                unsafe { MaybeUninit::zeroed().assume_init() };
            arg.node_id = nid as u32;
            arg.init = init as u32;

            // Copy the empty-layer ids and pad the rest of the fixed-size
            // array with MAX_LAYERS as an invalid-id sentinel.
            let empty = &node_empty_layers[nid];
            arg.nr_empty_layer_ids = empty.len() as u32;
            for (i, &lid) in empty.iter().enumerate() {
                arg.empty_layer_ids[i] = lid;
            }
            for i in empty.len()..MAX_LAYERS {
                arg.empty_layer_ids[i] = MAX_LAYERS as u32;
            }

            if init {
                // Per-node LLC list — static topology, only set during init.
                let node = &topo.nodes[&nid];
                let llcs: Vec<u32> = node.llcs.keys().map(|&id| id as u32).collect();
                arg.nr_llcs = llcs.len() as u32;
                for (i, &llc_id) in llcs.iter().enumerate() {
                    arg.llcs[i] = llc_id;
                }
            }

            // SAFETY: as_mut_bytes reinterprets the plain-old-data arg
            // struct as a mutable byte slice for the program input.
            let input = ProgramInput {
                context_in: Some(unsafe { plain::as_mut_bytes(&mut arg) }),
                ..Default::default()
            };
            // Best effort: the test-run result is intentionally ignored.
            let _ = skel.progs.refresh_node_ctx.test_run(input);
        }
    }
3532
3533    fn refresh_cpumasks(&mut self) -> Result<()> {
3534        let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });
3535
3536        let mut updated = false;
3537        let raw_targets = self.calc_target_nr_cpus();
3538        let au = self.cpu_pool.alloc_unit();
3539        let total_cpus = self.cpu_pool.topo.all_cpus.len();
3540
3541        // Dampen shrink: only drop halfway per cycle to avoid unnecessary
3542        // changes. There's some dampening built into util metrics but slow
3543        // down freeing further. This is solely based on intuition. Drop or
3544        // update according to real-world behavior.
3545        let targets: Vec<(usize, usize)> = raw_targets
3546            .iter()
3547            .enumerate()
3548            .map(|(idx, &(target, min))| {
3549                let cur = self.layers[idx].nr_cpus;
3550                if target < cur {
3551                    let dampened = cur - (cur - target).div_ceil(2);
3552                    (dampened.max(min), min)
3553                } else {
3554                    (target, min)
3555                }
3556            })
3557            .collect();
3558
3559        // Build demands for unified_alloc and compute per-node allocations.
3560        let demands = self.calc_raw_demands(&targets);
3561        let nr_nodes = self.topo.nodes.len();
3562        let node_caps: Vec<usize> = self
3563            .topo
3564            .nodes
3565            .values()
3566            .map(|n| n.span.weight() / au)
3567            .collect();
3568        let all_layer_nodes: Vec<&[usize]> = self
3569            .layer_specs
3570            .iter()
3571            .map(|s| s.nodes().as_slice())
3572            .collect();
3573        let norders: Vec<Vec<usize>> = (0..self.layers.len())
3574            .map(|idx| {
3575                layer_core_growth::node_order(
3576                    self.layer_specs[idx].nodes(),
3577                    &self.topo,
3578                    idx,
3579                    &all_layer_nodes,
3580                )
3581            })
3582            .collect();
3583        let cur_node_cpus: Vec<Vec<usize>> = self
3584            .layers
3585            .iter()
3586            .map(|layer| (0..nr_nodes).map(|n| layer.nr_node_cpus[n] / au).collect())
3587            .collect();
3588        let layer_allocs = unified_alloc(
3589            total_cpus / au,
3590            &node_caps,
3591            &demands,
3592            &cur_node_cpus,
3593            &norders,
3594        );
3595
3596        // Convert allocations back to CPU counts. Shrink dampening is
3597        // already applied to the targets fed into unified_alloc above.
3598        let cpu_targets: Vec<usize> = layer_allocs.iter().map(|a| a.total() * au).collect();
3599
3600        // Snapshot per-layer CPU counts for ALLOC debug logging.
3601        let prev_nr_cpus: Vec<usize> = self.layers.iter().map(|l| l.nr_cpus).collect();
3602
3603        let mut ascending: Vec<(usize, usize)> = cpu_targets.iter().copied().enumerate().collect();
3604        ascending.sort_by(|a, b| a.1.cmp(&b.1));
3605
3606        // Snapshot per-layer per-node CPU counts before allocation changes.
3607        let prev_node_cpus: Vec<Vec<usize>> =
3608            self.layers.iter().map(|l| l.nr_node_cpus.clone()).collect();
3609
3610        // Per-node SD allocation requires multiple LLCs. On flat topologies
3611        // (single LLC, e.g. VMs with topology disabled), SD layers fall through
3612        // to the non-SD grow/shrink loops below.
3613        let use_sd_alloc = self.topo.all_llcs.len() > 1;
3614        let sticky_dynamic_updated = if use_sd_alloc {
3615            self.recompute_layer_core_order(&ascending, &layer_allocs, au)?
3616        } else {
3617            false
3618        };
3619        updated |= sticky_dynamic_updated;
3620
3621        // Update BPF cpumasks for StickyDynamic layers if they were updated
3622        if sticky_dynamic_updated {
3623            for (idx, layer) in self.layers.iter().enumerate() {
3624                if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
3625                    Self::update_bpf_layer_cpumask(
3626                        layer,
3627                        &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3628                    );
3629                }
3630            }
3631        }
3632
3633        // Shrink per-node: free excess CPUs from each node.
3634        for &(idx, _target) in ascending.iter().rev() {
3635            let layer = &mut self.layers[idx];
3636            if layer_is_open(layer) {
3637                continue;
3638            }
3639
3640            // Skip StickyDynamic layers when per-node SD allocation is active.
3641            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
3642                continue;
3643            }
3644
3645            let alloc = &layer_allocs[idx];
3646            let mut freed = false;
3647
3648            for n in 0..nr_nodes {
3649                let desired = alloc.node_target(n) * au;
3650                let mut to_free = layer.nr_node_cpus[n].saturating_sub(desired);
3651                let node_span = &self.topo.nodes[&n].span;
3652
3653                while to_free > 0 {
3654                    let node_cands = layer.cpus.and(node_span);
3655                    let cpus_to_free = match self
3656                        .cpu_pool
3657                        .next_to_free(&node_cands, layer.core_order[n].iter().rev())?
3658                    {
3659                        Some(ret) => ret,
3660                        None => break,
3661                    };
3662                    let nr = cpus_to_free.weight();
3663                    trace!(
3664                        "[{}] freeing CPUs on node {}: {}",
3665                        layer.name,
3666                        n,
3667                        &cpus_to_free
3668                    );
3669                    layer.cpus &= &cpus_to_free.not();
3670                    layer.nr_cpus -= nr;
3671                    for cpu in cpus_to_free.iter() {
3672                        let node_id = self.cpu_pool.topo.all_cpus[&cpu].node_id;
3673                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
3674                        layer.nr_node_cpus[node_id] -= 1;
3675                        layer.nr_pinned_cpus[node_id] =
3676                            layer.nr_pinned_cpus[node_id].min(layer.nr_node_cpus[node_id]);
3677                    }
3678                    self.cpu_pool.free(&cpus_to_free)?;
3679                    to_free = to_free.saturating_sub(nr);
3680                    freed = true;
3681                }
3682            }
3683
3684            if freed {
3685                Self::update_bpf_layer_cpumask(
3686                    layer,
3687                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3688                );
3689                updated = true;
3690            }
3691        }
3692
3693        // Grow layers per-node using allocations from unified_alloc.
3694        for &(idx, _target) in &ascending {
3695            let layer = &mut self.layers[idx];
3696
3697            if layer_is_open(layer) {
3698                continue;
3699            }
3700
3701            // Skip StickyDynamic layers when per-node SD allocation is active.
3702            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
3703                continue;
3704            }
3705
3706            let alloc = &layer_allocs[idx];
3707            let norder = &norders[idx];
3708            let mut alloced = false;
3709
3710            for &node_id in norder.iter() {
3711                let node_target = alloc.node_target(node_id) * au;
3712                let cur_node = layer.nr_node_cpus[node_id];
3713                if node_target <= cur_node {
3714                    continue;
3715                }
3716                let pinned_target = alloc.pinned[node_id] * au;
3717                let mut nr_to_alloc = node_target - cur_node;
3718                let node_span = &self.topo.nodes[&node_id].span;
3719                let node_allowed = layer.allowed_cpus.and(node_span);
3720
3721                while nr_to_alloc > 0 {
3722                    let nr_alloced = match self.cpu_pool.alloc_cpus(
3723                        &node_allowed,
3724                        &layer.core_order[node_id],
3725                        nr_to_alloc,
3726                    ) {
3727                        Some(new_cpus) => {
3728                            let nr = new_cpus.weight();
3729                            layer.cpus |= &new_cpus;
3730                            layer.nr_cpus += nr;
3731                            for cpu in new_cpus.iter() {
3732                                layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
3733                                let nid = self.cpu_pool.topo.all_cpus[&cpu].node_id;
3734                                layer.nr_node_cpus[nid] += 1;
3735                                if layer.nr_pinned_cpus[nid] < pinned_target {
3736                                    layer.nr_pinned_cpus[nid] += 1;
3737                                }
3738                            }
3739                            nr
3740                        }
3741                        None => 0,
3742                    };
3743                    if nr_alloced == 0 {
3744                        break;
3745                    }
3746                    alloced = true;
3747                    nr_to_alloc -= nr_alloced.min(nr_to_alloc);
3748                }
3749            }
3750
3751            if alloced {
3752                Self::update_bpf_layer_cpumask(
3753                    layer,
3754                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3755                );
3756                updated = true;
3757            }
3758        }
3759
3760        // Recompute growth_denied: for each node, check whether the
3761        // unpinned portion of the layer warranted growth (unpinned util /
3762        // util_high > unpinned CPUs) but the node didn't gain CPUs.  Only
3763        // unpinned matters because pinned tasks can't migrate cross-node.
3764        let node_utils = &self.sched_stats.layer_node_utils;
3765        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
3766        for (idx, layer) in self.layers.iter().enumerate() {
3767            self.growth_denied[idx].fill(false);
3768            let util_high = match layer.kind.util_range() {
3769                Some((_, high)) => high,
3770                None => continue,
3771            };
3772            for n in 0..nr_nodes {
3773                let unpinned_util = (node_utils[idx][n] - pinned_utils[idx][n]).max(0.0);
3774                let unpinned_cpus_needed = unpinned_util / util_high;
3775                let unpinned_cpus_have =
3776                    layer.nr_node_cpus[n].saturating_sub(layer.nr_pinned_cpus[n]) as f64;
3777                let wanted = unpinned_cpus_needed > unpinned_cpus_have;
3778                let got = layer.nr_node_cpus[n] > prev_node_cpus[idx][n];
3779                if wanted && !got {
3780                    self.growth_denied[idx][n] = true;
3781                }
3782            }
3783        }
3784
3785        // Log per-layer allocation changes.
3786        if updated {
3787            for (idx, layer) in self.layers.iter().enumerate() {
3788                if layer_is_open(layer) {
3789                    continue;
3790                }
3791                let prev = prev_nr_cpus[idx];
3792                let cur = layer.nr_cpus;
3793                if prev != cur {
3794                    debug!(
3795                        "ALLOC {} algo={:?} cpus:{}→{} mask={:x}",
3796                        layer.name, layer.growth_algo, prev, cur, layer.cpus,
3797                    );
3798                }
3799            }
3800            debug!(
3801                "ALLOC pool_available={}",
3802                self.cpu_pool.available_cpus().weight()
3803            );
3804        }
3805
3806        // Log allocation changes.
3807        if updated {
3808            let nr_nodes = self.topo.nodes.len();
3809            for (idx, layer) in self.layers.iter().enumerate() {
3810                if layer_is_open(layer) {
3811                    continue;
3812                }
3813                let prev = &prev_node_cpus[idx];
3814                let cur = &layer.nr_node_cpus;
3815                if prev == cur {
3816                    continue;
3817                }
3818                let per_node: String = (0..nr_nodes)
3819                    .map(|n| format!("n{}:{}→{}", n, prev[n], cur[n]))
3820                    .collect::<Vec<_>>()
3821                    .join(" ");
3822                let prev_total: usize = prev.iter().sum();
3823                let cur_total: usize = cur[..nr_nodes].iter().sum();
3824                let target: String = (0..nr_nodes)
3825                    .map(|n| format!("n{}:{}", n, layer_allocs[idx].node_target(n) * au))
3826                    .collect::<Vec<_>>()
3827                    .join(" ");
3828                debug!(
3829                    "ALLOC {} algo={:?} {} total:{}→{} target:[{}] mask={:x}",
3830                    layer.name,
3831                    layer.growth_algo,
3832                    per_node,
3833                    prev_total,
3834                    cur_total,
3835                    target,
3836                    layer.cpus,
3837                );
3838            }
3839            debug!(
3840                "ALLOC pool_available={}",
3841                self.cpu_pool.available_cpus().weight()
3842            );
3843        }
3844
3845        // Give the rest to the open layers.
3846        if updated {
3847            for (idx, layer) in self.layers.iter_mut().enumerate() {
3848                if !layer_is_open(layer) {
3849                    continue;
3850                }
3851
3852                let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx];
3853                let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
3854                let nr_available_cpus = available_cpus.weight();
3855
3856                // Open layers need the intersection of allowed cpus and
3857                // available cpus. Recompute per-LLC and per-node counts
3858                // since open layers bypass alloc/free.
3859                layer.cpus = available_cpus;
3860                layer.nr_cpus = nr_available_cpus;
3861                for llc in self.cpu_pool.topo.all_llcs.values() {
3862                    layer.nr_llc_cpus[llc.id] = layer.cpus.and(&llc.span).weight();
3863                }
3864                for node in self.cpu_pool.topo.nodes.values() {
3865                    layer.nr_node_cpus[node.id] = layer.cpus.and(&node.span).weight();
3866                    layer.nr_pinned_cpus[node.id] = 0;
3867                }
3868                Self::update_bpf_layer_cpumask(layer, bpf_layer);
3869            }
3870
3871            for (&node_id, &cpu) in &self.cpu_pool.fallback_cpus {
3872                self.skel.maps.bss_data.as_mut().unwrap().fallback_cpus[node_id] = cpu as u32;
3873            }
3874
3875            for (lidx, layer) in self.layers.iter().enumerate() {
3876                self.nr_layer_cpus_ranges[lidx] = (
3877                    self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
3878                    self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
3879                );
3880            }
3881
3882            // Trigger updates on the BPF side.
3883            let input = ProgramInput {
3884                ..Default::default()
3885            };
3886            let prog = &mut self.skel.progs.refresh_layer_cpumasks;
3887            let _ = prog.test_run(input);
3888
3889            // Update per-node empty layer IDs via BPF prog.
3890            let nr_nodes = self.topo.nodes.len();
3891            let node_empty_layers: Vec<Vec<u32>> = (0..nr_nodes)
3892                .map(|nid| {
3893                    self.layers
3894                        .iter()
3895                        .enumerate()
3896                        .filter(|(_lidx, layer)| layer.nr_node_cpus[nid] == 0)
3897                        .map(|(lidx, _)| lidx as u32)
3898                        .collect()
3899                })
3900                .collect();
3901            Self::refresh_node_ctx(&mut self.skel, &self.topo, &node_empty_layers, false);
3902        }
3903
3904        if let Err(e) = self.update_netdev_cpumasks() {
3905            warn!("Failed to update netdev IRQ cpumasks: {:#}", e);
3906        }
3907        Ok(())
3908    }
3909
3910    fn refresh_idle_qos(&mut self) -> Result<()> {
3911        if !self.idle_qos_enabled {
3912            return Ok(());
3913        }
3914
3915        let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
3916        for layer in self.layers.iter() {
3917            let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
3918            for cpu in layer.cpus.iter() {
3919                cpu_idle_qos[cpu] = idle_resume_us;
3920            }
3921        }
3922
3923        for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
3924            update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
3925        }
3926
3927        Ok(())
3928    }
3929
    /// Refresh per-layer cross-NUMA migration gating and per-(src, dst)
    /// rate budgets in the BPF-side layer structs.
    ///
    /// No-op on single-node systems. A layer whose xnuma_threshold pair is
    /// (<= 0.0, <= 0.0) has gating disabled: every node is marked as a
    /// migration source with an unlimited (u64::MAX) rate. Otherwise,
    /// eligibility and rates are derived from the per-node duty-cycle sums
    /// collected in sched_stats.
    fn refresh_xnuma(&mut self) {
        // Cross-NUMA migration only makes sense with multiple nodes.
        let nr_nodes = self.topo.nodes.len();
        if nr_nodes <= 1 {
            return;
        }

        let duty_sums = &self.sched_stats.layer_node_duty_sums;

        for (layer_idx, spec) in self.layer_specs.iter().enumerate() {
            let common = spec.kind.common();
            let threshold = common.xnuma_threshold;
            let threshold_delta = common.xnuma_threshold_delta;
            let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[layer_idx];

            if threshold.0 <= 0.0 && threshold.1 <= 0.0 {
                // Off — all open, infinite budget
                for src in 0..nr_nodes {
                    bpf_layer.node[src].xnuma_is_mig_src.write(true);
                    for dst in 0..nr_nodes {
                        bpf_layer.node[src].xnuma[dst].rate = u64::MAX;
                    }
                }
                // Clear the cached mig-src state so a later re-enable
                // starts from a clean slate.
                self.xnuma_mig_src[layer_idx].fill(false);
                continue;
            }

            let layer = &self.layers[layer_idx];
            // Decide per node whether this layer may act as a migration
            // source. The previous decision and threshold_delta are fed
            // back in — presumably for hysteresis; growth_denied lets
            // nodes that wanted but were denied CPUs influence the
            // decision (see xnuma_check_active for exact semantics).
            let is_mig_src = xnuma_check_active(
                &duty_sums[layer_idx],
                &layer.nr_node_cpus,
                threshold,
                threshold_delta,
                &self.growth_denied[layer_idx],
                &self.xnuma_mig_src[layer_idx],
            );

            self.xnuma_mig_src[layer_idx] = is_mig_src.clone();

            let result = xnuma_compute_rates(&duty_sums[layer_idx], &layer.nr_node_cpus);

            // Write rates before is_mig_src so BPF sees valid rates
            // when the gate activates.
            for src in 0..nr_nodes {
                for dst in 0..nr_nodes {
                    bpf_layer.node[src].xnuma[dst].rate = result.rates[src][dst];
                }
            }
            for (nid, is_src) in is_mig_src.iter().enumerate().take(nr_nodes) {
                bpf_layer.node[nid].xnuma_is_mig_src.write(*is_src);
            }
        }
    }
3982
3983    fn step(&mut self) -> Result<()> {
3984        let started_at = Instant::now();
3985        self.sched_stats.refresh(
3986            &mut self.skel,
3987            &self.proc_reader,
3988            started_at,
3989            self.processing_dur,
3990            &self.gpu_task_handler,
3991        )?;
3992
3993        // Update BPF with EWMA values
3994        self.skel
3995            .maps
3996            .bss_data
3997            .as_mut()
3998            .unwrap()
3999            .system_cpu_util_ewma = (self.sched_stats.system_cpu_util_ewma * 10000.0) as u64;
4000
4001        for layer_id in 0..self.sched_stats.nr_layers {
4002            self.skel
4003                .maps
4004                .bss_data
4005                .as_mut()
4006                .unwrap()
4007                .layer_dsq_insert_ewma[layer_id] =
4008                (self.sched_stats.layer_dsq_insert_ewma[layer_id] * 10000.0) as u64;
4009        }
4010
4011        self.refresh_cpumasks()?;
4012        self.refresh_xnuma();
4013        self.refresh_idle_qos()?;
4014        self.gpu_task_handler.maybe_affinitize();
4015        self.processing_dur += Instant::now().duration_since(started_at);
4016        Ok(())
4017    }
4018
4019    fn generate_sys_stats(
4020        &mut self,
4021        stats: &Stats,
4022        cpus_ranges: &mut [(usize, usize)],
4023    ) -> Result<SysStats> {
4024        let bstats = &stats.bpf_stats;
4025        let mut sys_stats = SysStats::new(stats, bstats, &self.cpu_pool.fallback_cpus)?;
4026
4027        for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
4028            let layer_stats = LayerStats::new(
4029                lidx,
4030                layer,
4031                stats,
4032                bstats,
4033                cpus_ranges[lidx],
4034                self.xnuma_mig_src[lidx].iter().any(|&a| a),
4035            );
4036            sys_stats.layers.insert(spec.name.to_string(), layer_stats);
4037            cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
4038        }
4039
4040        Ok(sys_stats)
4041    }
4042
4043    // Helper function to process a cgroup creation (common logic for walkdir and inotify)
4044    fn process_cgroup_creation(
4045        path: &Path,
4046        cgroup_regexes: &HashMap<u32, Regex>,
4047        cgroup_path_to_id: &mut HashMap<String, u64>,
4048        sender: &crossbeam::channel::Sender<CgroupEvent>,
4049    ) {
4050        let path_str = path.to_string_lossy().to_string();
4051
4052        // Get cgroup ID (inode number)
4053        let cgroup_id = std::fs::metadata(path)
4054            .map(|metadata| {
4055                use std::os::unix::fs::MetadataExt;
4056                metadata.ino()
4057            })
4058            .unwrap_or(0);
4059
4060        // Build match bitmap by testing against CgroupRegex rules
4061        let mut match_bitmap = 0u64;
4062        for (rule_id, regex) in cgroup_regexes {
4063            if regex.is_match(&path_str) {
4064                match_bitmap |= 1u64 << rule_id;
4065            }
4066        }
4067
4068        // Store in hash
4069        cgroup_path_to_id.insert(path_str.clone(), cgroup_id);
4070
4071        // Send event
4072        if let Err(e) = sender.send(CgroupEvent::Created {
4073            path: path_str,
4074            cgroup_id,
4075            match_bitmap,
4076        }) {
4077            error!("Failed to send cgroup creation event: {}", e);
4078        }
4079    }
4080
    /// Spawn a background thread that mirrors cgroup directory creation and
    /// removal under /sys/fs/cgroup into CgroupEvent messages.
    ///
    /// Registers inotify watches on the cgroup root and all existing
    /// subdirectories, seeds the channel with Created events for cgroups
    /// that already exist, then forwards CREATE/DELETE events until
    /// `shutdown` is observed. Returns the receiving end of the bounded
    /// event channel; the main loop consumes it in run().
    fn start_cgroup_watcher(
        shutdown: Arc<AtomicBool>,
        cgroup_regexes: HashMap<u32, Regex>,
    ) -> Result<Receiver<CgroupEvent>> {
        let mut inotify = Inotify::init().context("Failed to initialize inotify")?;
        // Maps watch descriptors back to the directory they watch; inotify
        // events only carry the entry name, so this is needed to rebuild
        // full paths.
        let mut wd_to_path = HashMap::new();

        // Create crossbeam channel for cgroup events (bounded to prevent memory issues)
        let (sender, receiver) = crossbeam::channel::bounded::<CgroupEvent>(1024);

        // Watch for directory creation and deletion events
        let root_wd = inotify
            .watches()
            .add("/sys/fs/cgroup", WatchMask::CREATE | WatchMask::DELETE)
            .context("Failed to add watch for /sys/fs/cgroup")?;
        wd_to_path.insert(root_wd, PathBuf::from("/sys/fs/cgroup"));

        // Also recursively watch existing directories for new subdirectories
        Self::add_recursive_watches(&mut inotify, &mut wd_to_path, Path::new("/sys/fs/cgroup"))?;

        // Spawn watcher thread
        std::thread::spawn(move || {
            let mut buffer = [0; 4096];
            let inotify_fd = inotify.as_raw_fd();
            // Maintain hash of cgroup path -> cgroup ID (inode number)
            let mut cgroup_path_to_id = HashMap::<String, u64>::new();

            // Populate existing cgroups
            for entry in WalkDir::new("/sys/fs/cgroup")
                .into_iter()
                .filter_map(|e| e.ok())
                .filter(|e| e.file_type().is_dir())
            {
                let path = entry.path();
                Self::process_cgroup_creation(
                    path,
                    &cgroup_regexes,
                    &mut cgroup_path_to_id,
                    &sender,
                );
            }

            while !shutdown.load(Ordering::Relaxed) {
                // Use select to wait for events with a 100ms timeout
                let ready = unsafe {
                    // SAFETY: read_fds is zero-initialized (valid for
                    // fd_set) and the FD_* macros/select only receive
                    // pointers to these live stack locals.
                    let mut read_fds: libc::fd_set = std::mem::zeroed();
                    libc::FD_ZERO(&mut read_fds);
                    libc::FD_SET(inotify_fd, &mut read_fds);

                    let mut timeout = libc::timeval {
                        tv_sec: 0,
                        tv_usec: 100_000, // 100ms
                    };

                    libc::select(
                        inotify_fd + 1,
                        &mut read_fds,
                        std::ptr::null_mut(),
                        std::ptr::null_mut(),
                        &mut timeout,
                    )
                };

                if ready <= 0 {
                    // Timeout or error, continue loop to check shutdown
                    continue;
                }

                // Read events non-blocking
                let events = match inotify.read_events(&mut buffer) {
                    Ok(events) => events,
                    Err(e) => {
                        error!("Error reading inotify events: {}", e);
                        break;
                    }
                };

                for event in events {
                    // Only directory creation/removal is of interest.
                    if !event.mask.contains(inotify::EventMask::CREATE)
                        && !event.mask.contains(inotify::EventMask::DELETE)
                    {
                        continue;
                    }

                    let name = match event.name {
                        Some(name) => name,
                        None => continue,
                    };

                    let parent_path = match wd_to_path.get(&event.wd) {
                        Some(parent) => parent,
                        None => {
                            warn!("Unknown watch descriptor: {:?}", event.wd);
                            continue;
                        }
                    };

                    let path = parent_path.join(name.to_string_lossy().as_ref());

                    if event.mask.contains(inotify::EventMask::CREATE) {
                        if !path.is_dir() {
                            continue;
                        }

                        Self::process_cgroup_creation(
                            &path,
                            &cgroup_regexes,
                            &mut cgroup_path_to_id,
                            &sender,
                        );

                        // Add watch for this new directory
                        // NOTE(review): children created inside the new
                        // directory before this watch lands are not
                        // rescanned here — confirm that gap is acceptable.
                        match inotify
                            .watches()
                            .add(&path, WatchMask::CREATE | WatchMask::DELETE)
                        {
                            Ok(wd) => {
                                wd_to_path.insert(wd, path.clone());
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to add watch for new cgroup {}: {}",
                                    path.display(),
                                    e
                                );
                            }
                        }
                    } else if event.mask.contains(inotify::EventMask::DELETE) {
                        let path_str = path.to_string_lossy().to_string();

                        // Get cgroup ID from our hash (since the directory is gone, we can't stat it)
                        let cgroup_id = cgroup_path_to_id.remove(&path_str).unwrap_or(0);

                        // Send removal event to main thread
                        if let Err(e) = sender.send(CgroupEvent::Removed {
                            path: path_str,
                            cgroup_id,
                        }) {
                            error!("Failed to send cgroup removal event: {}", e);
                        }

                        // Find and remove the watch descriptor for this path
                        // (linear scan over the reverse map).
                        let wd_to_remove = wd_to_path.iter().find_map(|(wd, watched_path)| {
                            if watched_path == &path {
                                Some(wd.clone())
                            } else {
                                None
                            }
                        });
                        if let Some(wd) = wd_to_remove {
                            wd_to_path.remove(&wd);
                        }
                    }
                }
            }
        });

        Ok(receiver)
    }
4240
4241    fn add_recursive_watches(
4242        inotify: &mut Inotify,
4243        wd_to_path: &mut HashMap<inotify::WatchDescriptor, PathBuf>,
4244        path: &Path,
4245    ) -> Result<()> {
4246        for entry in WalkDir::new(path)
4247            .into_iter()
4248            .filter_map(|e| e.ok())
4249            .filter(|e| e.file_type().is_dir())
4250            .skip(1)
4251        {
4252            let entry_path = entry.path();
4253            // Add watch for this directory
4254            match inotify
4255                .watches()
4256                .add(entry_path, WatchMask::CREATE | WatchMask::DELETE)
4257            {
4258                Ok(wd) => {
4259                    wd_to_path.insert(wd, entry_path.to_path_buf());
4260                }
4261                Err(e) => {
4262                    debug!("Failed to add watch for {}: {}", entry_path.display(), e);
4263                }
4264            }
4265        }
4266        Ok(())
4267    }
4268
    /// Main scheduler loop.
    ///
    /// Until `shutdown` is set or the BPF side reports exit, this:
    /// - runs step() every `sched_intv`,
    /// - bumps the BPF avg-runtime layer refresh sequence every
    ///   `layer_refresh_intv` (if non-zero),
    /// - services stats-server requests and cgroup watcher events via
    ///   select!, waking at least by the next scheduling deadline.
    ///
    /// On exit, drops the struct_ops link (detaching the scheduler) and
    /// returns the BPF-side user exit info.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let mut next_sched_at = Instant::now() + self.sched_intv;
        let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
        let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
        // Per stats-client (keyed by thread ID) min/max layer CPU counts
        // observed since that client's last refresh.
        let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();

        // Start the cgroup watcher only if there are CgroupRegex rules
        let cgroup_regexes = self.cgroup_regexes.take().unwrap();
        let cgroup_event_rx = if !cgroup_regexes.is_empty() {
            Some(Self::start_cgroup_watcher(
                shutdown.clone(),
                cgroup_regexes,
            )?)
        } else {
            None
        };

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

            if now >= next_sched_at {
                self.step()?;
                // Catch up in whole intervals so a slow step doesn't cause
                // a burst of back-to-back runs.
                while next_sched_at < now {
                    next_sched_at += self.sched_intv;
                }
            }

            if enable_layer_refresh && now >= next_layer_refresh_at {
                self.skel
                    .maps
                    .bss_data
                    .as_mut()
                    .unwrap()
                    .layer_refresh_seq_avgruntime += 1;
                while next_layer_refresh_at < now {
                    next_layer_refresh_at += self.layer_refresh_intv;
                }
            }

            // Handle both stats requests and cgroup events with timeout.
            // When no cgroup watcher is running, substitute a never-ready
            // channel so the select! arm simply never fires.
            let timeout_duration = next_sched_at.saturating_duration_since(Instant::now());
            let never_rx = crossbeam::channel::never();
            let cgroup_rx = cgroup_event_rx.as_ref().unwrap_or(&never_rx);

            select! {
                recv(req_ch) -> msg => match msg {
                    Ok(StatsReq::Hello(tid)) => {
                        // New stats client: seed its ranges with current counts.
                        cpus_ranges.insert(
                            tid,
                            self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
                        );
                        let stats =
                            Stats::new(&mut self.skel, &self.proc_reader, self.topo.clone(), &self.gpu_task_handler)?;
                        res_ch.send(StatsRes::Hello(Box::new(stats)))?;
                    }
                    Ok(StatsReq::Refresh(tid, mut stats)) => {
                        // Propagate self's layer cpu ranges into each stat's.
                        for i in 0..self.nr_layer_cpus_ranges.len() {
                            for (_, ranges) in cpus_ranges.iter_mut() {
                                ranges[i] = (
                                    ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
                                    ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
                                );
                            }
                            // Reset our own range to the current count.
                            self.nr_layer_cpus_ranges[i] =
                                (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
                        }

                        stats.refresh(
                            &mut self.skel,
                            &self.proc_reader,
                            now,
                            self.processing_dur,
                            &self.gpu_task_handler,
                        )?;
                        let sys_stats =
                            self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
                        res_ch.send(StatsRes::Refreshed(Box::new((*stats, sys_stats))))?;
                    }
                    Ok(StatsReq::Bye(tid)) => {
                        cpus_ranges.remove(&tid);
                        res_ch.send(StatsRes::Bye)?;
                    }
                    Err(e) => Err(e)?,
                },

                recv(cgroup_rx) -> event => match event {
                    Ok(CgroupEvent::Created { path, cgroup_id, match_bitmap }) => {
                        // Insert into BPF map. Failure aborts the scheduler
                        // via `?` — see the error message below.
                        self.skel.maps.cgroup_match_bitmap.update(
                            &cgroup_id.to_ne_bytes(),
                            &match_bitmap.to_ne_bytes(),
                            libbpf_rs::MapFlags::ANY,
                        ).with_context(|| format!(
                            "Failed to insert cgroup {}({}) into BPF map. Cgroup map may be full \
                             (max 16384 entries). Aborting.",
                            cgroup_id, path
                        ))?;

                        debug!("Added cgroup {} to BPF map with bitmap 0x{:x}", cgroup_id, match_bitmap);
                    }
                    Ok(CgroupEvent::Removed { path, cgroup_id }) => {
                        // Delete from BPF map; deletion failure is non-fatal.
                        if let Err(e) = self.skel.maps.cgroup_match_bitmap.delete(&cgroup_id.to_ne_bytes()) {
                            warn!("Failed to delete cgroup {} from BPF map: {}", cgroup_id, e);
                        } else {
                            debug!("Removed cgroup {}({}) from BPF map", cgroup_id, path);
                        }
                    }
                    Err(e) => {
                        error!("Error receiving cgroup event: {}", e);
                    }
                },

                recv(crossbeam::channel::after(timeout_duration)) -> _ => {
                    // Timeout - continue main loop
                }
            }
        }

        // Detach the scheduler before reporting exit info.
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
4393}
4394
4395impl Drop for Scheduler<'_> {
4396    fn drop(&mut self) {
4397        info!("Unregister {SCHEDULER_NAME} scheduler");
4398
4399        if !self.netdevs.is_empty() {
4400            for (iface, netdev) in &self.netdevs {
4401                if let Err(e) = netdev.restore_cpumasks() {
4402                    warn!("Failed to restore {iface} IRQ affinity: {e}");
4403                }
4404            }
4405            info!("Restored original netdev IRQ affinity");
4406        }
4407
4408        if let Some(struct_ops) = self.struct_ops.take() {
4409            drop(struct_ops);
4410        }
4411    }
4412}
4413
4414fn write_example_file(path: &str) -> Result<()> {
4415    let mut f = fs::OpenOptions::new()
4416        .create_new(true)
4417        .write(true)
4418        .open(path)?;
4419    Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
4420}
4421
/// Per-hint-value lookup record built by verify_layer_specs(): maps a task
/// hint to its target layer plus the optional high-frequency matcher
/// thresholds attached to the same AND block.
struct HintLayerInfo {
    // Index of the layer this hint routes to.
    layer_id: usize,
    // SystemCpuUtilBelow threshold from the same AND block, if any
    // (range-checked to [0.0, 1.0] at spec verification).
    system_cpu_util_below: Option<f64>,
    // DsqInsertBelow threshold from the same AND block, if any
    // (range-checked to [0.0, 1.0] at spec verification).
    dsq_insert_below: Option<f64>,
}
4427
4428fn verify_layer_specs(specs: &[LayerSpec]) -> Result<HashMap<u64, HintLayerInfo>> {
4429    let mut hint_to_layer_map = HashMap::<u64, (usize, String, Option<f64>, Option<f64>)>::new();
4430
4431    let nr_specs = specs.len();
4432    if nr_specs == 0 {
4433        bail!("No layer spec");
4434    }
4435    if nr_specs > MAX_LAYERS {
4436        bail!("Too many layer specs");
4437    }
4438
4439    for (idx, spec) in specs.iter().enumerate() {
4440        if idx < nr_specs - 1 {
4441            if spec.matches.is_empty() {
4442                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
4443            }
4444        } else if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
4445            bail!("Terminal spec {:?} must have an empty match", spec.name);
4446        }
4447
4448        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
4449            bail!(
4450                "Spec {:?} has too many ({}) OR match blocks",
4451                spec.name,
4452                spec.matches.len()
4453            );
4454        }
4455
4456        for (ands_idx, ands) in spec.matches.iter().enumerate() {
4457            if ands.len() > NR_LAYER_MATCH_KINDS {
4458                bail!(
4459                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
4460                    spec.name,
4461                    ands_idx,
4462                    ands.len()
4463                );
4464            }
4465            let mut hint_equals_cnt = 0;
4466            let mut system_cpu_util_below_cnt = 0;
4467            let mut dsq_insert_below_cnt = 0;
4468            let mut hint_value: Option<u64> = None;
4469            let mut system_cpu_util_threshold: Option<f64> = None;
4470            let mut dsq_insert_threshold: Option<f64> = None;
4471            for one in ands.iter() {
4472                match one {
4473                    LayerMatch::CgroupPrefix(prefix) => {
4474                        if prefix.len() > MAX_PATH {
4475                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
4476                        }
4477                    }
4478                    LayerMatch::CgroupSuffix(suffix) => {
4479                        if suffix.len() > MAX_PATH {
4480                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
4481                        }
4482                    }
4483                    LayerMatch::CgroupContains(substr) => {
4484                        if substr.len() > MAX_PATH {
4485                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
4486                        }
4487                    }
4488                    LayerMatch::CommPrefix(prefix) => {
4489                        if prefix.len() > MAX_COMM {
4490                            bail!("Spec {:?} has too long a comm prefix", spec.name);
4491                        }
4492                    }
4493                    LayerMatch::PcommPrefix(prefix) => {
4494                        if prefix.len() > MAX_COMM {
4495                            bail!("Spec {:?} has too long a process name prefix", spec.name);
4496                        }
4497                    }
4498                    LayerMatch::SystemCpuUtilBelow(threshold) => {
4499                        if *threshold < 0.0 || *threshold > 1.0 {
4500                            bail!(
4501                                "Spec {:?} has SystemCpuUtilBelow threshold outside the range [0.0, 1.0]",
4502                                spec.name
4503                            );
4504                        }
4505                        system_cpu_util_threshold = Some(*threshold);
4506                        system_cpu_util_below_cnt += 1;
4507                    }
4508                    LayerMatch::DsqInsertBelow(threshold) => {
4509                        if *threshold < 0.0 || *threshold > 1.0 {
4510                            bail!(
4511                                "Spec {:?} has DsqInsertBelow threshold outside the range [0.0, 1.0]",
4512                                spec.name
4513                            );
4514                        }
4515                        dsq_insert_threshold = Some(*threshold);
4516                        dsq_insert_below_cnt += 1;
4517                    }
4518                    LayerMatch::HintEquals(hint) => {
4519                        if *hint > 1024 {
4520                            bail!(
4521                                "Spec {:?} has hint value outside the range [0, 1024]",
4522                                spec.name
4523                            );
4524                        }
4525                        hint_value = Some(*hint);
4526                        hint_equals_cnt += 1;
4527                    }
4528                    _ => {}
4529                }
4530            }
4531            if hint_equals_cnt > 1 {
4532                bail!("Only 1 HintEquals match permitted per AND block");
4533            }
4534            let high_freq_matcher_cnt = system_cpu_util_below_cnt + dsq_insert_below_cnt;
4535            if high_freq_matcher_cnt > 0 {
4536                if hint_equals_cnt != 1 {
4537                    bail!("High-frequency matchers (SystemCpuUtilBelow, DsqInsertBelow) must be used with one HintEquals");
4538                }
4539                if system_cpu_util_below_cnt > 1 {
4540                    bail!("Only 1 SystemCpuUtilBelow match permitted per AND block");
4541                }
4542                if dsq_insert_below_cnt > 1 {
4543                    bail!("Only 1 DsqInsertBelow match permitted per AND block");
4544                }
4545                if ands.len() != hint_equals_cnt + system_cpu_util_below_cnt + dsq_insert_below_cnt
4546                {
4547                    bail!("High-frequency matchers must be used only with HintEquals (no other matchers)");
4548                }
4549            } else if hint_equals_cnt == 1 && ands.len() != 1 {
4550                bail!("HintEquals match cannot be in conjunction with other matches");
4551            }
4552
4553            // Insert hint into map if present
4554            if let Some(hint) = hint_value {
4555                if let Some((layer_id, name, _, _)) = hint_to_layer_map.get(&hint) {
4556                    if *layer_id != idx {
4557                        bail!(
4558                            "Spec {:?} has hint value ({}) that is already mapped to Spec {:?}",
4559                            spec.name,
4560                            hint,
4561                            name
4562                        );
4563                    }
4564                } else {
4565                    hint_to_layer_map.insert(
4566                        hint,
4567                        (
4568                            idx,
4569                            spec.name.clone(),
4570                            system_cpu_util_threshold,
4571                            dsq_insert_threshold,
4572                        ),
4573                    );
4574                }
4575            }
4576        }
4577
4578        match spec.kind {
4579            LayerKind::Confined {
4580                cpus_range,
4581                util_range,
4582                ..
4583            }
4584            | LayerKind::Grouped {
4585                cpus_range,
4586                util_range,
4587                ..
4588            } => {
4589                if let Some((cpus_min, cpus_max)) = cpus_range {
4590                    if cpus_min > cpus_max {
4591                        bail!(
4592                            "Spec {:?} has invalid cpus_range({}, {})",
4593                            spec.name,
4594                            cpus_min,
4595                            cpus_max
4596                        );
4597                    }
4598                }
4599                if util_range.0 >= util_range.1 {
4600                    bail!(
4601                        "Spec {:?} has invalid util_range ({}, {})",
4602                        spec.name,
4603                        util_range.0,
4604                        util_range.1
4605                    );
4606                }
4607            }
4608            _ => {}
4609        }
4610    }
4611
4612    Ok(hint_to_layer_map
4613        .into_iter()
4614        .map(|(k, v)| {
4615            (
4616                k,
4617                HintLayerInfo {
4618                    layer_id: v.0,
4619                    system_cpu_util_below: v.2,
4620                    dsq_insert_below: v.3,
4621                },
4622            )
4623        })
4624        .collect())
4625}
4626
/// Return the last `len` characters of `cgroup` (or the whole string when
/// it is shorter than `len` characters).
fn name_suffix(cgroup: &str, len: usize) -> String {
    let keep = std::cmp::min(len, cgroup.len());
    let total = cgroup.chars().count();

    // Skip everything before the suffix instead of reversing twice.
    cgroup.chars().skip(total.saturating_sub(keep)).collect()
}
4633
4634fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
4635    let mut paths = vec![];
4636
4637    if !dir.is_dir() {
4638        panic!("path {:?} does not correspond to directory", dir);
4639    }
4640
4641    let direntries = fs::read_dir(dir)?;
4642
4643    for entry in direntries {
4644        let path = entry?.path();
4645        if path.is_dir() {
4646            paths.append(&mut traverse_sysfs(&path)?);
4647            paths.push(path);
4648        }
4649    }
4650
4651    Ok(paths)
4652}
4653
4654fn find_cpumask(cgroup: &str) -> Cpumask {
4655    let mut path = String::from(cgroup);
4656    path.push_str("/cpuset.cpus.effective");
4657
4658    let description = fs::read_to_string(&mut path).unwrap();
4659
4660    Cpumask::from_cpulist(&description).unwrap()
4661}
4662
4663fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
4664    match rule {
4665        LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4666            .into_iter()
4667            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4668            .filter(|cgroup| cgroup.ends_with(suffix))
4669            .map(|cgroup| {
4670                (
4671                    {
4672                        let mut slashterminated = cgroup.clone();
4673                        slashterminated.push('/');
4674                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4675                    },
4676                    find_cpumask(&cgroup),
4677                )
4678            })
4679            .collect()),
4680        LayerMatch::CgroupRegex(expr) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4681            .into_iter()
4682            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4683            .filter(|cgroup| {
4684                let re = Regex::new(expr).unwrap();
4685                re.is_match(cgroup)
4686            })
4687            .map(|cgroup| {
4688                (
4689                    // Here we convert the regex match into a suffix match because we still need to
4690                    // do the matching on the bpf side and doing a regex match in bpf isn't
4691                    // easily done.
4692                    {
4693                        let mut slashterminated = cgroup.clone();
4694                        slashterminated.push('/');
4695                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4696                    },
4697                    find_cpumask(&cgroup),
4698                )
4699            })
4700            .collect()),
4701        _ => panic!("Unimplemented template enum {:?}", rule),
4702    }
4703}
4704
4705fn create_perf_fds(skel: &mut BpfSkel, event: u64) -> Result<()> {
4706    let mut attr = perf::bindings::perf_event_attr {
4707        size: std::mem::size_of::<perf::bindings::perf_event_attr>() as u32,
4708        type_: perf::bindings::PERF_TYPE_RAW,
4709        config: event,
4710        sample_type: 0u64,
4711        ..Default::default()
4712    };
4713    attr.__bindgen_anon_1.sample_period = 0u64;
4714    attr.set_disabled(0);
4715
4716    let perf_events_map = &skel.maps.scx_pmu_map;
4717    let map_fd = unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
4718
4719    let mut failures = 0u64;
4720
4721    for cpu in 0..*NR_CPUS_POSSIBLE {
4722        let fd = unsafe { perf::perf_event_open(&mut attr as *mut _, -1, cpu as i32, -1, 0) };
4723        if fd < 0 {
4724            failures += 1;
4725            trace!(
4726                "perf_event_open failed cpu={cpu} errno={}",
4727                std::io::Error::last_os_error()
4728            );
4729            continue;
4730        }
4731
4732        let key = cpu as u32;
4733        let val = fd as u32;
4734        let ret = unsafe {
4735            libbpf_sys::bpf_map_update_elem(
4736                map_fd,
4737                &key as *const _ as *const _,
4738                &val as *const _ as *const _,
4739                0,
4740            )
4741        };
4742        if ret != 0 {
4743            trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
4744        } else {
4745            trace!("mapped cpu={cpu} -> fd={fd}");
4746        }
4747    }
4748
4749    if failures > 0 {
4750        println!("membw tracking: failed to install {failures} counters");
4751        // Keep going, do not fail the scheduler for this
4752    }
4753
4754    Ok(())
4755}
4756
4757// Set up the counters
4758fn setup_membw_tracking(skel: &mut OpenBpfSkel) -> Result<u64> {
4759    let pmumanager = PMUManager::new()?;
4760    let codename = &pmumanager.codename as &str;
4761
4762    let pmuspec = match codename {
4763        "amdzen1" | "amdzen2" | "amdzen3" => {
4764            trace!("found AMD codename {codename}");
4765            pmumanager.pmus.get("ls_any_fills_from_sys.mem_io_local")
4766        }
4767        "amdzen4" | "amdzen5" => {
4768            trace!("found AMD codename {codename}");
4769            pmumanager.pmus.get("ls_any_fills_from_sys.dram_io_all")
4770        }
4771
4772        "haswell" | "broadwell" | "broadwellde" | "broadwellx" | "skylake" | "skylakex"
4773        | "cascadelakex" | "arrowlake" | "meteorlake" | "sapphirerapids" | "emeraldrapids"
4774        | "graniterapids" => {
4775            trace!("found Intel codename {codename}");
4776            pmumanager.pmus.get("LONGEST_LAT_CACHE.MISS")
4777        }
4778
4779        _ => {
4780            trace!("found unknown codename {codename}");
4781            None
4782        }
4783    };
4784
4785    let spec = pmuspec.ok_or("not_found").unwrap();
4786    let config = (spec.umask << 8) | spec.event[0];
4787
4788    // Install the counter in the BPF map
4789    skel.maps.rodata_data.as_mut().unwrap().membw_event = config;
4790
4791    Ok(config)
4792}
4793
#[clap_main::clap_main]
fn main(opts: Opts) -> Result<()> {
    // --version: print the build id and exit.
    if opts.version {
        println!(
            "scx_layered {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    // --help-stats: describe the stats metadata and exit.
    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    // Log filter resolution order: default env var, then --log-level, then
    // a hard-coded "info" fallback.
    let env_filter = EnvFilter::try_from_default_env()
        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
            Ok(filter) => Ok(filter),
            Err(e) => {
                eprintln!(
                    "invalid log envvar: {}, using info, err is: {}",
                    opts.log_level, e
                );
                EnvFilter::try_new("info")
            }
        })
        .unwrap_or_else(|_| EnvFilter::new("info"));

    // Install the tracing subscriber; failure (e.g. a subscriber is already
    // set) is reported on stderr but is not fatal.
    match tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(true)
        .with_thread_ids(true)
        .with_file(true)
        .with_line_number(true)
        .try_init()
    {
        Ok(()) => {}
        Err(e) => eprintln!("failed to init logger: {}", e),
    }

    // Warn about deprecated / no-op flags so users can clean up invocations.
    if opts.verbose > 0 {
        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
    }

    if opts.no_load_frac_limit {
        warn!("--no-load-frac-limit is deprecated and noop");
    }
    if opts.layer_preempt_weight_disable != 0.0 {
        warn!("--layer-preempt-weight-disable is deprecated and noop");
    }
    if opts.layer_growth_weight_disable != 0.0 {
        warn!("--layer-growth-weight-disable is deprecated and noop");
    }
    if opts.local_llc_iteration {
        warn!("--local_llc_iteration is deprecated and noop");
    }

    debug!("opts={:?}", &opts);

    if let Some(run_id) = opts.run_id {
        info!("scx_layered run_id: {}", run_id);
    }

    // Ctrl-C flips a shared flag polled by the scheduler loop below.
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // --monitor / --stats: spawn the stats monitor thread. In pure monitor
    // mode there is no scheduler to run, so just wait for it and exit.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let stats_columns = opts.stats_columns;
        let stats_no_llc = opts.stats_no_llc;
        let jh = std::thread::spawn(move || {
            match stats::monitor(
                Duration::from_secs_f64(intv),
                shutdown_copy,
                stats_columns,
                stats_no_llc,
            ) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    // --example: write the example config file and exit.
    if let Some(path) = &opts.example {
        write_example_file(path)?;
        return Ok(());
    }

    // Start from the built-in example config or an empty spec list.
    let mut layer_config = match opts.run_example {
        true => EXAMPLE_CONFIG.clone(),
        false => LayerConfig { specs: vec![] },
    };

    // Parse each --specs input; template specs expand into one generated
    // layer per matching cgroup.
    for (idx, input) in opts.specs.iter().enumerate() {
        let specs = LayerSpec::parse(input)
            .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;

        for spec in specs {
            match spec.template {
                Some(ref rule) => {
                    let matches = expand_template(rule)?;
                    // in the absence of matching cgroups, have template layers
                    // behave as non-template layers do.
                    if matches.is_empty() {
                        layer_config.specs.push(spec);
                    } else {
                        for (mt, mask) in matches {
                            let mut genspec = spec.clone();

                            // Pin the generated layer to the cgroup's cpumask.
                            genspec.cpuset = Some(mask);

                            // Push the new "and" rule into each "or" term.
                            for orterm in &mut genspec.matches {
                                orterm.push(mt.clone());
                            }

                            // Name the generated layer after the matched
                            // cgroup suffix.
                            match &mt {
                                LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
                                _ => bail!("Template match has unexpected type"),
                            }

                            // Push the generated layer into the config
                            layer_config.specs.push(genspec);
                        }
                    }
                }

                None => {
                    layer_config.specs.push(spec);
                }
            }
        }
    }

    // Fill per-layer defaults and reconcile conflicting knobs before
    // verification.
    for spec in layer_config.specs.iter_mut() {
        let common = spec.kind.common_mut();

        if common.slice_us == 0 {
            common.slice_us = opts.slice_us;
        }

        if common.weight == 0 {
            common.weight = DEFAULT_LAYER_WEIGHT;
        }
        common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);

        // For preempt layers the disallow_*_after_us knobs are ignored and
        // pinned to u64::MAX; otherwise unset knobs get the defaults.
        if common.preempt {
            if common.disallow_open_after_us.is_some() {
                warn!(
                    "Preempt layer {} has non-null disallow_open_after_us, ignored",
                    &spec.name
                );
            }
            if common.disallow_preempt_after_us.is_some() {
                warn!(
                    "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
                    &spec.name
                );
            }
            common.disallow_open_after_us = Some(u64::MAX);
            common.disallow_preempt_after_us = Some(u64::MAX);
        } else {
            if common.disallow_open_after_us.is_none() {
                common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
            }

            if common.disallow_preempt_after_us.is_none() {
                common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
            }
        }

        if common.idle_smt.is_some() {
            warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
        }

        if common.allow_node_aligned.is_some() {
            warn!("Layer {} has deprecated flag \"allow_node_aligned\", node-aligned tasks are now always dispatched on layer DSQs", &spec.name);
        }
    }

    // Memory bandwidth tracking is only needed if some Confined/Grouped
    // layer sets membw_gb.
    let membw_required = layer_config.specs.iter().any(|spec| match spec.kind {
        LayerKind::Confined { membw_gb, .. } | LayerKind::Grouped { membw_gb, .. } => {
            membw_gb.is_some()
        }
        LayerKind::Open { .. } => false,
    });

    // --print-and-exit: dump the fully-resolved config and quit.
    if opts.print_and_exit {
        println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
        return Ok(());
    }

    debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
    // Validate all specs and build the hint-value -> layer lookup table.
    let hint_to_layer_map = verify_layer_specs(&layer_config.specs)?;

    // Run the scheduler, re-initializing whenever a completed run asks for
    // a restart (as reported by should_restart()).
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(
            &opts,
            &layer_config.specs,
            &mut open_object,
            &hint_to_layer_map,
            membw_required,
        )?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}
5020
5021#[cfg(test)]
5022mod xnuma_tests {
5023    use super::*;
5024
    // Default thresholds for tests: (low, high) hysteresis bounds used by
    // the test comments below (e.g. "0.6(lo)", "0.7(hi)") — THRESH for node
    // load, DELTA for the surplus ratio.
    const THRESH: (f64, f64) = (0.6, 0.7);
    const DELTA: (f64, f64) = (0.2, 0.3);
5028
5029    // =====================================================================
5030    // xnuma_check_active tests — per-(layer, node) two-threshold
5031    // =====================================================================
5032
5033    #[test]
5034    fn test_activation_below_threshold() {
5035        // Both nodes below threshold — both closed
5036        let duty = vec![40.0, 40.0];
5037        let allocs = vec![96, 96];
5038        let gd = vec![true, true];
5039        let cur = vec![false, false];
5040        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5041        // ratio = 0.42 < 0.6 (low) → deactivate
5042        assert!(!result[0]);
5043        assert!(!result[1]);
5044    }
5045
5046    #[test]
5047    fn test_activation_above_all_thresholds() {
5048        // N0 overloaded + imbalanced + growth denied → open
5049        let duty = vec![90.0, 20.0];
5050        let allocs = vec![96, 96];
5051        let gd = vec![true, false];
5052        let cur = vec![false, false];
5053        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5054        // N0: load=0.94>0.7, eq=110/192=0.573, surplus=90-55=35, ratio=0.365>0.3, gd=true → open
5055        assert!(result[0]);
5056        // N1: load=0.21<0.6 → closed
5057        assert!(!result[1]);
5058    }
5059
5060    #[test]
5061    fn test_activation_requires_growth_denied() {
5062        // N0 overloaded + imbalanced but growth NOT denied → closed
5063        let duty = vec![90.0, 20.0];
5064        let allocs = vec![96, 96];
5065        let gd = vec![false, false]; // growth succeeded
5066        let cur = vec![false, false];
5067        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5068        assert!(!result[0]); // !growth_denied → deactivate
5069    }
5070
5071    #[test]
5072    fn test_symmetric_high_load_stays_closed() {
5073        // Both nodes above threshold but balanced (delta=0) → closed
5074        let duty = vec![80.0, 80.0];
5075        let allocs = vec![96, 96];
5076        let gd = vec![true, true];
5077        let cur = vec![false, false];
5078        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5079        // load=0.83>0.7 but surplus=0, surplus_ratio=0 < 0.3 → no activation
5080        assert!(!result[0]);
5081        assert!(!result[1]);
5082    }
5083
5084    #[test]
5085    fn test_hysteresis_stays_active() {
5086        // N0 was active, now between thresholds → stays active
5087        let duty = vec![75.0, 20.0];
5088        let allocs = vec![96, 96];
5089        let gd = vec![true, false];
5090        let cur = vec![true, false]; // N0 was active
5091        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5092        // N0: load=0.78 > 0.6(lo) and < 0.7(hi), surplus=(75-20)/2=27.5, ratio=0.286 > 0.2(lo)
5093        // gd=true. activate: 0.78<0.7 NO. deactivate: 0.78>0.6 NO, 0.286>0.2 NO, gd=true NO
5094        // → hysteresis preserves true
5095        assert!(result[0]);
5096    }
5097
5098    #[test]
5099    fn test_hysteresis_stays_inactive() {
5100        // N0 was inactive, in hysteresis band → stays inactive
5101        let duty = vec![75.0, 20.0];
5102        let allocs = vec![96, 96];
5103        let gd = vec![true, false];
5104        let cur = vec![false, false]; // N0 was inactive
5105        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5106        // Same conditions but currently inactive → stays inactive
5107        assert!(!result[0]);
5108    }
5109
5110    #[test]
5111    fn test_deactivation_load_drops() {
5112        // N0 was active, load drops below low → closes
5113        let duty = vec![50.0, 50.0];
5114        let allocs = vec![96, 96];
5115        let gd = vec![true, true];
5116        let cur = vec![true, false];
5117        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5118        // N0: load=0.52 < 0.6(lo) → deactivate
5119        assert!(!result[0]);
5120    }
5121
5122    #[test]
5123    fn test_deactivation_growth_succeeds() {
5124        // N0 was active, growth now succeeds → closes
5125        let duty = vec![90.0, 20.0];
5126        let allocs = vec![96, 96];
5127        let gd = vec![false, false]; // growth succeeded
5128        let cur = vec![true, false];
5129        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5130        // !growth_denied → deactivate regardless of load/delta
5131        assert!(!result[0]);
5132    }
5133
5134    #[test]
5135    fn test_zero_alloc_with_duty_and_growth_denied() {
5136        // Zero alloc but duty > 0 and growth denied → open
5137        let duty = vec![50.0, 0.0];
5138        let allocs = vec![0, 96];
5139        let gd = vec![true, false];
5140        let cur = vec![false, false];
5141        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5142        assert!(result[0]); // zero alloc + duty + gd → source
5143    }
5144
5145    #[test]
5146    fn test_all_zero_alloc() {
5147        let duty = vec![0.0, 0.0];
5148        let allocs = vec![0, 0];
5149        let gd = vec![true, true];
5150        let cur = vec![true, true];
5151        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5152        assert!(!result[0]);
5153        assert!(!result[1]);
5154    }
5155
5156    #[test]
5157    fn test_three_nodes_mixed() {
5158        // 3 nodes: N0 overloaded+imbalanced+gd, N1 moderate, N2 low
5159        let duty = vec![90.0, 40.0, 10.0];
5160        let allocs = vec![96, 96, 96];
5161        let gd = vec![true, true, false];
5162        let cur = vec![false, false, false];
5163        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5164        // eq_ratio = 140/288 ≈ 0.486
5165        // N0: load=0.94>0.7, surplus=90-46.7=43.3, ratio=0.451>0.3, gd=true → open
5166        assert!(result[0]);
5167        // N1: load=0.42<0.6 → deactivate
5168        assert!(!result[1]);
5169        // N2: load=0.10<0.6 → deactivate
5170        assert!(!result[2]);
5171    }
5172
5173    #[test]
5174    fn test_per_node_independence() {
5175        // N0 active, N1 deactivates independently
5176        let duty = vec![90.0, 50.0];
5177        let allocs = vec![96, 96];
5178        let gd = vec![true, true];
5179        let cur = vec![true, true];
5180        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5181        // N0: load=0.94>0.7, eq=140/192=0.729, surplus=90-70=20, ratio=0.208>0.2(lo)
5182        //   activate: 0.94>0.7 YES, 0.208>0.3 NO → no activate
5183        //   deactivate: 0.94>0.6 NO, 0.208>0.2 NO, gd=true NO → no deactivate → preserve true
5184        assert!(result[0]);
5185        // N1: load=0.52<0.6 → deactivate
5186        assert!(!result[1]);
5187    }
5188
5189    // =====================================================================
5190    // xnuma_compute_rates tests — basic
5191    // =====================================================================
5192
5193    #[test]
5194    fn test_rates_balanced_load() {
5195        // Equal load per CPU on both nodes — no migration needed
5196        let duty = vec![48.0, 48.0];
5197        let allocs = vec![96, 96];
5198        let result = xnuma_compute_rates(&duty, &allocs);
5199
5200        // No surplus/deficit → all rates zero
5201        for src in 0..2 {
5202            for dst in 0..2 {
5203                assert_eq!(result.rates[src][dst], 0);
5204            }
5205        }
5206    }
5207
5208    #[test]
5209    fn test_rates_one_overloaded() {
5210        // N0 has 80 CPUs of duty, N1 has 40. Both have 96 CPUs.
5211        // eq_ratio = 120/192 = 0.625
5212        // N0 expected = 0.625 * 96 = 60, surplus = 80 - 60 = 20
5213        // N1 expected = 0.625 * 96 = 60, deficit = 60 - 40 = 20
5214        let duty = vec![80.0, 40.0];
5215        let allocs = vec![96, 96];
5216        let result = xnuma_compute_rates(&duty, &allocs);
5217
5218        // rate[0][1] should be positive (migrate from N0 to N1)
5219        assert!(result.rates[0][1] > 0);
5220        // rate[1][0] should be 0 (N1 has no surplus)
5221        assert_eq!(result.rates[1][0], 0);
5222        // Self-rates always 0
5223        assert_eq!(result.rates[0][0], 0);
5224        assert_eq!(result.rates[1][1], 0);
5225
5226        // Verify the rate magnitude: migration = 20.0 * DAMPEN, scaled
5227        let expected_rate = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5228        assert_eq!(result.rates[0][1], expected_rate);
5229    }
5230
5231    #[test]
5232    fn test_rates_asymmetric_allocation() {
5233        // N0 has 48 CPUs, N1 has 144 CPUs. Duty 48 each.
5234        // eq_ratio = 96/192 = 0.5
5235        // N0 expected = 0.5 * 48 = 24, surplus = 48 - 24 = 24
5236        // N1 expected = 0.5 * 144 = 72, deficit = 72 - 48 = 24
5237        let duty = vec![48.0, 48.0];
5238        let allocs = vec![48, 144];
5239        let result = xnuma_compute_rates(&duty, &allocs);
5240
5241        let expected_rate = (24.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5242        assert_eq!(result.rates[0][1], expected_rate);
5243        assert_eq!(result.rates[1][0], 0);
5244    }
5245
5246    // =====================================================================
5247    // xnuma_compute_rates tests — multi-node
5248    // =====================================================================
5249
5250    #[test]
5251    fn test_rates_three_nodes_one_source() {
5252        // N0 overloaded, N1 and N2 are deficit
5253        // allocs: 96 each, duty: N0=120, N1=30, N2=30
5254        // eq_ratio = 180/288 = 0.625
5255        // N0 expected = 60, surplus = 60
5256        // N1 expected = 60, deficit = 30
5257        // N2 expected = 60, deficit = 30
5258        let duty = vec![120.0, 30.0, 30.0];
5259        let allocs = vec![96, 96, 96];
5260        let result = xnuma_compute_rates(&duty, &allocs);
5261
5262        // Total deficit = 60. N1 gets 30/60 = 50%, N2 gets 30/60 = 50%
5263        // rate[0][1] = 60 * 30/60 * DAMPEN = 15
5264        // rate[0][2] = 60 * 30/60 * DAMPEN = 15
5265        let rate_01 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5266        let rate_02 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5267        assert_eq!(result.rates[0][1], rate_01);
5268        assert_eq!(result.rates[0][2], rate_02);
5269
5270        // No reverse flow
5271        assert_eq!(result.rates[1][0], 0);
5272        assert_eq!(result.rates[2][0], 0);
5273        assert_eq!(result.rates[1][2], 0);
5274        assert_eq!(result.rates[2][1], 0);
5275    }
5276
5277    #[test]
5278    fn test_rates_three_nodes_unequal_deficit() {
5279        // N0 overloaded. N1 slight deficit, N2 large deficit.
5280        // allocs: 96 each, duty: N0=120, N1=50, N2=10
5281        // eq_ratio = 180/288 = 0.625
5282        // N0: expected=60, surplus=60
5283        // N1: expected=60, deficit=10
5284        // N2: expected=60, deficit=50
5285        let duty = vec![120.0, 50.0, 10.0];
5286        let allocs = vec![96, 96, 96];
5287        let result = xnuma_compute_rates(&duty, &allocs);
5288
5289        // Total deficit = 60. N1 share = 10/60, N2 share = 50/60
5290        // rate[0][1] = 60 * 10/60 * DAMPEN = 5
5291        // rate[0][2] = 60 * 50/60 * DAMPEN = 25
5292        let rate_01 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5293        let rate_02 = (50.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5294        assert_eq!(result.rates[0][1], rate_01);
5295        assert_eq!(result.rates[0][2], rate_02);
5296    }
5297
5298    #[test]
5299    fn test_rates_two_sources_one_sink() {
5300        // N0 and N1 overloaded, N2 deficit
5301        // allocs: 96 each, duty: N0=80, N1=70, N2=30
5302        // total = 180, eq_ratio = 180/288 = 0.625
5303        // N0: expected=60, surplus=20
5304        // N1: expected=60, surplus=10
5305        // N2: expected=60, deficit=30
5306        let duty = vec![80.0, 70.0, 30.0];
5307        let allocs = vec![96, 96, 96];
5308        let result = xnuma_compute_rates(&duty, &allocs);
5309
5310        // Total deficit = 30. Only N2 is deficit, so deficit share = 1.0
5311        // rate[0][2] = 20 * 1.0 * DAMPEN = 10
5312        // rate[1][2] = 10 * 1.0 * DAMPEN = 5
5313        let rate_02 = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5314        let rate_12 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5315        assert_eq!(result.rates[0][2], rate_02);
5316        assert_eq!(result.rates[1][2], rate_12);
5317
5318        // No flow between sources
5319        assert_eq!(result.rates[0][1], 0);
5320        assert_eq!(result.rates[1][0], 0);
5321    }
5322
5323    // =====================================================================
5324    // Conservation invariants
5325    // =====================================================================
5326
5327    #[test]
5328    fn test_conservation_per_source_outbound() {
5329        // Each source's total outbound should equal its surplus * DAMPEN (scaled).
5330        // Total outbound from src = surplus[src] * DAMPEN * DUTY_CYCLE_SCALE
5331        let duty = vec![100.0, 30.0, 50.0, 20.0];
5332        let allocs = vec![96, 96, 96, 96];
5333        let nr = 4;
5334        let result = xnuma_compute_rates(&duty, &allocs);
5335
5336        let total_duty: f64 = duty.iter().sum();
5337        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
5338        let eq_ratio = total_duty / total_alloc;
5339
5340        for src in 0..nr {
5341            let expected = eq_ratio * allocs[src] as f64;
5342            let surplus = (duty[src] - expected).max(0.0);
5343            let total_outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
5344            let expected_rate = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5345            assert_eq!(
5346                total_outbound, expected_rate,
5347                "node {} outbound mismatch",
5348                src
5349            );
5350        }
5351    }
5352
5353    #[test]
5354    fn test_conservation_surplus_equals_deficit() {
5355        // Mathematical invariant: total surplus == total deficit in water-fill
5356        let duty = vec![100.0, 30.0, 50.0];
5357        let allocs = vec![96, 96, 96];
5358
5359        let total_duty: f64 = duty.iter().sum();
5360        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
5361        let eq_ratio = total_duty / total_alloc;
5362
5363        let mut total_surplus = 0.0f64;
5364        let mut total_deficit = 0.0f64;
5365        for i in 0..3 {
5366            let expected = eq_ratio * allocs[i] as f64;
5367            let delta = duty[i] - expected;
5368            if delta > 0.0 {
5369                total_surplus += delta;
5370            } else {
5371                total_deficit += -delta;
5372            }
5373        }
5374        assert!((total_surplus - total_deficit).abs() < 1e-10);
5375    }
5376
5377    #[test]
5378    fn test_self_rates_always_zero() {
5379        let duty = vec![100.0, 30.0, 50.0];
5380        let allocs = vec![96, 96, 96];
5381        let result = xnuma_compute_rates(&duty, &allocs);
5382
5383        for nid in 0..3 {
5384            assert_eq!(result.rates[nid][nid], 0);
5385        }
5386    }
5387
5388    #[test]
5389    fn test_deficit_nodes_have_zero_outbound() {
5390        // Deficit nodes should have zero outbound rates
5391        let duty = vec![100.0, 20.0, 30.0, 50.0];
5392        let allocs = vec![96, 96, 96, 96];
5393        let result = xnuma_compute_rates(&duty, &allocs);
5394
5395        // eq_ratio = 200/384 ≈ 0.521. N0: surplus=50, N3: surplus=0
5396        // N1: deficit=30, N2: deficit=20
5397        // Deficit nodes (outbound sum == 0) should have all zero rates
5398        for nid in 0..4 {
5399            let outbound: u64 = (0..4).map(|dst| result.rates[nid][dst]).sum();
5400            if outbound == 0 {
5401                for dst in 0..4 {
5402                    assert_eq!(
5403                        result.rates[nid][dst], 0,
5404                        "deficit node {} has non-zero rate to {}",
5405                        nid, dst
5406                    );
5407                }
5408            }
5409        }
5410    }
5411
5412    // =====================================================================
5413    // Edge cases
5414    // =====================================================================
5415
5416    #[test]
5417    fn test_rates_zero_duty_everywhere() {
5418        let duty = vec![0.0, 0.0];
5419        let allocs = vec![96, 96];
5420        let result = xnuma_compute_rates(&duty, &allocs);
5421
5422        for src in 0..2 {
5423            for dst in 0..2 {
5424                assert_eq!(result.rates[src][dst], 0);
5425            }
5426        }
5427    }
5428
5429    #[test]
5430    fn test_rates_zero_alloc() {
5431        let duty = vec![50.0, 50.0];
5432        let allocs = vec![0, 0];
5433        let result = xnuma_compute_rates(&duty, &allocs);
5434
5435        // total_alloc = 0 → early return with all zeros
5436        for src in 0..2 {
5437            for dst in 0..2 {
5438                assert_eq!(result.rates[src][dst], 0);
5439            }
5440        }
5441    }
5442
5443    #[test]
5444    fn test_rates_all_load_one_node() {
5445        // All duty on N0, nothing on N1
5446        // allocs: 96 each, duty: N0=96, N1=0
5447        // eq_ratio = 96/192 = 0.5
5448        // N0: expected=48, surplus=48
5449        // N1: expected=48, deficit=48
5450        let duty = vec![96.0, 0.0];
5451        let allocs = vec![96, 96];
5452        let result = xnuma_compute_rates(&duty, &allocs);
5453
5454        let expected_rate = (48.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5455        assert_eq!(result.rates[0][1], expected_rate);
5456    }
5457
5458    #[test]
5459    fn test_rates_single_node() {
5460        // Single node — balanced by definition
5461        let duty = vec![96.0];
5462        let allocs = vec![96];
5463        let result = xnuma_compute_rates(&duty, &allocs);
5464
5465        assert_eq!(result.rates[0][0], 0);
5466    }
5467
5468    #[test]
5469    fn test_rates_one_node_zero_alloc() {
5470        // N0 has allocation, N1 has zero
5471        // eq_ratio = 80/96
5472        // N0: expected=80, surplus=0 → balanced
5473        // N1: expected=0, deficit=0 → zero alloc skipped effectively
5474        let duty = vec![80.0, 0.0];
5475        let allocs = vec![96, 0];
5476        let result = xnuma_compute_rates(&duty, &allocs);
5477
5478        // N0: surplus = 80 - (80/96 * 96) = 0
5479        // N1: deficit = (80/96 * 0) - 0 = 0
5480        // All zeros — nothing to migrate
5481        assert_eq!(result.rates[0][1], 0);
5482        assert_eq!(result.rates[1][0], 0);
5483    }
5484
5485    // =====================================================================
5486    // Rate magnitude and scaling
5487    // =====================================================================
5488
5489    #[test]
5490    fn test_rate_scaling() {
5491        // Verify rates are in DUTY_CYCLE_SCALE units
5492        let duty = vec![80.0, 40.0];
5493        let allocs = vec![96, 96];
5494        let result = xnuma_compute_rates(&duty, &allocs);
5495
5496        // surplus = 20, deficit = 20 → migration = 20 * DAMPEN = 10
5497        // rate = 10 * (1 << 20)
5498        let expected = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5499        assert_eq!(result.rates[0][1], expected);
5500        assert_eq!(expected, 10 * (1 << 20));
5501    }
5502
5503    #[test]
5504    fn test_rates_tiny_imbalance() {
5505        // Very small imbalance — should still produce non-zero rate
5506        let duty = vec![48.001, 47.999];
5507        let allocs = vec![96, 96];
5508        let result = xnuma_compute_rates(&duty, &allocs);
5509
5510        // surplus ≈ 0.001, rate ≈ 0.001 * 2^20 ≈ 1048
5511        assert!(result.rates[0][1] > 0);
5512        assert!(result.rates[0][1] < (1 << 20)); // Less than 1.0 CPU worth
5513    }
5514
5515    #[test]
5516    fn test_rates_large_values() {
5517        // Large system: 8 nodes, 96 CPUs each
5518        let duty = vec![300.0, 100.0, 50.0, 50.0, 50.0, 50.0, 50.0, 50.0];
5519        let allocs = vec![96, 96, 96, 96, 96, 96, 96, 96];
5520        let result = xnuma_compute_rates(&duty, &allocs);
5521
5522        // eq_ratio = 700/768 ≈ 0.911. N0 surplus=212.5, N1 surplus=12.5
5523        // N2-N7 deficit=37.5 each. N0 and N1 have surplus.
5524        assert!(result.rates[0][2] > 0); // N0 → N2 (deficit node)
5525        assert!(result.rates[1][2] > 0); // N1 → N2 (N1 also has surplus)
5526
5527        // Verify conservation: per-source outbound ≈ surplus * DAMPEN (within
5528        // truncation tolerance — each `as u64` can lose up to 1 per cell)
5529        let total_duty: f64 = duty.iter().sum();
5530        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
5531        let eq_ratio = total_duty / total_alloc;
5532        let nr = 8;
5533        for src in 0..nr {
5534            let surplus = (duty[src] - eq_ratio * allocs[src] as f64).max(0.0);
5535            let outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
5536            let expected = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5537            let tolerance = nr as u64; // up to 1 per destination cell
5538            assert!(
5539                outbound.abs_diff(expected) <= tolerance,
5540                "node {} outbound {} vs expected {}, diff {}",
5541                src,
5542                outbound,
5543                expected,
5544                outbound.abs_diff(expected)
5545            );
5546        }
5547    }
5548
5549    // =====================================================================
5550    // Hysteresis integration
5551    // =====================================================================
5552
5553    #[test]
5554    fn test_hysteresis_cycle() {
5555        let allocs = vec![96, 96];
5556        let gd = vec![true, false]; // N0 growth denied
5557
5558        // Start: all closed, low load
5559        let active =
5560            xnuma_check_active(&[40.0, 40.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
5561        assert!(!active[0]); // load 0.42 < 0.6(lo) → deactivate
5562
5563        // N0 overloaded + imbalanced + gd → opens
5564        let active =
5565            xnuma_check_active(&[90.0, 20.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
5566        assert!(active[0]); // load 0.94>0.7, surplus=35, ratio=0.365>0.3, gd=true
5567
5568        // Load decreases but in hysteresis band → stays open
5569        let active = xnuma_check_active(&[75.0, 20.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
5570        assert!(active[0]); // load 0.78 between 0.6 and 0.7, surplus ok, gd → preserve
5571
5572        // Load drops below low threshold → closes
5573        let active = xnuma_check_active(&[50.0, 50.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
5574        assert!(!active[0]); // load 0.52 < 0.6(lo) → deactivate
5575    }
5576
5577    #[test]
5578    fn test_hysteresis_growth_toggle() {
5579        // N0 was open, then growth succeeds → closes
5580        let allocs = vec![96, 96];
5581        let gd_denied = vec![true, false];
5582        let gd_ok = vec![false, false];
5583
5584        // Active with growth denied
5585        let active = xnuma_check_active(
5586            &[90.0, 20.0],
5587            &allocs,
5588            THRESH,
5589            DELTA,
5590            &gd_denied,
5591            &[false, false],
5592        );
5593        assert!(active[0]);
5594
5595        // Growth succeeds → !gd → deactivate
5596        let active = xnuma_check_active(
5597            &[90.0, 20.0],
5598            &allocs,
5599            THRESH,
5600            DELTA,
5601            &gd_ok,
5602            &[true, false],
5603        );
5604        assert!(!active[0]); // !growth_denied → deactivate
5605    }
5606
5607    // =====================================================================
5608    // Proportional distribution to multiple sinks
5609    // =====================================================================
5610
5611    #[test]
5612    fn test_proportional_sink_distribution() {
5613        // 4 nodes: N0 source, N1-N3 sinks with different deficits
5614        // allocs: 96 each, duty: N0=180, N1=20, N2=40, N3=0
5615        // total = 240, eq_ratio = 240/384 = 0.625
5616        // N0: expected=60, surplus=120
5617        // N1: expected=60, deficit=40
5618        // N2: expected=60, deficit=20
5619        // N3: expected=60, deficit=60
5620        // total_deficit = 120
5621        let duty = vec![180.0, 20.0, 40.0, 0.0];
5622        let allocs = vec![96, 96, 96, 96];
5623        let result = xnuma_compute_rates(&duty, &allocs);
5624
5625        // rate[0][1] = 120 * 40/120 * DAMPEN = 20
5626        // rate[0][2] = 120 * 20/120 * DAMPEN = 10
5627        // rate[0][3] = 120 * 60/120 * DAMPEN = 30
5628        assert_eq!(
5629            result.rates[0][1],
5630            (40.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
5631        );
5632        assert_eq!(
5633            result.rates[0][2],
5634            (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
5635        );
5636        assert_eq!(
5637            result.rates[0][3],
5638            (60.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
5639        );
5640
5641        // Total outbound from N0 = 20 + 10 + 30 = 60 (half of surplus, dampened)
5642        let total_from_n0: u64 = (0..4).map(|dst| result.rates[0][dst]).sum();
5643        assert_eq!(
5644            total_from_n0,
5645            (120.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
5646        );
5647    }
5648}