Skip to main content

scx_layered/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6mod stats;
7
8use std::collections::BTreeMap;
9use std::collections::BTreeSet;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::ffi::CString;
13use std::fs;
14use std::io::Write;
15use std::mem::MaybeUninit;
16use std::ops::Sub;
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::thread::ThreadId;
23use std::time::Duration;
24use std::time::Instant;
25
26use inotify::{Inotify, WatchMask};
27use std::os::unix::io::AsRawFd;
28
29use anyhow::anyhow;
30use anyhow::bail;
31use anyhow::Context;
32use anyhow::Result;
33pub use bpf_skel::*;
34use clap::Parser;
35use crossbeam::channel::Receiver;
36use crossbeam::select;
37use lazy_static::lazy_static;
38use libbpf_rs::libbpf_sys;
39use libbpf_rs::AsRawLibbpf;
40use libbpf_rs::MapCore as _;
41use libbpf_rs::OpenObject;
42use libbpf_rs::ProgramInput;
43use nix::sched::CpuSet;
44use nvml_wrapper::error::NvmlError;
45use nvml_wrapper::Nvml;
46use once_cell::sync::OnceCell;
47use regex::Regex;
48use scx_layered::alloc::{unified_alloc, LayerAlloc, LayerDemand};
49use scx_layered::*;
50use scx_raw_pmu::PMUManager;
51use scx_stats::prelude::*;
52use scx_utils::build_id;
53use scx_utils::compat;
54use scx_utils::init_libbpf_logging;
55use scx_utils::libbpf_clap_opts::LibbpfOpts;
56use scx_utils::perf;
57use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
58use scx_utils::read_netdevs;
59use scx_utils::scx_enums;
60use scx_utils::scx_ops_attach;
61use scx_utils::scx_ops_load;
62use scx_utils::scx_ops_open;
63use scx_utils::uei_exited;
64use scx_utils::uei_report;
65use scx_utils::CoreType;
66use scx_utils::Cpumask;
67use scx_utils::Llc;
68use scx_utils::NetDev;
69use scx_utils::Topology;
70use scx_utils::TopologyArgs;
71use scx_utils::UserExitInfo;
72use scx_utils::NR_CPUS_POSSIBLE;
73use scx_utils::NR_CPU_IDS;
74use stats::LayerStats;
75use stats::StatsReq;
76use stats::StatsRes;
77use stats::SysStats;
78use std::collections::VecDeque;
79use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
80use tracing::{debug, error, info, trace, warn};
81use tracing_subscriber::filter::EnvFilter;
82use walkdir::WalkDir;
83
84const SCHEDULER_NAME: &str = "scx_layered";
85const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
86const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
87const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
88const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
89const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
90const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
91const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
92const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
93const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
94const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;
95
96const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
97const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
98const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
99const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
100const LAYER_USAGE_PROTECTED_PREEMPT: usize =
101    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
102const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;
103
104const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
105const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
106const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;
107
108const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;
109
110static NVML: OnceCell<Nvml> = OnceCell::new();
111
112fn nvml() -> Result<&'static Nvml, NvmlError> {
113    NVML.get_or_try_init(Nvml::init)
114}
115
lazy_static! {
    // Exponential decay base chosen so that USAGE_DECAY raised to the power
    // of USAGE_HALF_LIFE_F64 equals 0.5, i.e. a value decays to half after
    // one half-life (USAGE_HALF_LIFE / 1e9 - presumably ns converted to
    // seconds; confirm against the BPF side).
    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
    // Defaults derived from the kernel's SCX_SLICE_DFL: 2x and 4x the default
    // slice, divided by 1000 (presumably ns -> us - confirm the unit of
    // SCX_SLICE_DFL).
    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
    // Built-in example layer configuration, deserialized from the embedded
    // JSON on first access. The unwrap() panics if the JSON below no longer
    // deserializes into LayerConfig, so keep it in sync with the config
    // schema.
    static ref EXAMPLE_CONFIG: LayerConfig = serde_json::from_str(
        r#"[
          {
            "name": "batch",
            "comment": "tasks under system.slice or tasks with nice value > 0",
            "matches": [[{"CgroupPrefix": "system.slice/"}], [{"NiceAbove": 0}]],
            "kind": {"Confined": {
              "util_range": [0.8, 0.9], "cpus_range": [0, 16],
              "min_exec_us": 1000, "slice_us": 20000, "weight": 100,
              "xllc_mig_min_us": 1000.0, "perf": 1024
            }}
          },
          {
            "name": "immediate",
            "comment": "tasks under workload.slice with nice value < 0",
            "matches": [[{"CgroupPrefix": "workload.slice/"}, {"NiceBelow": 0}]],
            "kind": {"Open": {
              "min_exec_us": 100, "yield_ignore": 0.25, "slice_us": 20000,
              "preempt": true, "exclusive": true,
              "prev_over_idle_core": true,
              "weight": 100, "perf": 1024
            }}
          },
          {
            "name": "stress-ng",
            "comment": "stress-ng test layer",
            "matches": [[{"CommPrefix": "stress-ng"}], [{"PcommPrefix": "stress-ng"}]],
            "kind": {"Confined": {
              "util_range": [0.2, 0.8],
              "min_exec_us": 800, "preempt": true, "slice_us": 800,
              "weight": 100, "growth_algo": "Topo", "perf": 1024
            }}
          },
          {
            "name": "normal",
            "comment": "the rest",
            "matches": [[]],
            "kind": {"Grouped": {
              "util_range": [0.5, 0.6], "util_includes_open_cputime": true,
              "min_exec_us": 200, "slice_us": 20000, "weight": 100,
              "xllc_mig_min_us": 100.0, "growth_algo": "Linear", "perf": 1024
            }}
          }
        ]"#,
    )
    .unwrap();
}
167
168/// scx_layered: A highly configurable multi-layer sched_ext scheduler
169///
170/// scx_layered allows classifying tasks into multiple layers and applying
171/// different scheduling policies to them. The configuration is specified in
172/// json and composed of two parts - matches and policies.
173///
174/// Matches
175/// =======
176///
177/// Whenever a task is forked or its attributes are changed, the task goes
178/// through a series of matches to determine the layer it belongs to. A
179/// match set is composed of OR groups of AND blocks. An example:
180///
181///   "matches": [
182///     [
183///       {
184///         "CgroupPrefix": "system.slice/"
185///       }
186///     ],
187///     [
188///       {
189///         "CommPrefix": "fbagent"
190///       },
191///       {
192///         "NiceAbove": 0
193///       }
194///     ]
195///   ],
196///
197/// The outer array contains the OR groups and the inner AND blocks, so the
198/// above matches:
199///
200/// - Tasks which are in the cgroup sub-hierarchy under "system.slice".
201///
202/// - Or tasks whose comm starts with "fbagent" and have a nice value > 0.
203///
204/// Currently, the following matches are supported:
205///
206/// - CgroupPrefix: Matches the prefix of the cgroup that the task belongs
207///   to. As this is a string match, whether the pattern has the trailing
208///   '/' makes a difference. For example, "TOP/CHILD/" only matches tasks
209///   which are under that particular cgroup while "TOP/CHILD" also matches
210///   tasks under "TOP/CHILD0/" or "TOP/CHILD1/".
211///
212/// - CommPrefix: Matches the task's comm prefix.
213///
214/// - PcommPrefix: Matches the task's thread group leader's comm prefix.
215///
216/// - NiceAbove: Matches if the task's nice value is greater than the
217///   pattern.
218///
219/// - NiceBelow: Matches if the task's nice value is smaller than the
220///   pattern.
221///
222/// - NiceEquals: Matches if the task's nice value is exactly equal to
223///   the pattern.
224///
/// - UIDEquals: Matches if the task's effective user id matches the value.
226///
227/// - GIDEquals: Matches if the task's effective group id matches the value.
228///
229/// - PIDEquals: Matches if the task's pid matches the value.
230///
231/// - PPIDEquals: Matches if the task's ppid matches the value.
232///
233/// - TGIDEquals: Matches if the task's tgid matches the value.
234///
235/// - NSPIDEquals: Matches if the task's namespace id and pid matches the values.
236///
237/// - NSEquals: Matches if the task's namespace id matches the values.
238///
239/// - IsGroupLeader: Bool. When true, matches if the task is group leader
240///   (i.e. PID == TGID), aka the thread from which other threads are made.
241///   When false, matches if the task is *not* the group leader (i.e. the rest).
242///
243/// - CmdJoin: Matches when the task uses pthread_setname_np to send a join/leave
244///   command to the scheduler. See examples/cmdjoin.c for more details.
245///
/// - UsedGpuTid: Bool. When true, matches tasks which have used GPUs, as
///   tracked by tid.
248///
/// - UsedGpuPid: Bool. When true, matches tasks which have used GPUs, as
///   tracked by tgid/pid.
251///
252/// - [EXPERIMENTAL] AvgRuntime: (u64, u64). Match tasks whose average runtime
253///   is within the provided values [min, max).
254///
255/// - HintEquals: u64. Match tasks whose hint value equals this value.
256///   The value must be in the range [0, 1024].
257///
258/// - SystemCpuUtilBelow: f64. Match when the system CPU utilization fraction
259///   is below the specified threshold (a value in the range [0.0, 1.0]). This
260///   option can only be used in conjunction with HintEquals.
261///
262/// - DsqInsertBelow: f64. Match when the layer DSQ insertion fraction is below
263///   the specified threshold (a value in the range [0.0, 1.0]). This option can
264///   only be used in conjunction with HintEquals.
265///
266/// While there are complexity limitations as the matches are performed in
267/// BPF, it is straightforward to add more types of matches.
268///
269/// Templates
270/// ---------
271///
272/// Templates let us create a variable number of layers dynamically at initialization
273/// time out of a cgroup name suffix/prefix. Sometimes we know there are multiple
274/// applications running on a machine, each with their own cgroup but do not know the
275/// exact names of the applications or cgroups, e.g., in cloud computing contexts where
276/// workloads are placed on machines dynamically and run under cgroups whose name is
277/// autogenerated. In that case, we cannot hardcode the cgroup match rules when writing
278/// the configuration. We thus cannot easily prevent tasks from different cgroups from
279/// falling into the same layer and affecting each other's performance.
280///
281///
282/// Templates offer a solution to this problem by generating one layer for each such cgroup,
283/// provided these cgroups share a suffix, and that the suffix is unique to them. Templates
284/// have a cgroup suffix rule that we use to find the relevant cgroups in the system. For each
285/// such cgroup, we copy the layer config and add a matching rule that matches just this cgroup.
286///
287///
288/// Policies
289/// ========
290///
291/// The following is an example policy configuration for a layer.
292///
293///   "kind": {
294///     "Confined": {
295///       "cpus_range": [1, 8],
296///       "util_range": [0.8, 0.9]
297///     }
298///   }
299///
300/// It's of "Confined" kind, which tries to concentrate the layer's tasks
301/// into a limited number of CPUs. In the above case, the number of CPUs
302/// assigned to the layer is scaled between 1 and 8 so that the per-cpu
303/// utilization is kept between 80% and 90%. If the CPUs are loaded higher
304/// than 90%, more CPUs are allocated to the layer. If the utilization drops
305/// below 80%, the layer loses CPUs.
306///
307/// Currently, the following policy kinds are supported:
308///
309/// - Confined: Tasks are restricted to the allocated CPUs. The number of
310///   CPUs allocated is modulated to keep the per-CPU utilization in
311///   "util_range". The range can optionally be restricted with the
312///   "cpus_range" property.
313///
314/// - Grouped: Similar to Confined but tasks may spill outside if there are
315///   idle CPUs outside the allocated ones. The range can optionally be
316///   restricted with the "cpus_range" property. The optional
317///   "idle_confined" flag restricts idle selection to
318///   the layer's CPUs until the layer becomes saturated or the system
319///   is fully allocated, providing Confined-style cache locality under
320///   normal load with Grouped-style overflow under pressure.
321///
322/// - Open: Prefer the CPUs which are not occupied by Confined or Grouped
323///   layers. Tasks in this group will spill into occupied CPUs if there are
324///   no unoccupied idle CPUs.
325///
326/// All layers take the following options:
327///
328/// - min_exec_us: Minimum execution time in microseconds. Whenever a task
329///   is scheduled in, this is the minimum CPU time that it's charged no
330///   matter how short the actual execution time may be.
331///
332/// - yield_ignore: Yield ignore ratio. If 0.0, yield(2) forfeits a whole
333///   execution slice. 0.25 yields three quarters of an execution slice and
334///   so on. If 1.0, yield is completely ignored.
335///
336/// - slice_us: Scheduling slice duration in microseconds.
337///
338/// - fifo: Use FIFO queues within the layer instead of the default vtime.
339///
340/// - preempt: If true, tasks in the layer will preempt tasks which belong
341///   to other non-preempting layers when no idle CPUs are available.
342///
343/// - preempt_first: If true, tasks in the layer will try to preempt tasks
344///   in their previous CPUs before trying to find idle CPUs.
345///
346/// - exclusive: If true, tasks in the layer will occupy the whole core. The
347///   other logical CPUs sharing the same core will be kept idle. This isn't
348///   a hard guarantee, so don't depend on it for security purposes.
349///
350/// - allow_node_aligned: DEPRECATED. Node-aligned tasks are now always
351///   dispatched on layer DSQs. This field is ignored if specified.
352///
/// - prev_over_idle_core: On SMT enabled systems, prefer using the same CPU
///   when picking a CPU for tasks on this layer, even if that CPU's SMT
///   sibling is processing a task.
356///
357/// - weight: Weight of the layer, which is a range from 1 to 10000 with a
358///   default of 100. Layer weights are used during contention to prevent
359///   starvation across layers. Weights are used in combination with
360///   utilization to determine the infeasible adjusted weight with higher
361///   weights having a larger adjustment in adjusted utilization.
362///
363/// - disallow_open_after_us: Duration to wait after machine reaches saturation
364///   before confining tasks in Open layers.
365///
366/// - cpus_range_frac: Array of 2 floats between 0 and 1.0. Lower and upper
367///   bound fractions of all CPUs to give to a layer. Mutually exclusive
368///   with cpus_range.
369///
370/// - disallow_preempt_after_us: Duration to wait after machine reaches saturation
371///   before confining tasks to preempt.
372///
373/// - xllc_mig_min_us: Skip cross-LLC migrations if they are likely to run on
374///   their existing LLC sooner than this.
375///
376/// - idle_smt: *** DEPRECATED ****
377///
378/// - growth_algo: Determines the order in which CPUs are allocated to the
379///   layer as it grows. All algorithms are NUMA-aware and produce per-node
380///   core orderings. Three placement classes exist:
381///
382///   * Locality (most algorithms): prefer the layer's home NUMA node and
383///     spill to remote nodes only when local capacity is exhausted.
384///   * Balanced (RoundRobin): distribute proportionally across all NUMA
385///     nodes via cap-weighted water_fill. A congested node reduces its
386///     share but does not cap the total — freed capacity flows to less
387///     loaded nodes.
388///   * Strict spread (NodeSpread*): enforce equal CPU counts across all
389///     NUMA nodes, capped at the least available node. Use when the
390///     workload needs strictly equal per-node CPU counts (memory-
391///     bandwidth-bound or replicated work).
392///
393///   Default: Sticky.
394///
395/// - perf: CPU performance target. 0 means no configuration. A value
396///   between 1 and 1024 indicates the performance level CPUs running tasks
397///   in this layer are configured to using scx_bpf_cpuperf_set().
398///
399/// - idle_resume_us: Sets the idle resume QoS value. CPU idle time governors are expected to
400///   regard the minimum of the global (effective) CPU latency limit and the effective resume
401///   latency constraint for the given CPU as the upper limit for the exit latency of the idle
402///   states. See the latest kernel docs for more details:
403///   https://www.kernel.org/doc/html/latest/admin-guide/pm/cpuidle.html
404///
405/// - nodes: If set the layer will use the set of NUMA nodes for scheduling
406///   decisions. If unset then all available NUMA nodes will be used. If the
407///   llcs value is set the cpuset of NUMA nodes will be or'ed with the LLC
408///   config.
409///
410/// - llcs: If set the layer will use the set of LLCs (last level caches)
411///   for scheduling decisions. If unset then all LLCs will be used. If
412///   the nodes value is set the cpuset of LLCs will be or'ed with the nodes
413///   config.
414///
415///
416/// Similar to matches, adding new policies and extending existing ones
417/// should be relatively straightforward.
418///
419/// Configuration example and running scx_layered
420/// =============================================
421///
422/// An scx_layered config is composed of layer configs. A layer config is
423/// composed of a name, a set of matches, and a policy block. Running the
424/// following will write an example configuration into example.json.
425///
426///   $ scx_layered -e example.json
427///
428/// Note that the last layer in the configuration must have an empty match set
429/// as a catch-all for tasks which haven't been matched into previous layers.
430///
431/// The configuration can be specified in multiple json files and
432/// command line arguments, which are concatenated in the specified
433/// order. Each must contain valid layer configurations.
434///
435/// By default, an argument to scx_layered is interpreted as a JSON string. If
436/// the argument is a pointer to a JSON file, it should be prefixed with file:
437/// or f: as follows:
438///
439///   $ scx_layered file:example.json
440///   ...
441///   $ scx_layered f:example.json
442///
443/// Monitoring Statistics
444/// =====================
445///
446/// Run with `--stats INTERVAL` to enable stats monitoring. There is
447/// also an scx_stat server listening on /var/run/scx/root/stat that can
448/// be monitored by running `scx_layered --monitor INTERVAL` separately.
449///
450///   ```bash
451///   $ scx_layered --monitor 1
452///   tot= 117909 local=86.20 open_idle= 0.21 affn_viol= 1.37 proc=6ms
453///   busy= 34.2 util= 1733.6 load=  21744.1 fb_cpus=[n0:1]
454///     batch    : util/frac=   11.8/  0.7 load/frac=     29.7:  0.1 tasks=  2597
455///                tot=   3478 local=67.80 open_idle= 0.00 preempt= 0.00 affn_viol= 0.00
456///                cpus=  2 [  2,  2] 04000001 00000000
457///     immediate: util/frac= 1218.8/ 70.3 load/frac=  21399.9: 98.4 tasks=  1107
458///                tot=  68997 local=90.57 open_idle= 0.26 preempt= 9.36 affn_viol= 0.00
459///                cpus= 50 [ 50, 50] fbfffffe 000fffff
460///     normal   : util/frac=  502.9/ 29.0 load/frac=    314.5:  1.4 tasks=  3512
461///                tot=  45434 local=80.97 open_idle= 0.16 preempt= 0.00 affn_viol= 3.56
462///                cpus= 50 [ 50, 50] fbfffffe 000fffff
463///   ```
464///
465/// Global statistics: see [`SysStats`]
466///
467/// Per-layer statistics: see [`LayerStats`]
468///
469#[derive(Debug, Parser)]
470#[command(verbatim_doc_comment)]
471struct Opts {
472    /// Deprecated, noop, use RUST_LOG or --log-level instead.
473    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
474    verbose: u8,
475
476    /// Scheduling slice duration in microseconds.
477    #[clap(short = 's', long, default_value = "20000")]
478    slice_us: u64,
479
480    /// Maximum consecutive execution time in microseconds. A task may be
481    /// allowed to keep executing on a CPU for this long. Note that this is
482    /// the upper limit and a task may have to moved off the CPU earlier. 0
483    /// indicates default - 20 * slice_us.
484    #[clap(short = 'M', long, default_value = "0")]
485    max_exec_us: u64,
486
487    /// Scheduling interval in seconds.
488    #[clap(short = 'i', long, default_value = "0.1")]
489    interval: f64,
490
491    /// ***DEPRECATED*** Disable load-fraction based max layer CPU limit.
492    /// recommended.
493    #[clap(short = 'n', long, default_value = "false")]
494    no_load_frac_limit: bool,
495
496    /// Exit debug dump buffer length. 0 indicates default.
497    #[clap(long, default_value = "0")]
498    exit_dump_len: u32,
499
500    /// Specify the logging level. Accepts rust's envfilter syntax for modular
501    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
502    #[clap(long, default_value = "info")]
503    log_level: String,
504
505    /// Disable topology awareness. When enabled, the "nodes" and "llcs" settings on
506    /// a layer are ignored. Defaults to false on topologies with multiple NUMA nodes
507    /// or LLCs, and true otherwise.
508    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
509    disable_topology: Option<bool>,
510
511    /// Disable monitor
512    #[clap(long)]
513    monitor_disable: bool,
514
515    /// Write example layer specifications into the file and exit.
516    #[clap(short = 'e', long)]
517    example: Option<String>,
518
519    /// ***DEPRECATED*** Disables preemption if the weighted load fraction
520    /// of a layer (load_frac_adj) exceeds the threshold. The default is
521    /// disabled (0.0).
522    #[clap(long, default_value = "0.0")]
523    layer_preempt_weight_disable: f64,
524
525    /// ***DEPRECATED*** Disables layer growth if the weighted load fraction
526    /// of a layer (load_frac_adj) exceeds the threshold. The default is
527    /// disabled (0.0).
528    #[clap(long, default_value = "0.0")]
529    layer_growth_weight_disable: f64,
530
531    /// Enable stats monitoring with the specified interval.
532    #[clap(long)]
533    stats: Option<f64>,
534
535    /// Run in stats monitoring mode with the specified interval. Scheduler
536    /// is not launched.
537    #[clap(long)]
538    monitor: Option<f64>,
539
540    /// Column limit for stats monitor output.
541    #[clap(long, default_value = "95")]
542    stats_columns: usize,
543
544    /// Disable per-LLC stats in monitor output.
545    #[clap(long)]
546    stats_no_llc: bool,
547
548    /// Run with example layer specifications (useful for e.g. CI pipelines)
549    #[clap(long)]
550    run_example: bool,
551
552    /// Allocate CPUs at an SMT granularity (not core)
553    #[clap(long)]
554    allow_partial_core: bool,
555
556    /// ***DEPRECATED *** Enables iteration over local LLCs first for
557    /// dispatch.
558    #[clap(long, default_value = "false")]
559    local_llc_iteration: bool,
560
561    /// Low priority fallback DSQs are used to execute tasks with custom CPU
562    /// affinities. These DSQs are immediately executed iff a CPU is
563    /// otherwise idle. However, after the specified wait, they are
564    /// guaranteed upto --lo-fb-share fraction of each CPU.
565    #[clap(long, default_value = "10000")]
566    lo_fb_wait_us: u64,
567
568    /// The fraction of CPU time guaranteed to low priority fallback DSQs.
569    /// See --lo-fb-wait-us.
570    #[clap(long, default_value = ".05")]
571    lo_fb_share: f64,
572
573    /// Disable antistall
574    #[clap(long, default_value = "false")]
575    disable_antistall: bool,
576
577    /// Enable numa topology based gpu task affinitization.
578    #[clap(long, default_value = "false")]
579    enable_gpu_affinitize: bool,
580
581    /// Interval at which to reaffinitize gpu tasks to numa nodes.
582    /// Defaults to 900s
583    #[clap(long, default_value = "900")]
584    gpu_affinitize_secs: u64,
585
586    /// Enable match debug
587    /// This stores a mapping of task tid
588    /// to layer id such that bpftool map dump
589    /// can be used to debug layer matches.
590    #[clap(long, default_value = "false")]
591    enable_match_debug: bool,
592
593    /// Maximum task runnable_at delay (in seconds) before antistall turns on
594    #[clap(long, default_value = "3")]
595    antistall_sec: u64,
596
597    /// Enable gpu support
598    #[clap(long, default_value = "false")]
599    enable_gpu_support: bool,
600
601    /// Gpu Kprobe Level
602    /// The value set here determines how aggressive
603    /// the kprobes enabled on gpu driver functions are.
604    /// Higher values are more aggressive, incurring more system overhead
605    /// and more accurately identifying PIDs using GPUs in a more timely manner.
606    /// Lower values incur less system overhead, at the cost of less accurately
607    /// identifying GPU pids and taking longer to do so.
608    #[clap(long, default_value = "3")]
609    gpu_kprobe_level: u64,
610
611    /// Enable utilization compensation for unattributed CPU work (irq, softirq, stolen). When
612    /// enabled, each CPU's layer usage is scaled by the inverse of available capacity to account
613    /// for time lost to interrupts.
614    #[clap(long, default_value = "false")]
615    util_compensation: bool,
616
617    /// Enable netdev IRQ balancing. This is experimental and should be used with caution.
618    #[clap(long, default_value = "false")]
619    netdev_irq_balance: bool,
620
621    /// Disable queued wakeup optimization.
622    #[clap(long, default_value = "false")]
623    disable_queued_wakeup: bool,
624
625    /// Per-cpu kthreads are preempting by default. Make it not so.
626    #[clap(long, default_value = "false")]
627    disable_percpu_kthread_preempt: bool,
628
629    /// Only highpri (nice < 0) per-cpu kthreads are preempting by default.
630    /// Make every per-cpu kthread preempting. Meaningful only if
631    /// --disable-percpu-kthread-preempt is not set.
632    #[clap(long, default_value = "false")]
633    percpu_kthread_preempt_all: bool,
634
635    /// Print scheduler version and exit.
636    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
637    version: bool,
638
639    /// Optional run ID for tracking scheduler instances.
640    #[clap(long)]
641    run_id: Option<u64>,
642
643    /// Show descriptions for statistics.
644    #[clap(long)]
645    help_stats: bool,
646
647    /// Layer specification. See --help.
648    specs: Vec<String>,
649
650    /// Periodically force tasks in layers using the AvgRuntime match rule to reevaluate which layer they belong to. Default period of 2s.
651    /// turns this off.
652    #[clap(long, default_value = "2000")]
653    layer_refresh_ms_avgruntime: u64,
654
655    /// Set the path for pinning the task hint map.
656    #[clap(long, default_value = "")]
657    task_hint_map: String,
658
659    /// Print the config (after template expansion) and exit.
660    #[clap(long, default_value = "false")]
661    print_and_exit: bool,
662
663    /// Enable affinitized task to use hi fallback queue to get more CPU time.
664    #[clap(long, default_value = "")]
665    hi_fb_thread_name: String,
666
667    #[clap(flatten, next_help_heading = "Topology Options")]
668    topology: TopologyArgs,
669
670    #[clap(flatten, next_help_heading = "Libbpf Options")]
671    pub libbpf: LibbpfOpts,
672}
673
// Cgroup event types for inter-thread communication
//
// Each variant carries the cgroup's path together with its id, so the
// receiver does not need to look the cgroup up again.
#[derive(Debug, Clone)]
enum CgroupEvent {
    // A cgroup appeared.
    Created {
        // Path of the cgroup (NOTE(review): whether this is absolute or
        // relative to the cgroup root is not visible here - confirm at the
        // producer).
        path: String,
        cgroup_id: u64,    // inode number
        match_bitmap: u64, // bitmap of matched regex rules
    },
    // A cgroup went away.
    Removed {
        // Path of the removed cgroup.
        path: String,
        cgroup_id: u64, // inode number
    },
}
687
688fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
689    reader
690        .read_stat()
691        .context("Failed to read procfs")?
692        .total_cpu
693        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
694}
695
696fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
697    match (curr, prev) {
698        (
699            fb_procfs::CpuStat {
700                user_usec: Some(curr_user),
701                nice_usec: Some(curr_nice),
702                system_usec: Some(curr_system),
703                idle_usec: Some(curr_idle),
704                iowait_usec: Some(curr_iowait),
705                irq_usec: Some(curr_irq),
706                softirq_usec: Some(curr_softirq),
707                stolen_usec: Some(curr_stolen),
708                ..
709            },
710            fb_procfs::CpuStat {
711                user_usec: Some(prev_user),
712                nice_usec: Some(prev_nice),
713                system_usec: Some(prev_system),
714                idle_usec: Some(prev_idle),
715                iowait_usec: Some(prev_iowait),
716                irq_usec: Some(prev_irq),
717                softirq_usec: Some(prev_softirq),
718                stolen_usec: Some(prev_stolen),
719                ..
720            },
721        ) => {
722            let idle_usec = curr_idle.saturating_sub(*prev_idle);
723            let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
724            let user_usec = curr_user.saturating_sub(*prev_user);
725            let system_usec = curr_system.saturating_sub(*prev_system);
726            let nice_usec = curr_nice.saturating_sub(*prev_nice);
727            let irq_usec = curr_irq.saturating_sub(*prev_irq);
728            let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
729            let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);
730
731            let busy_usec =
732                user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
733            let total_usec = idle_usec + busy_usec + iowait_usec;
734            if total_usec > 0 {
735                Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
736            } else {
737                Ok(1.0)
738            }
739        }
740        _ => bail!("Missing stats in cpustat"),
741    }
742}
743
/// Copies `src` into `dst` as a NUL-terminated C string.
///
/// The bytes of `src` are written to the start of `dst`, followed by a single
/// terminating NUL. Any bytes of `dst` past the terminator are left untouched.
///
/// # Panics
///
/// Panics, without modifying `dst`, if `src` contains an interior NUL byte or
/// if `dst` is too small to hold `src` plus the terminator.
fn copy_into_cstr(dst: &mut [i8], src: &str) {
    let bytes = src.as_bytes();
    assert!(
        !bytes.contains(&0),
        "copy_into_cstr: source string contains an interior NUL byte"
    );
    assert!(
        bytes.len() < dst.len(),
        "copy_into_cstr: destination of {} bytes cannot hold {} bytes plus NUL",
        dst.len(),
        bytes.len()
    );

    // Reinterpret each byte as a signed char; `as` preserves the bit pattern.
    for (slot, &byte) in dst.iter_mut().zip(bytes) {
        *slot = byte as i8;
    }
    dst[bytes.len()] = 0;
}
749
750#[allow(clippy::needless_range_loop)]
751fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
752    let mut cpu_ctxs = vec![];
753    let cpu_ctxs_vec = skel
754        .maps
755        .cpu_ctxs
756        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
757        .context("Failed to lookup cpu_ctx")?
758        .unwrap();
759    for cpu in 0..*NR_CPUS_POSSIBLE {
760        cpu_ctxs.push(
761            *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
762                .expect("cpu_ctx: short or misaligned buffer"),
763        );
764    }
765    Ok(cpu_ctxs)
766}
767
/// A set of BPF-side statistics counters.
///
/// Produced either by `BpfStats::read` (cumulative values) or by subtracting
/// two snapshots (per-interval deltas).
#[derive(Debug, Clone)]
struct BpfStats {
    /// Global stats indexed by stat id, summed over all CPUs.
    gstats: Vec<u64>,
    /// Per-layer stats indexed `[layer][stat]`, summed over all CPUs.
    lstats: Vec<Vec<u64>>,
    /// `lstats` summed across layers, indexed by stat id.
    lstats_sums: Vec<u64>,
    /// Per-LLC layer stats indexed `[layer][llc][stat]`.
    llc_lstats: Vec<Vec<Vec<u64>>>,
}
775
776impl BpfStats {
777    #[allow(clippy::needless_range_loop)]
778    fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
779        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
780        let nr_llcs = skel.maps.rodata_data.as_ref().unwrap().nr_llcs as usize;
781        let mut gstats = vec![0u64; NR_GSTATS];
782        let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
783        let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];
784
785        for cpu in 0..*NR_CPUS_POSSIBLE {
786            for (stat, value) in gstats.iter_mut().enumerate() {
787                *value += cpu_ctxs[cpu].gstats[stat];
788            }
789            for (layer, layer_stats) in lstats.iter_mut().enumerate() {
790                for (stat, value) in layer_stats.iter_mut().enumerate() {
791                    *value += cpu_ctxs[cpu].lstats[layer][stat];
792                }
793            }
794        }
795
796        let mut lstats_sums = vec![0u64; NR_LSTATS];
797        for layer_stats in lstats.iter() {
798            for (stat, value) in lstats_sums.iter_mut().enumerate() {
799                *value += layer_stats[stat];
800            }
801        }
802
803        for llc_id in 0..nr_llcs {
804            // XXX - This would be a lot easier if llc_ctx were in
805            // the bss. Unfortunately, kernel < v6.12 crashes and
806            // kernel >= v6.12 fails verification after such
807            // conversion due to seemingly verifier bugs. Convert to
808            // bss maps later.
809            let v = skel
810                .maps
811                .llc_data
812                .lookup(&(llc_id as u32).to_ne_bytes(), libbpf_rs::MapFlags::ANY)
813                .unwrap()
814                .unwrap();
815            let llcc: &bpf_intf::llc_ctx =
816                plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
817
818            for (layer_id, layer_llc_stats) in llc_lstats.iter_mut().enumerate() {
819                for (stat_id, stat_value) in layer_llc_stats[llc_id].iter_mut().enumerate() {
820                    *stat_value = llcc.lstats[layer_id][stat_id];
821                }
822            }
823        }
824
825        Self {
826            gstats,
827            lstats,
828            lstats_sums,
829            llc_lstats,
830        }
831    }
832}
833
834impl<'b> Sub<&'b BpfStats> for &BpfStats {
835    type Output = BpfStats;
836
837    fn sub(self, rhs: &'b BpfStats) -> BpfStats {
838        let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
839        BpfStats {
840            gstats: vec_sub(&self.gstats, &rhs.gstats),
841            lstats: self
842                .lstats
843                .iter()
844                .zip(rhs.lstats.iter())
845                .map(|(l, r)| vec_sub(l, r))
846                .collect(),
847            lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
848            llc_lstats: self
849                .llc_lstats
850                .iter()
851                .zip(rhs.llc_lstats.iter())
852                .map(|(l_layer, r_layer)| {
853                    l_layer
854                        .iter()
855                        .zip(r_layer.iter())
856                        .map(|(l_llc, r_llc)| {
857                            let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
858                            // Lat is not subtractable, take L side.
859                            r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
860                            vec_sub(&l_llc, &r_llc)
861                        })
862                        .collect()
863                })
864                .collect(),
865        }
866    }
867}
868
869#[derive(Clone, Debug)]
870struct Stats {
871    at: Instant,
872    elapsed: Duration,
873    topo: Arc<Topology>,
874    nr_layers: usize,
875    nr_layer_tasks: Vec<usize>,
876    layer_nr_node_pinned_tasks: Vec<Vec<u64>>,
877
878    total_util: f64, // Running AVG of sum of layer_utils
879    layer_utils: Vec<Vec<f64>>,
880    layer_node_pinned_utils: Vec<Vec<f64>>,
881    prev_layer_node_pinned_usages: Vec<Vec<u64>>,
882    layer_node_utils: Vec<Vec<f64>>,
883    prev_layer_node_usages: Vec<Vec<u64>>,
884    layer_node_duty_sums: Vec<Vec<f64>>, // Per-node per-layer duty in CPU units (EWMA)
885    prev_layer_node_duty_raw: Vec<Vec<u64>>, // Raw accumulated layer_duty_sum values
886
887    layer_membws: Vec<Vec<f64>>, // Estimated memory bandsidth consumption
888    prev_layer_membw_agg: Vec<Vec<u64>>, // Estimated aggregate membw consumption
889
890    cpu_busy: f64, // Read from /proc, maybe higher than total_util
891    prev_total_cpu: fb_procfs::CpuStat,
892    prev_pmu_resctrl_membw: (u64, u64), // (PMU-reported membw, resctrl-reported membw)
893
894    util_compensation: bool,
895    layer_utils_compensated: Vec<Vec<f64>>, // EWMA of per-CPU-scaled layer utils
896    prev_cpu_layer_usages: Vec<u64>,        // Per-CPU per-layer usages for computing deltas
897    prev_per_cpu_stats: BTreeMap<u32, fb_procfs::CpuStat>,
898
899    system_cpu_util_ewma: f64,       // 10s EWMA of system CPU utilization
900    layer_dsq_insert_ewma: Vec<f64>, // 10s EWMA of per-layer DSQ insertion ratio
901
902    bpf_stats: BpfStats,
903    prev_bpf_stats: BpfStats,
904
905    processing_dur: Duration,
906    prev_processing_dur: Duration,
907
908    layer_slice_us: Vec<u64>,
909
910    gpu_tasks_affinitized: u64,
911    gpu_task_affinitization_ms: u64,
912}
913
914impl Stats {
915    #[allow(clippy::needless_range_loop)]
916    fn read_layer_membw_agg(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
917        let mut layer_membw_agg = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
918
919        for cpu in 0..*NR_CPUS_POSSIBLE {
920            for (layer, layer_membw) in layer_membw_agg.iter_mut().enumerate() {
921                for (usage, value) in layer_membw.iter_mut().enumerate() {
922                    *value += cpu_ctxs[cpu].layer_membw_agg[layer][usage];
923                }
924            }
925        }
926
927        layer_membw_agg
928    }
929
930    #[allow(clippy::needless_range_loop)]
931    fn read_layer_node_pinned_usages(
932        cpu_ctxs: &[bpf_intf::cpu_ctx],
933        topo: &Topology,
934        nr_layers: usize,
935        nr_nodes: usize,
936    ) -> Vec<Vec<u64>> {
937        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
938
939        for cpu in 0..*NR_CPUS_POSSIBLE {
940            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
941            for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
942                layer_usages[node] += cpu_ctxs[cpu].node_pinned_usage[layer];
943            }
944        }
945
946        usages
947    }
948
949    #[allow(clippy::needless_range_loop)]
950    fn read_layer_node_usages(
951        cpu_ctxs: &[bpf_intf::cpu_ctx],
952        topo: &Topology,
953        nr_layers: usize,
954        nr_nodes: usize,
955    ) -> Vec<Vec<u64>> {
956        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
957
958        for cpu in 0..*NR_CPUS_POSSIBLE {
959            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
960            for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
961                for usage in 0..=LAYER_USAGE_SUM_UPTO {
962                    layer_usages[node] += cpu_ctxs[cpu].layer_usages[layer][usage];
963                }
964            }
965        }
966
967        usages
968    }
969
970    #[allow(clippy::needless_range_loop)]
971    fn read_layer_node_duty_raw(
972        cpu_ctxs: &[bpf_intf::cpu_ctx],
973        topo: &Topology,
974        nr_layers: usize,
975        nr_nodes: usize,
976    ) -> Vec<Vec<u64>> {
977        let mut sums = vec![vec![0u64; nr_nodes]; nr_layers];
978
979        for cpu in 0..*NR_CPUS_POSSIBLE {
980            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
981            for (layer, layer_sums) in sums.iter_mut().enumerate().take(nr_layers) {
982                layer_sums[node] += cpu_ctxs[cpu].layer_duty_sum[layer];
983            }
984        }
985
986        sums
987    }
988
989    #[allow(clippy::needless_range_loop)]
990    fn read_per_cpu_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<u64> {
991        let stride = nr_layers * NR_LAYER_USAGES;
992        let mut flat = vec![0u64; *NR_CPUS_POSSIBLE * stride];
993
994        for cpu in 0..*NR_CPUS_POSSIBLE {
995            let base = cpu * stride;
996            for layer in 0..nr_layers {
997                for usage in 0..NR_LAYER_USAGES {
998                    flat[base + layer * NR_LAYER_USAGES + usage] =
999                        cpu_ctxs[cpu].layer_usages[layer][usage];
1000                }
1001            }
1002        }
1003
1004        flat
1005    }
1006
1007    /// Use the membw reported by resctrl to normalize the values reported by hw counters.
1008    /// We have the following problem:
1009    /// 1) We want per-task memory bandwidth reporting. We cannot do this with resctrl, much
1010    ///    less transparently, since we would require different RMID for each task.
1011    /// 2) We want to directly use perf counters for tracking per-task memory bandwidth, but
1012    ///    we can't: Non-resctrl counters do not measure the right thing (e.g., they only measure
1013    ///    proxies like load operations),
1014    /// 3) Resctrl counters are not accessible directly so we cannot read them from the BPF side.
1015    ///
1016    /// Approximate per-task memory bandwidth using perf counters to measure _relative_ memory
1017    /// bandwidth usage.
1018    fn resctrl_read_total_membw() -> Result<u64> {
1019        let mut total_membw = 0u64;
1020        for entry in WalkDir::new("/sys/fs/resctrl/mon_data")
1021            .min_depth(1)
1022            .into_iter()
1023            .filter_map(Result::ok)
1024            .filter(|x| x.path().is_dir())
1025        {
1026            let mut path = entry.path().to_path_buf();
1027            path.push("mbm_total_bytes");
1028            total_membw += fs::read_to_string(path)?.trim().parse::<u64>()?;
1029        }
1030
1031        Ok(total_membw)
1032    }
1033
1034    fn new(
1035        skel: &mut BpfSkel,
1036        proc_reader: &fb_procfs::ProcReader,
1037        topo: Arc<Topology>,
1038        gpu_task_affinitizer: &GpuTaskAffinitizer,
1039        util_compensation: bool,
1040    ) -> Result<Self> {
1041        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
1042        let nr_nodes = topo.nodes.len();
1043        let cpu_ctxs = read_cpu_ctxs(skel)?;
1044        let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1045        let pmu_membw = Self::read_layer_membw_agg(&cpu_ctxs, nr_layers);
1046
1047        Ok(Self {
1048            at: Instant::now(),
1049            elapsed: Default::default(),
1050
1051            topo: topo.clone(),
1052            nr_layers,
1053            nr_layer_tasks: vec![0; nr_layers],
1054            layer_nr_node_pinned_tasks: vec![vec![0; nr_nodes]; nr_layers],
1055
1056            total_util: 0.0,
1057            layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1058            layer_node_pinned_utils: vec![vec![0.0; nr_nodes]; nr_layers],
1059            prev_layer_node_pinned_usages: Self::read_layer_node_pinned_usages(
1060                &cpu_ctxs, &topo, nr_layers, nr_nodes,
1061            ),
1062            layer_node_utils: vec![vec![0.0; nr_nodes]; nr_layers],
1063            prev_layer_node_usages: Self::read_layer_node_usages(
1064                &cpu_ctxs, &topo, nr_layers, nr_nodes,
1065            ),
1066            layer_node_duty_sums: vec![vec![0.0; nr_nodes]; nr_layers],
1067            prev_layer_node_duty_raw: Self::read_layer_node_duty_raw(
1068                &cpu_ctxs, &topo, nr_layers, nr_nodes,
1069            ),
1070            layer_membws: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1071            // This is not normalized because we don't have enough history to do so.
1072            // It should not matter too much, since the value is dropped on the first
1073            // iteration.
1074            prev_layer_membw_agg: pmu_membw,
1075            prev_pmu_resctrl_membw: (0, 0),
1076
1077            cpu_busy: 0.0,
1078            prev_total_cpu: read_total_cpu(proc_reader)?,
1079            util_compensation,
1080            layer_utils_compensated: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1081            prev_cpu_layer_usages: Self::read_per_cpu_layer_usages(&cpu_ctxs, nr_layers),
1082            prev_per_cpu_stats: BTreeMap::new(),
1083            system_cpu_util_ewma: 0.0,
1084            layer_dsq_insert_ewma: vec![0.0; nr_layers],
1085
1086            bpf_stats: bpf_stats.clone(),
1087            prev_bpf_stats: bpf_stats,
1088
1089            processing_dur: Default::default(),
1090            prev_processing_dur: Default::default(),
1091
1092            layer_slice_us: vec![0; nr_layers],
1093            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1094            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1095        })
1096    }
1097
1098    fn refresh(
1099        &mut self,
1100        skel: &mut BpfSkel,
1101        proc_reader: &fb_procfs::ProcReader,
1102        now: Instant,
1103        cur_processing_dur: Duration,
1104        gpu_task_affinitizer: &GpuTaskAffinitizer,
1105    ) -> Result<()> {
1106        let elapsed = now.duration_since(self.at);
1107        let elapsed_f64 = elapsed.as_secs_f64();
1108        let cpu_ctxs = read_cpu_ctxs(skel)?;
1109
1110        let layers = &skel.maps.bss_data.as_ref().unwrap().layers;
1111        let nr_layer_tasks: Vec<usize> = layers
1112            .iter()
1113            .take(self.nr_layers)
1114            .map(|layer| layer.nr_tasks as usize)
1115            .collect();
1116        let layer_nr_node_pinned_tasks: Vec<Vec<u64>> = layers
1117            .iter()
1118            .take(self.nr_layers)
1119            .map(|layer| {
1120                layer.node[..self.topo.nodes.len()]
1121                    .iter()
1122                    .map(|n| n.nr_pinned_tasks)
1123                    .collect()
1124            })
1125            .collect();
1126        let layer_slice_us: Vec<u64> = layers
1127            .iter()
1128            .take(self.nr_layers)
1129            .map(|layer| layer.slice_ns / 1000_u64)
1130            .collect();
1131
1132        let cur_layer_node_pinned_usages = Self::read_layer_node_pinned_usages(
1133            &cpu_ctxs,
1134            &self.topo,
1135            self.nr_layers,
1136            self.topo.nodes.len(),
1137        );
1138        let cur_layer_node_usages = Self::read_layer_node_usages(
1139            &cpu_ctxs,
1140            &self.topo,
1141            self.nr_layers,
1142            self.topo.nodes.len(),
1143        );
1144        let cur_layer_node_duty_raw = Self::read_layer_node_duty_raw(
1145            &cpu_ctxs,
1146            &self.topo,
1147            self.nr_layers,
1148            self.topo.nodes.len(),
1149        );
1150        let cur_layer_membw_agg = Self::read_layer_membw_agg(&cpu_ctxs, self.nr_layers);
1151
1152        // Memory BW normalization. It requires finding the delta according to perf, the delta
1153        // according to resctl, and finding the factor between them. This also helps in
1154        // determining whether this delta is stable.
1155        //
1156        // We scale only the raw PMC delta - the part of the counter incremented between two
1157        // periods. This is because the scaling factor is only valid for that time period.
1158        // Non-scaled samples are not comparable, while scaled ones are.
1159        let (pmu_prev, resctrl_prev) = self.prev_pmu_resctrl_membw;
1160        let pmu_cur: u64 = cur_layer_membw_agg
1161            .iter()
1162            .map(|membw_agg| membw_agg[LAYER_USAGE_OPEN] + membw_agg[LAYER_USAGE_OWNED])
1163            .sum();
1164        let resctrl_cur = Self::resctrl_read_total_membw()?;
1165        let factor = (resctrl_cur - resctrl_prev) as f64 / (pmu_cur - pmu_prev) as f64;
1166
1167        // Computes the runtime deltas and converts them from ns to s.
1168        let compute_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1169            cur_agg
1170                .iter()
1171                .zip(prev_agg.iter())
1172                .map(|(cur, prev)| {
1173                    cur.iter()
1174                        .zip(prev.iter())
1175                        .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
1176                        .collect()
1177                })
1178                .collect()
1179        };
1180
1181        // Computes the total memory traffic done since last computation and normalizes to GBs.
1182        // We derive the rate of consumption elsewhere.
1183        let compute_mem_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1184            cur_agg
1185                .iter()
1186                .zip(prev_agg.iter())
1187                .map(|(cur, prev)| {
1188                    cur.iter()
1189                        .zip(prev.iter())
1190                        .map(|(c, p)| (*c as i64 - *p as i64) as f64 / 1024_f64.powf(3.0))
1191                        .collect()
1192                })
1193                .collect()
1194        };
1195
1196        // Scale the raw value delta by the resctrl/pmc computed factor.
1197        let cur_layer_membw: Vec<Vec<f64>> =
1198            compute_mem_diff(&cur_layer_membw_agg, &self.prev_layer_membw_agg);
1199
1200        let cur_layer_membw: Vec<Vec<f64>> = cur_layer_membw
1201            .iter()
1202            .map(|x| x.iter().map(|x| *x * factor).collect())
1203            .collect();
1204
1205        let metric_decay =
1206            |cur_metric: Vec<Vec<f64>>, prev_metric: &Vec<Vec<f64>>, decay_rate: f64| {
1207                cur_metric
1208                    .iter()
1209                    .zip(prev_metric.iter())
1210                    .map(|(cur, prev)| {
1211                        cur.iter()
1212                            .zip(prev.iter())
1213                            .map(|(c, p)| {
1214                                let decay = decay_rate.powf(elapsed_f64);
1215                                p * decay + c * (1.0 - decay)
1216                            })
1217                            .collect()
1218                    })
1219                    .collect()
1220            };
1221
1222        let cur_node_pinned_utils: Vec<Vec<f64>> = compute_diff(
1223            &cur_layer_node_pinned_usages,
1224            &self.prev_layer_node_pinned_usages,
1225        );
1226        let layer_node_pinned_utils: Vec<Vec<f64>> = metric_decay(
1227            cur_node_pinned_utils,
1228            &self.layer_node_pinned_utils,
1229            *USAGE_DECAY,
1230        );
1231        let cur_node_utils: Vec<Vec<f64>> =
1232            compute_diff(&cur_layer_node_usages, &self.prev_layer_node_usages);
1233        let layer_node_utils: Vec<Vec<f64>> =
1234            metric_decay(cur_node_utils, &self.layer_node_utils, *USAGE_DECAY);
1235        let cur_node_duty: Vec<Vec<f64>> =
1236            compute_diff(&cur_layer_node_duty_raw, &self.prev_layer_node_duty_raw);
1237        let layer_node_duty_sums: Vec<Vec<f64>> =
1238            metric_decay(cur_node_duty, &self.layer_node_duty_sums, *USAGE_DECAY);
1239
1240        let layer_membws: Vec<Vec<f64>> = metric_decay(cur_layer_membw, &self.layer_membws, 0.0);
1241
1242        let proc_stat = proc_reader
1243            .read_stat()
1244            .context("Failed to read /proc/stat")?;
1245        let cur_total_cpu = proc_stat
1246            .total_cpu
1247            .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;
1248        let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;
1249
1250        // Calculate system CPU utilization EWMA (10 second window)
1251        const SYS_CPU_UTIL_EWMA_SECS: f64 = 10.0;
1252        let elapsed_f64 = elapsed.as_secs_f64();
1253        let alpha = elapsed_f64 / SYS_CPU_UTIL_EWMA_SECS.max(elapsed_f64);
1254        let system_cpu_util_ewma = alpha * cpu_busy + (1.0 - alpha) * self.system_cpu_util_ewma;
1255
1256        // Per-CPU scale factors: s[c] = Δt / (Δt - irq - softirq - stolen).
1257        // When compensation is off, all scales stay 1.0.
1258        let cur_per_cpu_stats = proc_stat.cpus_map.unwrap_or_default();
1259        let mut cpu_scales = vec![1.0f64; *NR_CPUS_POSSIBLE];
1260        if self.util_compensation {
1261            for (&cpu_id, cur_cpu_stat) in &cur_per_cpu_stats {
1262                let cpu = cpu_id as usize;
1263                if let Some(prev_cpu_stat) = self.prev_per_cpu_stats.get(&cpu_id) {
1264                    if let (
1265                        fb_procfs::CpuStat {
1266                            user_usec: Some(cu),
1267                            nice_usec: Some(cn),
1268                            system_usec: Some(cs),
1269                            idle_usec: Some(ci),
1270                            iowait_usec: Some(cw),
1271                            irq_usec: Some(cq),
1272                            softirq_usec: Some(cf),
1273                            stolen_usec: Some(ct),
1274                            ..
1275                        },
1276                        fb_procfs::CpuStat {
1277                            user_usec: Some(pu),
1278                            nice_usec: Some(pn),
1279                            system_usec: Some(ps),
1280                            idle_usec: Some(pi),
1281                            iowait_usec: Some(pw),
1282                            irq_usec: Some(pq),
1283                            softirq_usec: Some(pf),
1284                            stolen_usec: Some(pt),
1285                            ..
1286                        },
1287                    ) = (cur_cpu_stat, prev_cpu_stat)
1288                    {
1289                        let delta_total = cu.saturating_sub(*pu)
1290                            + cn.saturating_sub(*pn)
1291                            + cs.saturating_sub(*ps)
1292                            + ci.saturating_sub(*pi)
1293                            + cw.saturating_sub(*pw)
1294                            + cq.saturating_sub(*pq)
1295                            + cf.saturating_sub(*pf)
1296                            + ct.saturating_sub(*pt);
1297                        let overhead = cq.saturating_sub(*pq)
1298                            + cf.saturating_sub(*pf)
1299                            + ct.saturating_sub(*pt);
1300                        let available = delta_total.saturating_sub(overhead);
1301                        cpu_scales[cpu] = if available > 0 {
1302                            (delta_total as f64 / available as f64).clamp(1.0, 20.0)
1303                        } else {
1304                            1.0
1305                        };
1306                    }
1307                }
1308            }
1309        }
1310
1311        // Single pass over all CPUs: build both raw and compensated layer
1312        // util streams. When compensation is off, all scales are 1.0 so
1313        // comp == raw naturally.
1314        let cur_cpu_layer_usages = Self::read_per_cpu_layer_usages(&cpu_ctxs, self.nr_layers);
1315        let stride = self.nr_layers * NR_LAYER_USAGES;
1316        let mut raw_sums = vec![vec![0.0f64; NR_LAYER_USAGES]; self.nr_layers];
1317        let mut scaled_sums = vec![vec![0.0f64; NR_LAYER_USAGES]; self.nr_layers];
1318        #[allow(clippy::needless_range_loop)]
1319        for cpu in 0..*NR_CPUS_POSSIBLE {
1320            let scale = cpu_scales[cpu];
1321            let base = cpu * stride;
1322            for layer in 0..self.nr_layers {
1323                for usage in 0..NR_LAYER_USAGES {
1324                    let idx = base + layer * NR_LAYER_USAGES + usage;
1325                    let delta =
1326                        cur_cpu_layer_usages[idx].saturating_sub(self.prev_cpu_layer_usages[idx]);
1327                    if delta > 0 {
1328                        let delta_f = delta as f64;
1329                        raw_sums[layer][usage] += delta_f;
1330                        scaled_sums[layer][usage] += delta_f * scale;
1331                    }
1332                }
1333            }
1334        }
1335        let normalize = |sums: Vec<Vec<f64>>| -> Vec<Vec<f64>> {
1336            sums.into_iter()
1337                .map(|layer_sums| {
1338                    layer_sums
1339                        .into_iter()
1340                        .map(|s| s / 1_000_000_000.0 / elapsed_f64)
1341                        .collect()
1342                })
1343                .collect()
1344        };
1345        let layer_utils: Vec<Vec<f64>> =
1346            metric_decay(normalize(raw_sums), &self.layer_utils, *USAGE_DECAY);
1347        let layer_utils_compensated: Vec<Vec<f64>> = metric_decay(
1348            normalize(scaled_sums),
1349            &self.layer_utils_compensated,
1350            *USAGE_DECAY,
1351        );
1352
1353        let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1354        let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;
1355
1356        // Calculate per-layer DSQ insertion EWMA (10 second window)
1357        const DSQ_INSERT_EWMA_SECS: f64 = 10.0;
1358        let dsq_alpha = elapsed_f64 / DSQ_INSERT_EWMA_SECS.max(elapsed_f64);
1359        let layer_dsq_insert_ewma: Vec<f64> = (0..self.nr_layers)
1360            .map(|layer_id| {
1361                let sel_local = bpf_stats.lstats[layer_id]
1362                    [bpf_intf::layer_stat_id_LSTAT_SEL_LOCAL as usize]
1363                    as f64;
1364                let enq_local = bpf_stats.lstats[layer_id]
1365                    [bpf_intf::layer_stat_id_LSTAT_ENQ_LOCAL as usize]
1366                    as f64;
1367                let enq_dsq = bpf_stats.lstats[layer_id]
1368                    [bpf_intf::layer_stat_id_LSTAT_ENQ_DSQ as usize]
1369                    as f64;
1370                let total_dispatches = sel_local + enq_local + enq_dsq;
1371
1372                let cur_ratio = if total_dispatches > 0.0 {
1373                    enq_dsq / total_dispatches
1374                } else {
1375                    0.0
1376                };
1377
1378                dsq_alpha * cur_ratio + (1.0 - dsq_alpha) * self.layer_dsq_insert_ewma[layer_id]
1379            })
1380            .collect();
1381
1382        let processing_dur = cur_processing_dur
1383            .checked_sub(self.prev_processing_dur)
1384            .unwrap();
1385
1386        *self = Self {
1387            at: now,
1388            elapsed,
1389            topo: self.topo.clone(),
1390            nr_layers: self.nr_layers,
1391            nr_layer_tasks,
1392            layer_nr_node_pinned_tasks,
1393
1394            total_util: layer_utils
1395                .iter()
1396                .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
1397                .sum(),
1398            layer_utils,
1399            layer_node_pinned_utils,
1400            prev_layer_node_pinned_usages: cur_layer_node_pinned_usages,
1401            layer_node_utils,
1402            prev_layer_node_usages: cur_layer_node_usages,
1403            layer_node_duty_sums,
1404            prev_layer_node_duty_raw: cur_layer_node_duty_raw,
1405
1406            layer_membws,
1407            prev_layer_membw_agg: cur_layer_membw_agg,
1408            // Was updated during normalization.
1409            prev_pmu_resctrl_membw: (pmu_cur, resctrl_cur),
1410
1411            cpu_busy,
1412            prev_total_cpu: cur_total_cpu,
1413            util_compensation: self.util_compensation,
1414            layer_utils_compensated,
1415            prev_cpu_layer_usages: cur_cpu_layer_usages,
1416            prev_per_cpu_stats: cur_per_cpu_stats,
1417            system_cpu_util_ewma,
1418            layer_dsq_insert_ewma,
1419
1420            bpf_stats,
1421            prev_bpf_stats: cur_bpf_stats,
1422
1423            processing_dur,
1424            prev_processing_dur: cur_processing_dur,
1425
1426            layer_slice_us,
1427            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1428            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1429        };
1430        Ok(())
1431    }
1432}
1433
1434#[derive(Debug)]
1435struct Layer {
1436    name: String,
1437    kind: LayerKind,
1438    growth_algo: LayerGrowthAlgo,
1439    core_order: Vec<Vec<usize>>,
1440
1441    assigned_llcs: Vec<Vec<usize>>,
1442
1443    nr_cpus: usize,
1444    nr_llc_cpus: Vec<usize>,
1445    nr_node_cpus: Vec<usize>,
1446    cpus: Cpumask,
1447    allowed_cpus: Cpumask,
1448
1449    /// Per-node count of CPUs allocated for pinned demand.
1450    nr_pinned_cpus: Vec<usize>,
1451}
1452
1453fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
1454    fs::read_to_string("/proc/kallsyms")?
1455        .lines()
1456        .find(|line| line.contains(sym_name))
1457        .and_then(|line| line.split_whitespace().next())
1458        .and_then(|addr| u64::from_str_radix(addr, 16).ok())
1459        .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
1460}
1461
1462fn resolve_cpus_pct_range(
1463    cpus_range: &Option<(usize, usize)>,
1464    cpus_range_frac: &Option<(f64, f64)>,
1465    max_cpus: usize,
1466) -> Result<(usize, usize)> {
1467    match (cpus_range, cpus_range_frac) {
1468        (Some(_x), Some(_y)) => {
1469            bail!("cpus_range cannot be used with cpus_pct.");
1470        }
1471        (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
1472        (None, Some((cpus_frac_min, cpus_frac_max))) => {
1473            if *cpus_frac_min < 0_f64
1474                || *cpus_frac_min > 1_f64
1475                || *cpus_frac_max < 0_f64
1476                || *cpus_frac_max > 1_f64
1477            {
1478                bail!("cpus_range_frac values must be between 0.0 and 1.0");
1479            }
1480            let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
1481            let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
1482            Ok((
1483                std::cmp::max(cpus_min_count, 1),
1484                std::cmp::min(std::cmp::max(cpus_max_count, 1), max_cpus),
1485            ))
1486        }
1487        (None, None) => Ok((0, max_cpus)),
1488    }
1489}
1490
/// Returns the new peak utilization after `elapsed` time has passed.
///
/// The previous peak decays exponentially, halving every `half_life`; the
/// result is the larger of the decayed peak and `cur_util`.
fn update_peak_util(prev_peak: f64, cur_util: f64, half_life: Duration, elapsed: Duration) -> f64 {
    let half_lives_passed = elapsed.as_secs_f64() / half_life.as_secs_f64();
    let decayed_peak = prev_peak * f64::powf(0.5, half_lives_passed);
    f64::max(decayed_peak, cur_util)
}
1495
/// Computes the `(low, high)` CPU count range for the given utilization.
///
/// `low` is `util` divided by the upper bound of `util_range`, rounded up.
/// `high` is the larger of `util` and `peak_util` divided by the lower bound
/// of `util_range`, rounded down, and is never below `low`.
fn calc_peak_aware_target_range(
    util: f64,
    peak_util: f64,
    util_range: (f64, f64),
) -> (usize, usize) {
    let (range_lo, range_hi) = util_range;

    let low = (util / range_hi).ceil() as usize;

    let peak_demand = f64::max(util, peak_util);
    let high_from_peak = (peak_demand / range_lo).floor() as usize;

    (low, std::cmp::max(high_from_peak, low))
}
1505
1506impl Layer {
1507    fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<Vec<usize>>) -> Result<Self> {
1508        let name = &spec.name;
1509        let kind = spec.kind.clone();
1510        let mut allowed_cpus = Cpumask::new();
1511        match &kind {
1512            LayerKind::Confined {
1513                cpus_range,
1514                cpus_range_frac,
1515                common: LayerCommon { nodes, llcs, .. },
1516                ..
1517            } => {
1518                let cpus_range =
1519                    resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
1520                if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
1521                    bail!("invalid cpus_range {:?}", cpus_range);
1522                }
1523                if nodes.is_empty() && llcs.is_empty() {
1524                    allowed_cpus.set_all();
1525                } else {
1526                    // build up the cpus bitset
1527                    for (node_id, node) in &topo.nodes {
1528                        // first do the matching for nodes
1529                        if nodes.contains(node_id) {
1530                            for &id in node.all_cpus.keys() {
1531                                allowed_cpus.set_cpu(id)?;
1532                            }
1533                        }
1534                        // next match on any LLCs
1535                        for (llc_id, llc) in &node.llcs {
1536                            if llcs.contains(llc_id) {
1537                                for &id in llc.all_cpus.keys() {
1538                                    allowed_cpus.set_cpu(id)?;
1539                                }
1540                            }
1541                        }
1542                    }
1543                }
1544            }
1545            LayerKind::Grouped {
1546                common: LayerCommon { nodes, llcs, .. },
1547                ..
1548            }
1549            | LayerKind::Open {
1550                common: LayerCommon { nodes, llcs, .. },
1551                ..
1552            } => {
1553                if nodes.is_empty() && llcs.is_empty() {
1554                    allowed_cpus.set_all();
1555                } else {
1556                    // build up the cpus bitset
1557                    for (node_id, node) in &topo.nodes {
1558                        // first do the matching for nodes
1559                        if nodes.contains(node_id) {
1560                            for &id in node.all_cpus.keys() {
1561                                allowed_cpus.set_cpu(id)?;
1562                            }
1563                        }
1564                        // next match on any LLCs
1565                        for (llc_id, llc) in &node.llcs {
1566                            if llcs.contains(llc_id) {
1567                                for &id in llc.all_cpus.keys() {
1568                                    allowed_cpus.set_cpu(id)?;
1569                                }
1570                            }
1571                        }
1572                    }
1573                }
1574            }
1575        }
1576
1577        // Util can be above 1.0 for grouped layers if
1578        // util_includes_open_cputime is set.
1579        if let Some(util_range) = kind.util_range() {
1580            if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
1581                bail!("invalid util_range {:?}", util_range);
1582            }
1583        } else if kind.common().util_peak_half_life_ms != 0 {
1584            bail!("util_peak_half_life_ms requires util_range");
1585        }
1586
1587        let layer_growth_algo = kind.common().growth_algo.clone();
1588
1589        debug!(
1590            "layer: {} algo: {:?} core order: {:?}",
1591            name, &layer_growth_algo, core_order
1592        );
1593
1594        Ok(Self {
1595            name: name.into(),
1596            kind,
1597            growth_algo: layer_growth_algo,
1598            core_order: core_order.clone(),
1599
1600            assigned_llcs: vec![vec![]; topo.nodes.len()],
1601
1602            nr_cpus: 0,
1603            nr_llc_cpus: vec![0; topo.all_llcs.len()],
1604            nr_node_cpus: vec![0; topo.nodes.len()],
1605            cpus: Cpumask::new(),
1606            allowed_cpus,
1607
1608            nr_pinned_cpus: vec![0; topo.nodes.len()],
1609        })
1610    }
1611}
/// NUMA node information recorded for a GPU device.
#[derive(Debug, Clone)]
struct NodeInfo {
    /// CPU set holding the CPUs of the node; passed to `sched_setaffinity`
    /// when affinitizing GPU tasks.
    node_mask: nix::sched::CpuSet,
    /// ID of the node the mask was built from.
    _node_id: usize,
}
1617
/// Tracks the information necessary to NUMA-affinitize GPU tasks
/// periodically when needed.
#[derive(Debug)]
struct GpuTaskAffinitizer {
    /// NVML device index -> node information, filled in by `init_dev_node_map`.
    gpu_devs_to_node_info: HashMap<u32, NodeInfo>,
    /// PID of a process reported by NVML -> index of the device it runs on,
    /// filled in by `update_gpu_pids`.
    gpu_pids_to_devs: HashMap<Pid, u32>,
    /// Start time of the last affinitization pass; `None` until the first one.
    last_process_time: Option<Instant>,
    /// Process table snapshot, refreshed by `update_process_info`.
    sys: System,
    /// Parent PID -> child PIDs, rebuilt by `update_process_info`.
    pid_map: HashMap<Pid, Vec<Pid>>,
    /// Minimum time between two affinitization passes.
    poll_interval: Duration,
    /// When false, `init` and `maybe_affinitize` return without doing anything.
    enable: bool,
    /// Running count of successful `sched_setaffinity` calls.
    tasks_affinitized: u64,
    /// Duration of the most recent affinitization pass, in milliseconds.
    last_task_affinitization_ms: u64,
}
1632
1633impl GpuTaskAffinitizer {
1634    pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
1635        GpuTaskAffinitizer {
1636            gpu_devs_to_node_info: HashMap::new(),
1637            gpu_pids_to_devs: HashMap::new(),
1638            last_process_time: None,
1639            sys: System::default(),
1640            pid_map: HashMap::new(),
1641            poll_interval: Duration::from_secs(poll_interval),
1642            enable,
1643            tasks_affinitized: 0,
1644            last_task_affinitization_ms: 0,
1645        }
1646    }
1647
1648    fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
1649        for (chunk, &mask) in affinity.iter().enumerate() {
1650            let mut inner_offset: u64 = 1;
1651            for _ in 0..64 {
1652                if (mask & inner_offset) != 0 {
1653                    return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
1654                }
1655                inner_offset <<= 1;
1656            }
1657        }
1658        anyhow::bail!("unable to get CPU from NVML bitmask");
1659    }
1660
1661    fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
1662        let mut cpuset = CpuSet::new();
1663        for cpu_id in node.all_cpus.keys() {
1664            cpuset.set(*cpu_id)?;
1665        }
1666        Ok(cpuset)
1667    }
1668
1669    fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
1670        let nvml = nvml()?;
1671        let device_count = nvml.device_count()?;
1672
1673        for idx in 0..device_count {
1674            let dev = nvml.device_by_index(idx)?;
1675            // For machines w/ up to 1024 CPUs.
1676            let cpu = dev.cpu_affinity(16)?;
1677            let ideal_cpu = self.find_one_cpu(cpu)?;
1678            if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
1679                self.gpu_devs_to_node_info.insert(
1680                    idx,
1681                    NodeInfo {
1682                        node_mask: self.node_to_cpuset(
1683                            topo.nodes.get(&cpu.node_id).expect("topo missing node"),
1684                        )?,
1685                        _node_id: cpu.node_id,
1686                    },
1687                );
1688            }
1689        }
1690        Ok(())
1691    }
1692
1693    fn update_gpu_pids(&mut self) -> Result<()> {
1694        let nvml = nvml()?;
1695        for i in 0..nvml.device_count()? {
1696            let device = nvml.device_by_index(i)?;
1697            for proc in device
1698                .running_compute_processes()?
1699                .into_iter()
1700                .chain(device.running_graphics_processes()?.into_iter())
1701            {
1702                self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
1703            }
1704        }
1705        Ok(())
1706    }
1707
1708    fn update_process_info(&mut self) -> Result<()> {
1709        self.sys.refresh_processes_specifics(
1710            ProcessesToUpdate::All,
1711            true,
1712            ProcessRefreshKind::nothing(),
1713        );
1714        self.pid_map.clear();
1715        for (pid, proc_) in self.sys.processes() {
1716            if let Some(ppid) = proc_.parent() {
1717                self.pid_map.entry(ppid).or_default().push(*pid);
1718            }
1719        }
1720        Ok(())
1721    }
1722
1723    fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
1724        let mut work = VecDeque::from([root_pid]);
1725        let mut pids_and_tids: HashSet<Pid> = HashSet::new();
1726
1727        while let Some(pid) = work.pop_front() {
1728            if pids_and_tids.insert(pid) {
1729                if let Some(kids) = self.pid_map.get(&pid) {
1730                    work.extend(kids);
1731                }
1732                if let Some(proc_) = self.sys.process(pid) {
1733                    if let Some(tasks) = proc_.tasks() {
1734                        pids_and_tids.extend(tasks.iter().copied());
1735                    }
1736                }
1737            }
1738        }
1739        pids_and_tids
1740    }
1741
1742    fn affinitize_gpu_pids(&mut self) -> Result<()> {
1743        if !self.enable {
1744            return Ok(());
1745        }
1746        for (pid, dev) in &self.gpu_pids_to_devs {
1747            let node_info = self
1748                .gpu_devs_to_node_info
1749                .get(dev)
1750                .expect("Unable to get gpu pid node mask");
1751            for child in self.get_child_pids_and_tids(*pid) {
1752                match nix::sched::sched_setaffinity(
1753                    nix::unistd::Pid::from_raw(child.as_u32() as i32),
1754                    &node_info.node_mask,
1755                ) {
1756                    Ok(_) => {
1757                        // Increment the global counter for successful affinitization
1758                        self.tasks_affinitized += 1;
1759                    }
1760                    Err(_) => {
1761                        debug!(
1762                            "Error affinitizing gpu pid {} to node {:#?}",
1763                            child.as_u32(),
1764                            node_info
1765                        );
1766                    }
1767                };
1768            }
1769        }
1770        Ok(())
1771    }
1772
1773    pub fn maybe_affinitize(&mut self) {
1774        if !self.enable {
1775            return;
1776        }
1777        let now = Instant::now();
1778
1779        if let Some(last_process_time) = self.last_process_time {
1780            if (now - last_process_time) < self.poll_interval {
1781                return;
1782            }
1783        }
1784
1785        match self.update_gpu_pids() {
1786            Ok(_) => {}
1787            Err(e) => {
1788                error!("Error updating GPU PIDs: {}", e);
1789            }
1790        };
1791        match self.update_process_info() {
1792            Ok(_) => {}
1793            Err(e) => {
1794                error!("Error updating process info to affinitize GPU PIDs: {}", e);
1795            }
1796        };
1797        match self.affinitize_gpu_pids() {
1798            Ok(_) => {}
1799            Err(e) => {
1800                error!("Error updating GPU PIDs: {}", e);
1801            }
1802        };
1803        self.last_process_time = Some(now);
1804        self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
1805    }
1806
1807    pub fn init(&mut self, topo: Arc<Topology>) {
1808        if !self.enable || self.last_process_time.is_some() {
1809            return;
1810        }
1811
1812        match self.init_dev_node_map(topo) {
1813            Ok(_) => {}
1814            Err(e) => {
1815                error!("Error initializing gpu node dev map: {}", e);
1816            }
1817        };
1818        self.sys = System::new_all();
1819    }
1820}
1821
/// Top-level scheduler state: the BPF skeleton together with the layer
/// configuration and per-layer bookkeeping.
///
/// NOTE(review): most fields are used outside this excerpt; the field notes
/// below are derived from names and types only and should be confirmed
/// against their uses.
struct Scheduler<'a> {
    /// BPF skeleton.
    skel: BpfSkel<'a>,
    /// Presumably the link of the attached struct_ops, `None` while detached.
    struct_ops: Option<libbpf_rs::Link>,
    /// Layer specifications.
    layer_specs: Vec<LayerSpec>,

    sched_intv: Duration,
    layer_refresh_intv: Duration,

    cpu_pool: CpuPool,
    /// Userspace layer state (see `Layer`).
    layers: Vec<Layer>,
    idle_qos_enabled: bool,

    proc_reader: fb_procfs::ProcReader,
    sched_stats: Stats,
    /// Presumably one tracked peak utilization per layer; verify against
    /// the code that updates it.
    layer_peak_utils: Vec<f64>,

    /// Compiled cgroup regexes keyed by regex ID, in the same shape as the
    /// map `init_layers` returns.
    cgroup_regexes: Option<HashMap<u32, Regex>>,

    nr_layer_cpus_ranges: Vec<(usize, usize)>,
    xnuma_mig_src: Vec<Vec<bool>>,
    growth_denied: Vec<Vec<bool>>,
    processing_dur: Duration,

    /// Host topology.
    topo: Arc<Topology>,
    netdevs: BTreeMap<String, NetDev>,
    stats_server: StatsServer<StatsReq, StatsRes>,
    /// NUMA affinitizer for GPU tasks (see `GpuTaskAffinitizer`).
    gpu_task_handler: GpuTaskAffinitizer,
}
1850
/// Scale factor (2^20) applied to a fractional migration amount before it is
/// truncated to an integer rate in `xnuma_compute_rates`.
const DUTY_CYCLE_SCALE: f64 = (1u64 << 20) as f64;
/// Fraction of a node's surplus that `xnuma_compute_rates` turns into
/// migration per computation, so that convergence is gradual.
const XNUMA_RATE_DAMPEN: f64 = 0.5;
1853
/// Result of xnuma water-fill computation for a single layer.
struct XnumaRates {
    /// rates[src][dst]: migration rate in duty-cycle-scaled units, i.e.
    /// multiplied by `DUTY_CYCLE_SCALE`. `xnuma_compute_rates` leaves the
    /// diagonal (`src == dst`) at zero.
    rates: Vec<Vec<u64>>,
}
1859
/// Decide, node by node, whether a layer is a cross-NUMA migration source,
/// using a pair of thresholds for hysteresis.
///
/// All slices are indexed by node ID and are expected to be as long as
/// `duty_sums`. `threshold` and `threshold_delta` are `(low, high)` pairs.
///
/// For a node with a non-zero allocation:
///
/// * it becomes a source (`true`) when all of the following hold:
///   1. load/alloc > `threshold.1` (significant load)
///   2. surplus/alloc > `threshold_delta.1` (significant imbalance)
///   3. `growth_denied` (allocation can't solve it)
/// * otherwise it stops being a source (`false`) when any of these holds:
///   1. load/alloc < `threshold.0` (load dropped)
///   2. surplus/alloc < `threshold_delta.0` (imbalance resolved)
///   3. `!growth_denied` (growth succeeded)
/// * otherwise it keeps its state from `currently_active`.
///
/// The surplus is the node's load minus its share of the total load at the
/// layer-wide load/alloc ratio. A node with no allocation is a source exactly
/// when it has load and its growth was denied.
fn xnuma_check_active(
    duty_sums: &[f64],
    allocs: &[usize],
    threshold: (f64, f64),
    threshold_delta: (f64, f64),
    growth_denied: &[bool],
    currently_active: &[bool],
) -> Vec<bool> {
    let total_duty: f64 = duty_sums.iter().sum();
    let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
    // Layer-wide load per allocated CPU; the level every node would sit at
    // if the load were spread evenly.
    let eq_ratio = if total_alloc > 0.0 {
        total_duty / total_alloc
    } else {
        0.0
    };

    let (thresh_lo, thresh_hi) = threshold;
    let (delta_lo, delta_hi) = threshold_delta;

    (0..duty_sums.len())
        .map(|nid| {
            let duty = duty_sums[nid];
            let alloc = allocs[nid] as f64;
            if alloc <= 0.0 {
                return duty > 0.0 && growth_denied[nid];
            }

            let load_ratio = duty / alloc;
            let surplus_ratio = (duty - eq_ratio * alloc) / alloc;

            let opens = load_ratio > thresh_hi && surplus_ratio > delta_hi && growth_denied[nid];
            let closes = load_ratio < thresh_lo || surplus_ratio < delta_lo || !growth_denied[nid];

            if opens {
                true
            } else if closes {
                false
            } else {
                // Between the thresholds: keep the previous decision.
                currently_active[nid]
            }
        })
        .collect()
}
1921
1922/// Compute water-fill migration rates for a single layer.
1923///
1924/// Finds the equalization ratio (water line) across all nodes, then
1925/// computes per-(src, dst) migration rates proportional to each source's
1926/// surplus and each destination's share of total deficit.
1927fn xnuma_compute_rates(duty_sums: &[f64], allocs: &[usize]) -> XnumaRates {
1928    let nr_nodes = duty_sums.len();
1929    let total_duty: f64 = duty_sums.iter().sum();
1930    let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
1931
1932    if total_alloc <= 0.0 {
1933        return XnumaRates {
1934            rates: vec![vec![0u64; nr_nodes]; nr_nodes],
1935        };
1936    }
1937
1938    let eq_ratio = total_duty / total_alloc;
1939
1940    let mut surpluses = vec![0.0f64; nr_nodes];
1941    let mut deficits = vec![0.0f64; nr_nodes];
1942    for nid in 0..nr_nodes {
1943        let expected = eq_ratio * allocs[nid] as f64;
1944        let delta = duty_sums[nid] - expected;
1945        if delta > 0.0 {
1946            surpluses[nid] = delta;
1947        } else {
1948            deficits[nid] = -delta;
1949        }
1950    }
1951
1952    let total_deficit: f64 = deficits.iter().sum();
1953
1954    let mut rates = vec![vec![0u64; nr_nodes]; nr_nodes];
1955    for src in 0..nr_nodes {
1956        for dst in 0..nr_nodes {
1957            if src == dst || total_deficit <= 0.0 || surpluses[src] <= 0.0 {
1958                continue;
1959            }
1960            // Dampen: transfer half the surplus per cycle so convergence
1961            // is gradual rather than a single-step overcorrection.
1962            let migration = surpluses[src] * deficits[dst] / total_deficit * XNUMA_RATE_DAMPEN;
1963            rates[src][dst] = (migration * DUTY_CYCLE_SCALE) as u64;
1964        }
1965    }
1966
1967    XnumaRates { rates }
1968}
1969
1970impl<'a> Scheduler<'a> {
1971    fn init_layers(
1972        skel: &mut OpenBpfSkel,
1973        specs: &[LayerSpec],
1974        topo: &Topology,
1975    ) -> Result<HashMap<u32, Regex>> {
1976        skel.maps.rodata_data.as_mut().unwrap().nr_layers = specs.len() as u32;
1977        let mut perf_set = false;
1978
1979        let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
1980        let mut layer_weights: Vec<usize> = vec![];
1981        let mut cgroup_regex_id = 0;
1982        let mut cgroup_regexes = HashMap::new();
1983
1984        for (spec_i, spec) in specs.iter().enumerate() {
1985            let layer = &mut skel.maps.bss_data.as_mut().unwrap().layers[spec_i];
1986
1987            for (or_i, or) in spec.matches.iter().enumerate() {
1988                for (and_i, and) in or.iter().enumerate() {
1989                    let mt = &mut layer.matches[or_i].matches[and_i];
1990
1991                    // Rules are allowlist-based by default
1992                    mt.exclude.write(false);
1993
1994                    match and {
1995                        LayerMatch::CgroupPrefix(prefix) => {
1996                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
1997                            copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
1998                        }
1999                        LayerMatch::CgroupSuffix(suffix) => {
2000                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
2001                            copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
2002                        }
2003                        LayerMatch::CgroupRegex(regex_str) => {
2004                            if cgroup_regex_id >= bpf_intf::consts_MAX_CGROUP_REGEXES {
2005                                bail!(
2006                                    "Too many cgroup regex rules. Maximum allowed: {}",
2007                                    bpf_intf::consts_MAX_CGROUP_REGEXES
2008                                );
2009                            }
2010
2011                            // CgroupRegex matching handled in userspace via cgroup watcher
2012                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_REGEX as i32;
2013                            mt.cgroup_regex_id = cgroup_regex_id;
2014
2015                            let regex = Regex::new(regex_str).with_context(|| {
2016                                format!("Invalid regex '{}' in layer '{}'", regex_str, spec.name)
2017                            })?;
2018                            cgroup_regexes.insert(cgroup_regex_id, regex);
2019                            cgroup_regex_id += 1;
2020                        }
2021                        LayerMatch::CgroupContains(substr) => {
2022                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
2023                            copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
2024                        }
2025                        LayerMatch::CommPrefix(prefix) => {
2026                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
2027                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
2028                        }
2029                        LayerMatch::CommPrefixExclude(prefix) => {
2030                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
2031                            mt.exclude.write(true);
2032                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
2033                        }
2034                        LayerMatch::PcommPrefix(prefix) => {
2035                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
2036                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
2037                        }
2038                        LayerMatch::PcommPrefixExclude(prefix) => {
2039                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
2040                            mt.exclude.write(true);
2041                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
2042                        }
2043                        LayerMatch::NiceAbove(nice) => {
2044                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
2045                            mt.nice = *nice;
2046                        }
2047                        LayerMatch::NiceBelow(nice) => {
2048                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
2049                            mt.nice = *nice;
2050                        }
2051                        LayerMatch::NiceEquals(nice) => {
2052                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
2053                            mt.nice = *nice;
2054                        }
2055                        LayerMatch::UIDEquals(user_id) => {
2056                            mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
2057                            mt.user_id = *user_id;
2058                        }
2059                        LayerMatch::GIDEquals(group_id) => {
2060                            mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
2061                            mt.group_id = *group_id;
2062                        }
2063                        LayerMatch::PIDEquals(pid) => {
2064                            mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
2065                            mt.pid = *pid;
2066                        }
2067                        LayerMatch::PPIDEquals(ppid) => {
2068                            mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
2069                            mt.ppid = *ppid;
2070                        }
2071                        LayerMatch::TGIDEquals(tgid) => {
2072                            mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
2073                            mt.tgid = *tgid;
2074                        }
2075                        LayerMatch::NSPIDEquals(nsid, pid) => {
2076                            mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
2077                            mt.nsid = *nsid;
2078                            mt.pid = *pid;
2079                        }
2080                        LayerMatch::NSEquals(nsid) => {
2081                            mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
2082                            mt.nsid = *nsid as u64;
2083                        }
2084                        LayerMatch::CmdJoin(joincmd) => {
2085                            mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
2086                            copy_into_cstr(&mut mt.comm_prefix, joincmd);
2087                        }
2088                        LayerMatch::IsGroupLeader(polarity) => {
2089                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
2090                            mt.is_group_leader.write(*polarity);
2091                        }
2092                        LayerMatch::IsKthread(polarity) => {
2093                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
2094                            mt.is_kthread.write(*polarity);
2095                        }
2096                        LayerMatch::UsedGpuTid(polarity) => {
2097                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
2098                            mt.used_gpu_tid.write(*polarity);
2099                        }
2100                        LayerMatch::UsedGpuPid(polarity) => {
2101                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
2102                            mt.used_gpu_pid.write(*polarity);
2103                        }
2104                        LayerMatch::AvgRuntime(min, max) => {
2105                            mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
2106                            mt.min_avg_runtime_us = *min;
2107                            mt.max_avg_runtime_us = *max;
2108                        }
2109                        LayerMatch::HintEquals(hint) => {
2110                            mt.kind = bpf_intf::layer_match_kind_MATCH_HINT_EQUALS as i32;
2111                            mt.hint = *hint;
2112                        }
2113                        LayerMatch::SystemCpuUtilBelow(threshold) => {
2114                            mt.kind = bpf_intf::layer_match_kind_MATCH_SYSTEM_CPU_UTIL_BELOW as i32;
2115                            mt.system_cpu_util_below = (*threshold * 10000.0) as u64;
2116                        }
2117                        LayerMatch::DsqInsertBelow(threshold) => {
2118                            mt.kind = bpf_intf::layer_match_kind_MATCH_DSQ_INSERT_BELOW as i32;
2119                            mt.dsq_insert_below = (*threshold * 10000.0) as u64;
2120                        }
2121                        LayerMatch::NumaNode(node_id) => {
2122                            if *node_id as usize >= topo.nodes.len() {
2123                                bail!(
2124                                    "Spec {:?} has invalid NUMA node ID {} (available nodes: 0-{})",
2125                                    spec.name,
2126                                    node_id,
2127                                    topo.nodes.len() - 1
2128                                );
2129                            }
2130                            mt.kind = bpf_intf::layer_match_kind_MATCH_NUMA_NODE as i32;
2131                            mt.numa_node_id = *node_id;
2132                        }
2133                    }
2134                }
2135                layer.matches[or_i].nr_match_ands = or.len() as i32;
2136            }
2137
2138            layer.nr_match_ors = spec.matches.len() as u32;
2139            layer.kind = spec.kind.as_bpf_enum();
2140
2141            {
2142                let LayerCommon {
2143                    min_exec_us,
2144                    yield_ignore,
2145                    perf,
2146                    preempt,
2147                    preempt_first,
2148                    exclusive,
2149                    skip_remote_node,
2150                    prev_over_idle_core,
2151                    growth_algo,
2152                    slice_us,
2153                    fifo,
2154                    weight,
2155                    disallow_open_after_us,
2156                    disallow_preempt_after_us,
2157                    xllc_mig_min_us,
2158                    placement,
2159                    member_expire_ms,
2160                    ..
2161                } = spec.kind.common();
2162
2163                layer.slice_ns = *slice_us * 1000;
2164                layer.fifo.write(*fifo);
2165                layer.min_exec_ns = min_exec_us * 1000;
2166                layer.yield_step_ns = if *yield_ignore > 0.999 {
2167                    0
2168                } else if *yield_ignore < 0.001 {
2169                    layer.slice_ns
2170                } else {
2171                    (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
2172                };
2173                let mut layer_name: String = spec.name.clone();
2174                layer_name.truncate(MAX_LAYER_NAME);
2175                copy_into_cstr(&mut layer.name, layer_name.as_str());
2176                layer.preempt.write(*preempt);
2177                layer.preempt_first.write(*preempt_first);
2178                layer.excl.write(*exclusive);
2179                layer.skip_remote_node.write(*skip_remote_node);
2180                layer.prev_over_idle_core.write(*prev_over_idle_core);
2181                layer.growth_algo = growth_algo.as_bpf_enum();
2182                layer.weight = *weight;
2183                layer.member_expire_ms = *member_expire_ms;
2184                layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
2185                    v if v == u64::MAX => v,
2186                    v => v * 1000,
2187                };
2188                layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
2189                    v if v == u64::MAX => v,
2190                    v => v * 1000,
2191                };
2192                layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
2193                layer_weights.push(layer.weight.try_into().unwrap());
2194                layer.perf = u32::try_from(*perf)?;
2195
2196                let task_place = |place: u32| crate::types::layer_task_place(place);
2197                layer.task_place = match placement {
2198                    LayerPlacement::Standard => {
2199                        task_place(bpf_intf::layer_task_place_PLACEMENT_STD)
2200                    }
2201                    LayerPlacement::Sticky => {
2202                        task_place(bpf_intf::layer_task_place_PLACEMENT_STICK)
2203                    }
2204                    LayerPlacement::Floating => {
2205                        task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT)
2206                    }
2207                };
2208            }
2209
2210            layer.is_protected.write(match spec.kind {
2211                LayerKind::Open { .. } => false,
2212                LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
2213                    protected
2214                }
2215            });
2216
2217            layer.idle_confined.write(match spec.kind {
2218                LayerKind::Grouped { idle_confined, .. } => idle_confined,
2219                _ => false,
2220            });
2221
2222            match &spec.cpuset {
2223                Some(mask) => {
2224                    Self::update_cpumask(mask, &mut layer.cpuset);
2225                    layer.has_cpuset.write(true);
2226                }
2227                None => {
2228                    for i in 0..layer.cpuset.len() {
2229                        layer.cpuset[i] = u8::MAX;
2230                    }
2231                    layer.has_cpuset.write(false);
2232                }
2233            };
2234
2235            perf_set |= layer.perf > 0;
2236        }
2237
2238        layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
2239        for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
2240            skel.maps
2241                .rodata_data
2242                .as_mut()
2243                .unwrap()
2244                .layer_iteration_order[idx] = *layer_idx as u32;
2245        }
2246
2247        if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
2248            warn!("cpufreq support not available, ignoring perf configurations");
2249        }
2250
2251        Ok(cgroup_regexes)
2252    }
2253
2254    fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
2255        skel.maps.rodata_data.as_mut().unwrap().nr_nodes = topo.nodes.len() as u32;
2256        skel.maps.rodata_data.as_mut().unwrap().nr_llcs = 0;
2257
2258        for (&node_id, node) in &topo.nodes {
2259            debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
2260            skel.maps.rodata_data.as_mut().unwrap().nr_llcs += node.llcs.len() as u32;
2261            let raw_numa_slice = node.span.as_raw_slice();
2262            let node_cpumask_slice =
2263                &mut skel.maps.rodata_data.as_mut().unwrap().numa_cpumasks[node_id];
2264            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
2265            left.clone_from_slice(raw_numa_slice);
2266            debug!(
2267                "node {} mask: {:?}",
2268                node_id,
2269                skel.maps.rodata_data.as_ref().unwrap().numa_cpumasks[node_id]
2270            );
2271
2272            for llc in node.llcs.values() {
2273                debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
2274                skel.maps.rodata_data.as_mut().unwrap().llc_numa_id_map[llc.id] = node_id as u32;
2275            }
2276        }
2277
2278        for cpu in topo.all_cpus.values() {
2279            skel.maps.rodata_data.as_mut().unwrap().cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
2280        }
2281    }
2282
2283    fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
2284        let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
2285            vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
2286            vec
2287        };
2288        let radiate_cpu =
2289            |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
2290                vec.sort_by_key(|&id| {
2291                    (
2292                        (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
2293                        (center_cpu as i32 - id as i32).abs(),
2294                    )
2295                });
2296                vec
2297            };
2298
2299        for (&cpu_id, cpu) in &topo.all_cpus {
2300            // Collect the spans.
2301            let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
2302            let llc_span = &topo.all_llcs[&cpu.llc_id].span;
2303            let node_span = &topo.nodes[&cpu.node_id].span;
2304            let sys_span = &topo.span;
2305
2306            // Make the spans exclusive.
2307            let sys_span = sys_span.and(&node_span.not());
2308            let node_span = node_span.and(&llc_span.not());
2309            let llc_span = llc_span.and(&core_span.not());
2310            core_span.clear_cpu(cpu_id).unwrap();
2311
2312            // Convert them into arrays.
2313            let mut sys_order: Vec<usize> = sys_span.iter().collect();
2314            let mut node_order: Vec<usize> = node_span.iter().collect();
2315            let mut llc_order: Vec<usize> = llc_span.iter().collect();
2316            let mut core_order: Vec<usize> = core_span.iter().collect();
2317
2318            // Shuffle them so that different CPUs follow different orders.
2319            // Each CPU radiates in both directions based on the cpu id and
2320            // radiates out to the closest cores based on core ids.
2321
2322            sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
2323            node_order = radiate(node_order, cpu.node_id);
2324            llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
2325            core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);
2326
2327            // Concatenate and record the topology boundaries.
2328            let mut order: Vec<usize> = vec![];
2329            let mut idx: usize = 0;
2330
2331            idx += 1;
2332            order.push(cpu_id);
2333
2334            idx += core_order.len();
2335            order.append(&mut core_order);
2336            let core_end = idx;
2337
2338            idx += llc_order.len();
2339            order.append(&mut llc_order);
2340            let llc_end = idx;
2341
2342            idx += node_order.len();
2343            order.append(&mut node_order);
2344            let node_end = idx;
2345
2346            idx += sys_order.len();
2347            order.append(&mut sys_order);
2348            let sys_end = idx;
2349
2350            debug!(
2351                "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
2352                cpu_id, core_end, llc_end, node_end, sys_end, &order
2353            );
2354
2355            // Record in cpu_ctx.
2356            let pmap = &mut cpu_ctxs[cpu_id].prox_map;
2357            for (i, &cpu) in order.iter().enumerate() {
2358                pmap.cpus[i] = cpu as u16;
2359            }
2360            pmap.core_end = core_end as u32;
2361            pmap.llc_end = llc_end as u32;
2362            pmap.node_end = node_end as u32;
2363            pmap.sys_end = sys_end as u32;
2364        }
2365    }
2366
2367    fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
2368        cpu_ctxs
2369            .iter()
2370            .map(|cpu_ctx| unsafe { plain::as_bytes(cpu_ctx) }.to_vec())
2371            .collect()
2372    }
2373
2374    fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
2375        let key = (0_u32).to_ne_bytes();
2376        let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
2377        let cpu_ctxs_vec = skel
2378            .maps
2379            .cpu_ctxs
2380            .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
2381            .context("Failed to lookup cpu_ctx")?
2382            .unwrap();
2383
2384        let op_layers: Vec<u32> = layer_specs
2385            .iter()
2386            .enumerate()
2387            .filter(|(_idx, spec)| match &spec.kind {
2388                LayerKind::Open { .. } => spec.kind.common().preempt,
2389                _ => false,
2390            })
2391            .map(|(idx, _)| idx as u32)
2392            .collect();
2393        let on_layers: Vec<u32> = layer_specs
2394            .iter()
2395            .enumerate()
2396            .filter(|(_idx, spec)| match &spec.kind {
2397                LayerKind::Open { .. } => !spec.kind.common().preempt,
2398                _ => false,
2399            })
2400            .map(|(idx, _)| idx as u32)
2401            .collect();
2402        let gp_layers: Vec<u32> = layer_specs
2403            .iter()
2404            .enumerate()
2405            .filter(|(_idx, spec)| match &spec.kind {
2406                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2407                _ => false,
2408            })
2409            .map(|(idx, _)| idx as u32)
2410            .collect();
2411        let gn_layers: Vec<u32> = layer_specs
2412            .iter()
2413            .enumerate()
2414            .filter(|(_idx, spec)| match &spec.kind {
2415                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2416                _ => false,
2417            })
2418            .map(|(idx, _)| idx as u32)
2419            .collect();
2420
2421        // FIXME - this incorrectly assumes all possible CPUs are consecutive.
2422        for cpu in 0..*NR_CPUS_POSSIBLE {
2423            cpu_ctxs.push(
2424                *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
2425                    .expect("cpu_ctx: short or misaligned buffer"),
2426            );
2427
2428            let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
2429            let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
2430            cpu_ctxs[cpu].cpu = cpu as i32;
2431            cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
2432            cpu_ctxs[cpu].is_big = is_big;
2433
2434            fastrand::seed(cpu as u64);
2435
2436            let mut ogp_order = op_layers.clone();
2437            ogp_order.append(&mut gp_layers.clone());
2438            fastrand::shuffle(&mut ogp_order);
2439
2440            let mut ogn_order = on_layers.clone();
2441            ogn_order.append(&mut gn_layers.clone());
2442            fastrand::shuffle(&mut ogn_order);
2443
2444            let mut op_order = op_layers.clone();
2445            fastrand::shuffle(&mut op_order);
2446
2447            let mut on_order = on_layers.clone();
2448            fastrand::shuffle(&mut on_order);
2449
2450            let mut gp_order = gp_layers.clone();
2451            fastrand::shuffle(&mut gp_order);
2452
2453            let mut gn_order = gn_layers.clone();
2454            fastrand::shuffle(&mut gn_order);
2455
2456            for i in 0..MAX_LAYERS {
2457                cpu_ctxs[cpu].ogp_layer_order[i] =
2458                    ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2459                cpu_ctxs[cpu].ogn_layer_order[i] =
2460                    ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2461
2462                cpu_ctxs[cpu].op_layer_order[i] =
2463                    op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2464                cpu_ctxs[cpu].on_layer_order[i] =
2465                    on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2466                cpu_ctxs[cpu].gp_layer_order[i] =
2467                    gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2468                cpu_ctxs[cpu].gn_layer_order[i] =
2469                    gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2470            }
2471        }
2472
2473        Self::init_cpu_prox_map(topo, &mut cpu_ctxs);
2474
2475        skel.maps
2476            .cpu_ctxs
2477            .update_percpu(
2478                &key,
2479                &Self::convert_cpu_ctxs(cpu_ctxs),
2480                libbpf_rs::MapFlags::ANY,
2481            )
2482            .context("Failed to update cpu_ctx")?;
2483
2484        Ok(())
2485    }
2486
2487    fn init_single_prox_map_per_llc(
2488        skel: &mut BpfSkel,
2489        topo: &Topology,
2490        prox_map_idx: &usize,
2491    ) -> Result<()> {
2492        for (&llc_id, llc) in &topo.all_llcs {
2493            // Collect the orders.
2494            let mut node_order: Vec<usize> =
2495                topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
2496            let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();
2497
2498            // Make the orders exclusive.
2499            sys_order.retain(|id| !node_order.contains(id));
2500            node_order.retain(|&id| id != llc_id);
2501
2502            // Shuffle so that different LLCs follow different orders. See
2503            // init_cpu_prox_map().
2504            fastrand::seed((*prox_map_idx as u64) << 32 | llc_id as u64);
2505            fastrand::shuffle(&mut sys_order);
2506            fastrand::shuffle(&mut node_order);
2507
2508            // Concatenate and record the node boundary.
2509            let mut order: Vec<usize> = vec![];
2510            let mut idx: usize = 0;
2511
2512            idx += 1;
2513            order.push(llc_id);
2514
2515            idx += node_order.len();
2516            order.append(&mut node_order);
2517            let node_end = idx;
2518
2519            idx += sys_order.len();
2520            order.append(&mut sys_order);
2521            let sys_end = idx;
2522
2523            debug!(
2524                "LLC[{}] proximity map {}[{}/{}]: {:?}",
2525                llc_id, prox_map_idx, node_end, sys_end, &order
2526            );
2527
2528            // Record in llc_ctx.
2529            //
2530            // XXX - This would be a lot easier if llc_ctx were in the bss.
2531            // See BpfStats::read().
2532            let key = (llc_id as u32).to_ne_bytes();
2533            let v = skel
2534                .maps
2535                .llc_data
2536                .lookup(&key, libbpf_rs::MapFlags::ANY)
2537                .unwrap()
2538                .unwrap();
2539            let mut llcc: bpf_intf::llc_ctx =
2540                *plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
2541
2542            let pmap = &mut llcc.prox_maps[*prox_map_idx];
2543            for (i, &llc_id) in order.iter().enumerate() {
2544                pmap.llcs[i] = llc_id as u16;
2545            }
2546            pmap.node_end = node_end as u32;
2547            pmap.sys_end = sys_end as u32;
2548
2549            skel.maps.llc_data.update(
2550                &key,
2551                unsafe { plain::as_bytes(&llcc) },
2552                libbpf_rs::MapFlags::ANY,
2553            )?
2554        }
2555
2556        Ok(())
2557    }
2558
2559    fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2560        let num_proximity_maps = bpf_intf::consts_NUM_PROXIMITY_MAPS as usize;
2561        for prox_map_idx in 0..num_proximity_maps {
2562            Self::init_single_prox_map_per_llc(skel, topo, &prox_map_idx)?;
2563        }
2564
2565        Ok(())
2566    }
2567
2568    fn init_node_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2569        for (&node_id, node) in &topo.nodes {
2570            let mut order: Vec<(usize, usize)> = node
2571                .distance
2572                .iter()
2573                .enumerate()
2574                .filter(|&(nid, _)| nid != node_id)
2575                .map(|(nid, &dist)| (nid, dist))
2576                .collect();
2577            order.sort_by_key(|&(_, dist)| dist);
2578
2579            let key = (node_id as u32).to_ne_bytes();
2580
2581            // The map entry may not exist yet — create a zeroed one.
2582            let v = skel.maps.node_data.lookup(&key, libbpf_rs::MapFlags::ANY);
2583            let mut nodec: bpf_intf::node_ctx = match v {
2584                Ok(Some(v)) => {
2585                    *plain::from_bytes(v.as_slice()).expect("node_ctx: short or misaligned buffer")
2586                }
2587                _ => unsafe { MaybeUninit::zeroed().assume_init() },
2588            };
2589
2590            let pmap = &mut nodec.prox_map;
2591            for (i, &(nid, _)) in order.iter().enumerate() {
2592                pmap.nodes[i] = nid as u16;
2593            }
2594            pmap.sys_end = order.len() as u32;
2595
2596            debug!(
2597                "NODE[{}] prox_map[{}]: {:?}",
2598                node_id,
2599                pmap.sys_end,
2600                &order.iter().map(|(n, d)| (*n, *d)).collect::<Vec<_>>()
2601            );
2602
2603            skel.maps.node_data.update(
2604                &key,
2605                unsafe { plain::as_bytes(&nodec) },
2606                libbpf_rs::MapFlags::ANY,
2607            )?;
2608        }
2609        Ok(())
2610    }
2611
2612    fn init_node_ctx(skel: &mut BpfSkel, topo: &Topology, nr_layers: usize) -> Result<()> {
2613        let all_layers: Vec<u32> = (0..nr_layers as u32).collect();
2614        let node_empty_layers: Vec<Vec<u32>> =
2615            (0..topo.nodes.len()).map(|_| all_layers.clone()).collect();
2616        Self::refresh_node_ctx(skel, topo, &node_empty_layers, true);
2617        Ok(())
2618    }
2619
2620    fn init(
2621        opts: &'a Opts,
2622        layer_specs: &[LayerSpec],
2623        open_object: &'a mut MaybeUninit<OpenObject>,
2624        hint_to_layer_map: &HashMap<u64, HintLayerInfo>,
2625        membw_tracking: bool,
2626    ) -> Result<Self> {
2627        let nr_layers = layer_specs.len();
2628        let mut disable_topology = opts.disable_topology.unwrap_or(false);
2629
2630        let topo = Arc::new(if disable_topology {
2631            Topology::with_flattened_llc_node()?
2632        } else if opts.topology.virt_llc.is_some() {
2633            Topology::with_args(&opts.topology)?
2634        } else {
2635            Topology::new()?
2636        });
2637
2638        /*
2639         * FIXME: scx_layered incorrectly assumes that node, LLC and CPU IDs
2640         * are consecutive. Verify that they are on this system and bail if
2641         * not. It's lucky that core ID is not used anywhere as core IDs are
2642         * not consecutive on some Ryzen CPUs.
2643         */
2644        if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
2645            bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
2646        }
2647        if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
2648            bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
2649        }
2650        if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
2651            bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
2652        }
2653
2654        let netdevs = if opts.netdev_irq_balance {
2655            let devs = read_netdevs()?;
2656            let total_irqs: usize = devs.values().map(|d| d.irqs.len()).sum();
2657            let breakdown = devs
2658                .iter()
2659                .map(|(iface, d)| format!("{iface}={}", d.irqs.len()))
2660                .collect::<Vec<_>>()
2661                .join(", ");
2662            info!(
2663                "Netdev IRQ balancing enabled: overriding {total_irqs} IRQ{} \
2664                 across {} interface{} [{breakdown}]",
2665                if total_irqs == 1 { "" } else { "s" },
2666                devs.len(),
2667                if devs.len() == 1 { "" } else { "s" },
2668            );
2669            devs
2670        } else {
2671            BTreeMap::new()
2672        };
2673
2674        if !disable_topology {
2675            if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
2676                disable_topology = true;
2677            };
2678            info!(
2679                "Topology awareness not specified, selecting {} based on hardware",
2680                if disable_topology {
2681                    "disabled"
2682                } else {
2683                    "enabled"
2684                }
2685            );
2686        };
2687
2688        let cpu_pool = CpuPool::new(topo.clone(), opts.allow_partial_core)?;
2689
2690        // If disabling topology awareness clear out any set NUMA/LLC configs and
2691        // it will fallback to using all cores.
2692        let layer_specs: Vec<_> = if disable_topology {
2693            info!("Disabling topology awareness");
2694            layer_specs
2695                .iter()
2696                .cloned()
2697                .map(|mut s| {
2698                    s.kind.common_mut().nodes.clear();
2699                    s.kind.common_mut().llcs.clear();
2700                    s
2701                })
2702                .collect()
2703        } else {
2704            layer_specs.to_vec()
2705        };
2706
2707        // Validate that spec node/LLC references exist in the topology.
2708        for spec in layer_specs.iter() {
2709            let mut seen = BTreeSet::new();
2710            for &node_id in spec.nodes().iter() {
2711                if !topo.nodes.contains_key(&node_id) {
2712                    bail!(
2713                        "layer {:?}: nodes references node {} which does not \
2714                         exist in the topology (available: {:?})",
2715                        spec.name,
2716                        node_id,
2717                        topo.nodes.keys().collect::<Vec<_>>()
2718                    );
2719                }
2720                if !seen.insert(node_id) {
2721                    bail!(
2722                        "layer {:?}: nodes contains duplicate node {}",
2723                        spec.name,
2724                        node_id
2725                    );
2726                }
2727            }
2728
2729            seen.clear();
2730            for &llc_id in spec.llcs().iter() {
2731                if !topo.all_llcs.contains_key(&llc_id) {
2732                    bail!(
2733                        "layer {:?}: llcs references LLC {} which does not \
2734                         exist in the topology (available: {:?})",
2735                        spec.name,
2736                        llc_id,
2737                        topo.all_llcs.keys().collect::<Vec<_>>()
2738                    );
2739                }
2740                if !seen.insert(llc_id) {
2741                    bail!(
2742                        "layer {:?}: llcs contains duplicate LLC {}",
2743                        spec.name,
2744                        llc_id
2745                    );
2746                }
2747            }
2748        }
2749
2750        for spec in layer_specs.iter() {
2751            let has_numa_node_match = spec
2752                .matches
2753                .iter()
2754                .flatten()
2755                .any(|m| matches!(m, LayerMatch::NumaNode(_)));
2756            let has_node_spread_algo = matches!(
2757                spec.kind.common().growth_algo,
2758                LayerGrowthAlgo::NodeSpread
2759                    | LayerGrowthAlgo::NodeSpreadReverse
2760                    | LayerGrowthAlgo::NodeSpreadRandom
2761            );
2762            if has_numa_node_match && has_node_spread_algo {
2763                bail!(
2764                    "layer {:?}: NumaNode matcher cannot be combined with {:?} \
2765                     growth algorithm. NodeSpread* allocates CPUs equally across \
2766                     ALL NUMA nodes, but NumaNode restricts tasks to one node's \
2767                     CPUs — CPUs on other nodes are wasted and utilization \
2768                     will never exceed 1/numa_nodes. Use a non-spread algorithm \
2769                     (e.g. Linear, Topo) instead.",
2770                    spec.name,
2771                    spec.kind.common().growth_algo
2772                );
2773            }
2774        }
2775
2776        // Check kernel features
2777        init_libbpf_logging(None);
2778        let kfuncs_in_syscall = scx_bpf_compat::kfuncs_supported_in_syscall()?;
2779        if !kfuncs_in_syscall {
2780            warn!("Using slow path: kfuncs not supported in syscall programs (a8e03b6bbb2c ∉ ker)");
2781        }
2782
2783        // Open the BPF prog first for verification.
2784        let debug_level = if opts.log_level.contains("trace") {
2785            2
2786        } else if opts.log_level.contains("debug") {
2787            1
2788        } else {
2789            0
2790        };
2791        let mut skel_builder = BpfSkelBuilder::default();
2792        skel_builder.obj_builder.debug(debug_level > 1);
2793
2794        info!(
2795            "Running scx_layered (build ID: {})",
2796            build_id::full_version(env!("CARGO_PKG_VERSION"))
2797        );
2798        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
2799        let mut skel = scx_ops_open!(skel_builder, open_object, layered, open_opts)?;
2800
2801        // No memory BW tracking by default
2802        skel.progs.scx_pmu_switch_tc.set_autoload(membw_tracking);
2803        skel.progs.scx_pmu_tick_tc.set_autoload(membw_tracking);
2804
2805        let mut loaded_kprobes = HashSet::new();
2806
2807        // enable autoloads for conditionally loaded things
2808        // immediately after creating skel (because this is always before loading)
2809        if opts.enable_gpu_support {
2810            // by default, enable open if gpu support is enabled.
2811            // open has been observed to be relatively cheap to kprobe.
2812            if opts.gpu_kprobe_level >= 1 {
2813                compat::cond_kprobe_load("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
2814                loaded_kprobes.insert("nvidia_open");
2815            }
2816            // enable the rest progressively based upon how often they are called
2817            // for observed workloads
2818            if opts.gpu_kprobe_level >= 2 {
2819                compat::cond_kprobe_load("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
2820                loaded_kprobes.insert("nvidia_mmap");
2821            }
2822            if opts.gpu_kprobe_level >= 3 {
2823                compat::cond_kprobe_load("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
2824                loaded_kprobes.insert("nvidia_poll");
2825            }
2826        }
2827
2828        let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
2829        let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");
2830
2831        let event = if membw_tracking {
2832            setup_membw_tracking(&mut skel)?
2833        } else {
2834            0
2835        };
2836
2837        let rodata = skel.maps.rodata_data.as_mut().unwrap();
2838
2839        if let (Ok(ext_addr), Ok(idle_addr)) = (ext_sched_class_addr, idle_sched_class_addr) {
2840            rodata.ext_sched_class_addr = ext_addr;
2841            rodata.idle_sched_class_addr = idle_addr;
2842        } else {
2843            warn!(
2844                "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
2845            );
2846        }
2847
2848        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
2849        rodata.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;
2850
2851        // Initialize skel according to @opts.
2852        skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;
2853
2854        if !opts.disable_queued_wakeup {
2855            match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
2856                0 => info!("Kernel does not support queued wakeup optimization"),
2857                v => skel.struct_ops.layered_mut().flags |= v,
2858            }
2859        }
2860
2861        rodata.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
2862        rodata.percpu_kthread_preempt_all =
2863            !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
2864        rodata.debug = debug_level as u32;
2865        rodata.slice_ns = opts.slice_us * 1000;
2866        rodata.max_exec_ns = if opts.max_exec_us > 0 {
2867            opts.max_exec_us * 1000
2868        } else {
2869            opts.slice_us * 1000 * 20
2870        };
2871        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;
2872        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
2873        rodata.smt_enabled = topo.smt_enabled;
2874        rodata.has_little_cores = topo.has_little_cores();
2875        rodata.antistall_sec = opts.antistall_sec;
2876        rodata.monitor_disable = opts.monitor_disable;
2877        rodata.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
2878        rodata.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
2879        rodata.enable_antistall = !opts.disable_antistall;
2880        rodata.enable_match_debug = opts.enable_match_debug;
2881        rodata.enable_gpu_support = opts.enable_gpu_support;
2882        rodata.kfuncs_supported_in_syscall = kfuncs_in_syscall;
2883
2884        for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
2885            rodata.__sibling_cpu[cpu] = *sib;
2886        }
2887        for cpu in topo.all_cpus.keys() {
2888            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
2889        }
2890
2891        rodata.nr_op_layers = layer_specs
2892            .iter()
2893            .filter(|spec| match &spec.kind {
2894                LayerKind::Open { .. } => spec.kind.common().preempt,
2895                _ => false,
2896            })
2897            .count() as u32;
2898        rodata.nr_on_layers = layer_specs
2899            .iter()
2900            .filter(|spec| match &spec.kind {
2901                LayerKind::Open { .. } => !spec.kind.common().preempt,
2902                _ => false,
2903            })
2904            .count() as u32;
2905        rodata.nr_gp_layers = layer_specs
2906            .iter()
2907            .filter(|spec| match &spec.kind {
2908                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2909                _ => false,
2910            })
2911            .count() as u32;
2912        rodata.nr_gn_layers = layer_specs
2913            .iter()
2914            .filter(|spec| match &spec.kind {
2915                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2916                _ => false,
2917            })
2918            .count() as u32;
2919        rodata.nr_excl_layers = layer_specs
2920            .iter()
2921            .filter(|spec| spec.kind.common().exclusive)
2922            .count() as u32;
2923
2924        let mut min_open = u64::MAX;
2925        let mut min_preempt = u64::MAX;
2926
2927        for spec in layer_specs.iter() {
2928            if let LayerKind::Open { common, .. } = &spec.kind {
2929                min_open = min_open.min(common.disallow_open_after_us.unwrap());
2930                min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
2931            }
2932        }
2933
2934        rodata.min_open_layer_disallow_open_after_ns = match min_open {
2935            u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
2936            v => v,
2937        };
2938        rodata.min_open_layer_disallow_preempt_after_ns = match min_preempt {
2939            u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
2940            v => v,
2941        };
2942
2943        // We set the pin path before loading the skeleton. This will ensure
2944        // libbpf creates and pins the map, or reuses the pinned map fd for us,
2945        // so that we can keep reusing the older map already pinned on scheduler
2946        // restarts.
2947        let layered_task_hint_map_path = &opts.task_hint_map;
2948        let hint_map = &mut skel.maps.scx_layered_task_hint_map;
2949        // Only set pin path if a path is provided.
2950        if !layered_task_hint_map_path.is_empty() {
2951            hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
2952            rodata.task_hint_map_enabled = true;
2953        }
2954
2955        if !opts.hi_fb_thread_name.is_empty() {
2956            let bpf_hi_fb_thread_name = &mut rodata.hi_fb_thread_name;
2957            copy_into_cstr(bpf_hi_fb_thread_name, opts.hi_fb_thread_name.as_str());
2958            rodata.enable_hi_fb_thread_name_match = true;
2959        }
2960
2961        let cgroup_regexes = Self::init_layers(&mut skel, &layer_specs, &topo)?;
2962        skel.maps.rodata_data.as_mut().unwrap().nr_cgroup_regexes = cgroup_regexes.len() as u32;
2963        Self::init_nodes(&mut skel, opts, &topo);
2964
2965        let mut skel = scx_ops_load!(skel, layered, uei)?;
2966
2967        // Populate the mapping of hints to layer IDs for faster lookups
2968        if !hint_to_layer_map.is_empty() {
2969            for (k, v) in hint_to_layer_map.iter() {
2970                let key: u32 = *k as u32;
2971
2972                // Create hint_layer_info struct
2973                let mut info_bytes = vec![0u8; std::mem::size_of::<bpf_intf::hint_layer_info>()];
2974                let info_ptr = info_bytes.as_mut_ptr() as *mut bpf_intf::hint_layer_info;
2975                unsafe {
2976                    (*info_ptr).layer_id = v.layer_id as u32;
2977                    (*info_ptr).system_cpu_util_below = match v.system_cpu_util_below {
2978                        Some(threshold) => (threshold * 10000.0) as u64,
2979                        None => u64::MAX, // disabled sentinel
2980                    };
2981                    (*info_ptr).dsq_insert_below = match v.dsq_insert_below {
2982                        Some(threshold) => (threshold * 10000.0) as u64,
2983                        None => u64::MAX, // disabled sentinel
2984                    };
2985                }
2986
2987                skel.maps.hint_to_layer_id_map.update(
2988                    &key.to_ne_bytes(),
2989                    &info_bytes,
2990                    libbpf_rs::MapFlags::ANY,
2991                )?;
2992            }
2993        }
2994
2995        if membw_tracking {
2996            create_perf_fds(&mut skel, event)?;
2997        }
2998
2999        let mut layers = vec![];
3000        let layer_growth_orders =
3001            LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
3002        for (idx, spec) in layer_specs.iter().enumerate() {
3003            let growth_order = layer_growth_orders
3004                .get(&idx)
3005                .with_context(|| "layer has no growth order".to_string())?;
3006            layers.push(Layer::new(spec, &topo, growth_order)?);
3007        }
3008
3009        let mut idle_qos_enabled = layers
3010            .iter()
3011            .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
3012        if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
3013            warn!("idle_resume_us not supported, ignoring");
3014            idle_qos_enabled = false;
3015        }
3016
3017        Self::init_cpus(&skel, &layer_specs, &topo)?;
3018        Self::init_llc_prox_map(&mut skel, &topo)?;
3019        Self::init_node_prox_map(&mut skel, &topo)?;
3020        Self::init_node_ctx(&mut skel, &topo, nr_layers)?;
3021
3022        // Other stuff.
3023        let proc_reader = fb_procfs::ProcReader::new();
3024
3025        // Handle setup if layered is running in a pid namespace.
3026        let input = ProgramInput {
3027            ..Default::default()
3028        };
3029        let prog = &mut skel.progs.initialize_pid_namespace;
3030
3031        let _ = prog.test_run(input);
3032
3033        // XXX If we try to refresh the cpumasks here before attaching, we
3034        // sometimes (non-deterministically) don't see the updated values in
3035        // BPF. It would be better to update the cpumasks here before we
3036        // attach, but the value will quickly converge anyways so it's not a
3037        // huge problem in the interim until we figure it out.
3038
3039        // Allow all tasks to open and write to BPF task hint map, now that
3040        // we should have it pinned at the desired location.
3041        if !layered_task_hint_map_path.is_empty() {
3042            let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
3043            let mode: libc::mode_t = 0o666;
3044            unsafe {
3045                if libc::chmod(path.as_ptr(), mode) != 0 {
3046                    trace!("'chmod' to 666 of task hint map failed, continuing...");
3047                }
3048            }
3049        }
3050
3051        // Attach.
3052        let struct_ops = scx_ops_attach!(skel, layered)?;
3053
3054        // Turn on installed kprobes
3055        if opts.enable_gpu_support {
3056            if loaded_kprobes.contains("nvidia_open") {
3057                compat::cond_kprobe_attach("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
3058            }
3059            if loaded_kprobes.contains("nvidia_mmap") {
3060                compat::cond_kprobe_attach("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
3061            }
3062            if loaded_kprobes.contains("nvidia_poll") {
3063                compat::cond_kprobe_attach("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
3064            }
3065        }
3066
3067        let stats_server = StatsServer::new(stats::server_data()).launch()?;
3068        let mut gpu_task_handler =
3069            GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
3070        gpu_task_handler.init(topo.clone());
3071
3072        let sched = Self {
3073            struct_ops: Some(struct_ops),
3074            layer_specs,
3075
3076            sched_intv: Duration::from_secs_f64(opts.interval),
3077            layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),
3078
3079            cpu_pool,
3080            layers,
3081            idle_qos_enabled,
3082
3083            sched_stats: Stats::new(
3084                &mut skel,
3085                &proc_reader,
3086                topo.clone(),
3087                &gpu_task_handler,
3088                opts.util_compensation,
3089            )?,
3090            layer_peak_utils: vec![0.0; nr_layers],
3091
3092            cgroup_regexes: Some(cgroup_regexes),
3093            nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
3094            xnuma_mig_src: vec![vec![false; topo.nodes.len()]; nr_layers],
3095            growth_denied: vec![vec![false; topo.nodes.len()]; nr_layers],
3096            processing_dur: Default::default(),
3097
3098            proc_reader,
3099            skel,
3100
3101            topo,
3102            netdevs,
3103            stats_server,
3104            gpu_task_handler,
3105        };
3106
3107        info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");
3108
3109        Ok(sched)
3110    }
3111
3112    fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
3113        for cpu in 0..mask.len() {
3114            if mask.test_cpu(cpu) {
3115                bpfmask[cpu / 8] |= 1 << (cpu % 8);
3116            } else {
3117                bpfmask[cpu / 8] &= !(1 << (cpu % 8));
3118            }
3119        }
3120    }
3121
3122    fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
3123        trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
3124        Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);
3125
3126        bpf_layer.nr_cpus = layer.nr_cpus as u32;
3127        for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
3128            bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
3129        }
3130        for (node_id, &nr_node_cpus) in layer.nr_node_cpus.iter().enumerate() {
3131            bpf_layer.node[node_id].nr_cpus = nr_node_cpus as u32;
3132        }
3133
3134        bpf_layer.refresh_cpus = 1;
3135    }
3136
3137    fn update_netdev_cpumasks(&mut self) -> Result<()> {
3138        let available_cpus = self.cpu_pool.available_cpus();
3139        if available_cpus.is_empty() {
3140            return Ok(());
3141        }
3142
3143        for (iface, netdev) in self.netdevs.iter_mut() {
3144            let node = self
3145                .topo
3146                .nodes
3147                .values()
3148                .find(|n| n.id == netdev.node())
3149                .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
3150            let node_cpus = node.span.clone();
3151            for (irq, irqmask) in netdev.irqs.iter_mut() {
3152                irqmask.clear_all();
3153                for cpu in available_cpus.iter() {
3154                    if !node_cpus.test_cpu(cpu) {
3155                        continue;
3156                    }
3157                    let _ = irqmask.set_cpu(cpu);
3158                }
3159                // If no CPUs are available in the node then spread the load across the node
3160                if irqmask.weight() == 0 {
3161                    for cpu in node_cpus.iter() {
3162                        let _ = irqmask.set_cpu(cpu);
3163                    }
3164                }
3165                trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
3166            }
3167            netdev.apply_cpumasks()?;
3168            debug!(
3169                "{iface}: applied affinity override to {} IRQ{}",
3170                netdev.irqs.len(),
3171                if netdev.irqs.len() == 1 { "" } else { "s" },
3172            );
3173        }
3174
3175        Ok(())
3176    }
3177
3178    fn clamp_target_by_membw(
3179        &self,
3180        layer: &Layer,
3181        membw_limit: f64,
3182        membw: f64,
3183        curtarget: u64,
3184    ) -> usize {
3185        let ncpu: u64 = layer.cpus.weight() as u64;
3186        let membw = (membw * 1024_f64.powf(3.0)).round() as u64;
3187        let membw_limit = (membw_limit * 1024_f64.powf(3.0)).round() as u64;
3188        let last_membw_percpu = if ncpu > 0 { membw / ncpu } else { 0 };
3189
3190        // Either there is no memory bandwidth limit set, or the counters
3191        // are not fully initialized yet. Just return the current target.
3192        if membw_limit == 0 || last_membw_percpu == 0 {
3193            return curtarget as usize;
3194        }
3195
3196        (membw_limit / last_membw_percpu) as usize
3197    }
3198
3199    /// Decompose per-layer CPU targets into per-node pinned demand and
3200    /// unpinned demand for unified_alloc(). Uses layer_node_pinned_utils
3201    /// to split each layer's target. All outputs are in alloc units.
3202    fn calc_raw_demands(&self, targets: &[(usize, usize)]) -> Vec<LayerDemand> {
3203        let au = self.cpu_pool.alloc_unit();
3204        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
3205        let nr_nodes = self.topo.nodes.len();
3206
3207        targets
3208            .iter()
3209            .enumerate()
3210            .map(|(idx, &(target, _min))| {
3211                let layer = &self.layers[idx];
3212                let weight = layer.kind.common().weight as usize;
3213
3214                // Open layers don't participate in allocation.
3215                if matches!(layer.kind, LayerKind::Open { .. }) {
3216                    return LayerDemand {
3217                        raw_pinned: vec![0; nr_nodes],
3218                        raw_unpinned: 0,
3219                        weight,
3220                        spread: false,
3221                    };
3222                }
3223
3224                // Only NodeSpread* keeps strict equal-per-node placement
3225                // via the spread:bool path.  RoundRobin migrated to a
3226                // single-tier node_groups (balanced via cap-weighted
3227                // water_fill in place_unpinned, no bottleneck cap).
3228                let spread = matches!(
3229                    layer.growth_algo,
3230                    LayerGrowthAlgo::NodeSpread
3231                        | LayerGrowthAlgo::NodeSpreadReverse
3232                        | LayerGrowthAlgo::NodeSpreadRandom
3233                );
3234
3235                let util_high = match &layer.kind {
3236                    LayerKind::Confined { util_range, .. }
3237                    | LayerKind::Grouped { util_range, .. } => util_range.1,
3238                    _ => 1.0,
3239                };
3240
3241                // Convert per-node pinned utilization to CPU demand.
3242                let mut raw_pinned = vec![0usize; nr_nodes];
3243                for n in 0..nr_nodes {
3244                    let pu = pinned_utils[idx][n];
3245                    if pu < 0.01 {
3246                        continue;
3247                    }
3248                    // Check this layer has allowed_cpus on this node.
3249                    let node_span = &self.topo.nodes[&n].span;
3250                    if layer.allowed_cpus.and(node_span).is_empty() {
3251                        continue;
3252                    }
3253                    let cpus = (pu / util_high).ceil() as usize;
3254                    // Round up to alloc units.
3255                    let units = cpus.div_ceil(au);
3256                    raw_pinned[n] = units;
3257                }
3258
3259                // Unpinned = remainder of the target.
3260                let target_units = target.div_ceil(au);
3261                let pinned_units: usize = raw_pinned.iter().sum();
3262                let raw_unpinned = target_units.saturating_sub(pinned_units);
3263
3264                LayerDemand {
3265                    raw_pinned,
3266                    raw_unpinned,
3267                    weight,
3268                    spread,
3269                }
3270            })
3271            .collect()
3272    }
3273
3274    /// Calculate how many CPUs each layer would like to have if there were
3275    /// no competition. When util_compensation is enabled, compensated
3276    /// utilization (scaled for irq/softirq/stolen overhead) is used
3277    /// instead of raw utilization. The CPU range is determined by
3278    /// applying the inverse of util_range and capping by cpus_range.
3279    /// If the current allocation is within the acceptable range, no
3280    /// change is made. Returns (target, min) pair for each layer.
3281    fn calc_target_nr_cpus(&mut self) -> Vec<(usize, usize)> {
3282        let nr_cpus = self.cpu_pool.topo.all_cpus.len();
3283        let membws = &self.sched_stats.layer_membws;
3284
3285        let mut records: Vec<(u64, u64, u64, u64, usize, usize, usize)> = vec![];
3286        let mut targets: Vec<(usize, usize)> = vec![];
3287
3288        for (idx, layer) in self.layers.iter().enumerate() {
3289            targets.push(match &layer.kind {
3290                LayerKind::Confined {
3291                    util_range,
3292                    cpus_range,
3293                    cpus_range_frac,
3294                    membw_gb,
3295                    ..
3296                }
3297                | LayerKind::Grouped {
3298                    util_range,
3299                    cpus_range,
3300                    cpus_range_frac,
3301                    membw_gb,
3302                    ..
3303                } => {
3304                    let cpus_range =
3305                        resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();
3306
3307                    // A grouped layer can choose to include open cputime
3308                    // for sizing. Also, as an empty layer can only get CPU
3309                    // time through fallback (counted as owned) or open
3310                    // execution, add open cputime for empty layers.
3311                    let (owned, open) = {
3312                        let utils = if self.sched_stats.util_compensation {
3313                            &self.sched_stats.layer_utils_compensated[idx]
3314                        } else {
3315                            &self.sched_stats.layer_utils[idx]
3316                        };
3317                        (utils[LAYER_USAGE_OWNED], utils[LAYER_USAGE_OPEN])
3318                    };
3319
3320                    let membw_owned = membws[idx][LAYER_USAGE_OWNED];
3321                    let membw_open = membws[idx][LAYER_USAGE_OPEN];
3322
3323                    let mut util = owned;
3324                    let mut membw = membw_owned;
3325                    if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
3326                        util += open;
3327                        membw += membw_open;
3328                    }
3329
3330                    let util = if util < 0.01 { 0.0 } else { util };
3331
3332                    let peak_util = if layer.kind.common().util_peak_half_life_ms == 0 {
3333                        util
3334                    } else {
3335                        update_peak_util(
3336                            self.layer_peak_utils[idx],
3337                            util,
3338                            Duration::from_millis(layer.kind.common().util_peak_half_life_ms),
3339                            self.sched_stats.elapsed,
3340                        )
3341                    };
3342                    self.layer_peak_utils[idx] = peak_util;
3343                    let (low, high) = calc_peak_aware_target_range(util, peak_util, *util_range);
3344
3345                    let membw_limit = match membw_gb {
3346                        Some(membw_limit) => *membw_limit,
3347                        None => 0.0,
3348                    };
3349
3350                    trace!(
3351                        "layer {0} (membw, membw_limit): ({membw} gi_b, {membw_limit} gi_b)",
3352                        layer.name
3353                    );
3354
3355                    let target = layer.cpus.weight().clamp(low, high);
3356
3357                    records.push((
3358                        (owned * 100.0) as u64,
3359                        (open * 100.0) as u64,
3360                        (util * 100.0) as u64,
3361                        (peak_util * 100.0) as u64,
3362                        low,
3363                        high,
3364                        target,
3365                    ));
3366
3367                    let target = target.clamp(cpus_range.0, cpus_range.1);
3368                    let membw_target =
3369                        self.clamp_target_by_membw(layer, membw_limit, membw, target as u64);
3370
3371                    trace!("CPU target pre- and post-membw adjustment: {target} -> {membw_target}");
3372
3373                    // If there's no way to drop our memory usage down enough,
3374                    // pin the target CPUs to low and drop a warning.
3375                    if membw_target < cpus_range.0 {
3376                        warn!("cannot satisfy memory bw limit for layer {}", layer.name);
3377                        warn!("membw_target {membw_target} low {}", cpus_range.0);
3378                    };
3379
3380                    // Memory bandwidth target cannot override imposed limits or bump
3381                    // the target above what CPU usage-based throttling requires.
3382                    let target = membw_target.clamp(cpus_range.0, target);
3383
3384                    (target, cpus_range.0)
3385                }
3386                LayerKind::Open { .. } => (0, 0),
3387            });
3388        }
3389
3390        trace!(
3391            "(owned, open, util, peak, low, high, target): {:?}",
3392            &records
3393        );
3394        targets
3395    }
3396
3397    // Figure out a tuple (LLCs, extra_cpus) in terms of the target CPUs
3398    // computed by weighted_target_nr_cpus. Returns the number of full LLCs
3399    // occupied by a layer, and any extra cores needed beyond those LLCs.
3400    fn compute_target_llcs(target: usize, node_llcs: &BTreeMap<usize, Arc<Llc>>) -> (usize, usize) {
3401        let mut remaining = target;
3402        let mut full = 0;
3403        for llc in node_llcs.values() {
3404            let cpus_in_llc = llc.span.weight();
3405            if remaining >= cpus_in_llc {
3406                full += 1;
3407                remaining -= cpus_in_llc;
3408            } else {
3409                let mut extra = 0;
3410                for (_, core) in llc.cores.iter() {
3411                    if remaining == 0 {
3412                        break;
3413                    }
3414                    extra += 1;
3415                    remaining = remaining.saturating_sub(core.cpus.len());
3416                }
3417                return (full, extra);
3418            }
3419        }
3420        (full, 0)
3421    }
3422
3423    // Recalculate the core order for layers using StickyDynamic growth
3424    // algorithm. Uses per-node targets from unified_alloc() to decide how
3425    // many LLCs each layer gets on each node, then builds core_order and
3426    // applies CPU changes.
3427    fn recompute_layer_core_order(
3428        &mut self,
3429        layer_targets: &[(usize, usize)],
3430        layer_allocs: &[LayerAlloc],
3431        au: usize,
3432    ) -> Result<bool> {
3433        let nr_nodes = self.topo.nodes.len();
3434
3435        // Phase 1 — Free per-node: return excess LLCs to cpu_pool.
3436        debug!(
3437            " free: before pass: free_llcs={:?}",
3438            self.cpu_pool.free_llcs
3439        );
3440        for &(idx, _) in layer_targets.iter().rev() {
3441            let layer = &mut self.layers[idx];
3442
3443            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3444                continue;
3445            }
3446
3447            let alloc = &layer_allocs[idx];
3448
3449            for n in 0..nr_nodes {
3450                let assigned_on_n = layer.assigned_llcs[n].len();
3451                let node = self.topo.nodes.get(&n).unwrap();
3452                let target_full_n =
3453                    Self::compute_target_llcs(alloc.node_target(n) * au, &node.llcs).0;
3454                let mut to_free = assigned_on_n.saturating_sub(target_full_n);
3455
3456                debug!(
3457                    " free: layer={} node={} assigned={} target_full={} to_free={}",
3458                    layer.name, n, assigned_on_n, target_full_n, to_free,
3459                );
3460
3461                while to_free > 0 {
3462                    if let Some(llc) = layer.assigned_llcs[n].pop() {
3463                        self.cpu_pool.return_llc(llc);
3464                        to_free -= 1;
3465                        debug!(" layer={} freed_llc={} from node={}", layer.name, llc, n);
3466                    } else {
3467                        break;
3468                    }
3469                }
3470            }
3471        }
3472        debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);
3473
3474        // Phase 2 — Acquire per-node: claim LLCs from cpu_pool.
3475        for &(idx, _) in layer_targets.iter().rev() {
3476            let layer = &mut self.layers[idx];
3477
3478            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3479                continue;
3480            }
3481
3482            let alloc = &layer_allocs[idx];
3483
3484            for n in 0..nr_nodes {
3485                let cur_on_n = layer.assigned_llcs[n].len();
3486                let node = self.topo.nodes.get(&n).unwrap();
3487                let target_full_n =
3488                    Self::compute_target_llcs(alloc.node_target(n) * au, &node.llcs).0;
3489                let mut to_alloc = target_full_n.saturating_sub(cur_on_n);
3490
3491                debug!(
3492                    " alloc: layer={} node={} cur={} target_full={} to_alloc={} free={}",
3493                    layer.name,
3494                    n,
3495                    cur_on_n,
3496                    target_full_n,
3497                    to_alloc,
3498                    self.cpu_pool.free_llcs.get(&n).map_or(0, |v| v.len()),
3499                );
3500
3501                while to_alloc > 0 {
3502                    if let Some(llc) = self.cpu_pool.take_llc_from_node(n) {
3503                        layer.assigned_llcs[n].push(llc);
3504                        to_alloc -= 1;
3505                        debug!(" layer={} alloc_llc={} on node={}", layer.name, llc, n);
3506                    } else {
3507                        break;
3508                    }
3509                }
3510            }
3511
3512            debug!(
3513                " alloc: layer={} assigned_llcs={:?}",
3514                layer.name, layer.assigned_llcs
3515            );
3516        }
3517
3518        // Phase 3 — Spillover per-node: consume extra cores from free LLCs.
3519        for &(idx, _) in layer_targets.iter() {
3520            let layer = &mut self.layers[idx];
3521
3522            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3523                continue;
3524            }
3525
3526            layer.core_order = vec![Vec::new(); nr_nodes];
3527            let alloc = &layer_allocs[idx];
3528
3529            for n in 0..nr_nodes {
3530                let node = self.topo.nodes.get(&n).unwrap();
3531                let mut extra = Self::compute_target_llcs(alloc.node_target(n) * au, &node.llcs).1;
3532
3533                if let Some(node_llcs) = self.cpu_pool.free_llcs.get_mut(&n) {
3534                    for entry in node_llcs.iter_mut() {
3535                        if extra == 0 {
3536                            break;
3537                        }
3538                        let llc_id = entry.0;
3539                        let llc = self.topo.all_llcs.get(&llc_id).unwrap();
3540                        let avail = llc.cores.len() - entry.1;
3541                        let mut used = extra.min(avail);
3542                        let cores_to_add = used;
3543
3544                        let shift = entry.1;
3545                        entry.1 += used;
3546
3547                        for core in llc.cores.iter().skip(shift) {
3548                            if used == 0 {
3549                                break;
3550                            }
3551                            layer.core_order[n].push(core.1.id);
3552                            used -= 1;
3553                        }
3554
3555                        extra -= cores_to_add;
3556                    }
3557                }
3558            }
3559
3560            for node_cores in &mut layer.core_order {
3561                node_cores.reverse();
3562            }
3563        }
3564
3565        // Reset consumed entries in free LLCs.
3566        for node_llcs in self.cpu_pool.free_llcs.values_mut() {
3567            for entry in node_llcs.iter_mut() {
3568                entry.1 = 0;
3569            }
3570        }
3571
3572        // Phase 4 — Build core_order: append cores from assigned LLCs.
3573        for &(idx, _) in layer_targets.iter() {
3574            let layer = &mut self.layers[idx];
3575
3576            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3577                continue;
3578            }
3579
3580            let all_assigned: HashSet<usize> =
3581                layer.assigned_llcs.iter().flatten().copied().collect();
3582
3583            for core in self.topo.all_cores.iter() {
3584                let llc_id = core.1.llc_id;
3585                if all_assigned.contains(&llc_id) {
3586                    let nid = core.1.node_id;
3587                    layer.core_order[nid].push(core.1.id);
3588                }
3589            }
3590            for node_cores in &mut layer.core_order {
3591                node_cores.reverse();
3592            }
3593
3594            debug!(
3595                " alloc: layer={} core_order={:?}",
3596                layer.name, layer.core_order
3597            );
3598        }
3599
3600        // Phase 5 — Apply CPU changes for StickyDynamic layers.
3601        // Two phases: first free per-node, then allocate per-node.
3602        let mut updated = false;
3603
3604        // Free excess CPUs per-node.
3605        for &(idx, _) in layer_targets.iter() {
3606            let layer = &mut self.layers[idx];
3607
3608            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3609                continue;
3610            }
3611
3612            for n in 0..nr_nodes {
3613                let mut node_target = Cpumask::new();
3614                for &core_id in &layer.core_order[n] {
3615                    if let Some(core) = self.topo.all_cores.get(&core_id) {
3616                        node_target |= &core.span;
3617                    }
3618                }
3619                node_target &= &layer.allowed_cpus;
3620
3621                let node_span = &self.topo.nodes[&n].span;
3622                let node_cur = layer.cpus.and(node_span);
3623                let cpus_to_free = node_cur.and(&node_target.not());
3624
3625                if cpus_to_free.weight() > 0 {
3626                    debug!(
3627                        " apply: layer={} freeing CPUs on node {}: {}",
3628                        layer.name, n, cpus_to_free
3629                    );
3630                    layer.cpus &= &cpus_to_free.not();
3631                    layer.nr_cpus -= cpus_to_free.weight();
3632                    for cpu in cpus_to_free.iter() {
3633                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
3634                        layer.nr_node_cpus[n] -= 1;
3635                    }
3636                    self.cpu_pool.free(&cpus_to_free)?;
3637                    updated = true;
3638                }
3639            }
3640        }
3641
3642        // Allocate needed CPUs per-node.
3643        for &(idx, _) in layer_targets.iter() {
3644            let layer = &mut self.layers[idx];
3645
3646            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3647                continue;
3648            }
3649
3650            for n in 0..nr_nodes {
3651                let mut node_target = Cpumask::new();
3652                for &core_id in &layer.core_order[n] {
3653                    if let Some(core) = self.topo.all_cores.get(&core_id) {
3654                        node_target |= &core.span;
3655                    }
3656                }
3657                node_target &= &layer.allowed_cpus;
3658
3659                let available_cpus = self.cpu_pool.available_cpus();
3660                let desired_to_alloc = node_target.and(&layer.cpus.clone().not());
3661                let cpus_to_alloc = desired_to_alloc.clone().and(&available_cpus);
3662
3663                if desired_to_alloc.weight() > cpus_to_alloc.weight() {
3664                    debug!(
3665                        " apply: layer={} node {} wanted to alloc {} CPUs but only {} available",
3666                        layer.name,
3667                        n,
3668                        desired_to_alloc.weight(),
3669                        cpus_to_alloc.weight()
3670                    );
3671                }
3672
3673                if cpus_to_alloc.weight() > 0 {
3674                    debug!(
3675                        " apply: layer={} allocating CPUs on node {}: {}",
3676                        layer.name, n, cpus_to_alloc
3677                    );
3678                    layer.cpus |= &cpus_to_alloc;
3679                    layer.nr_cpus += cpus_to_alloc.weight();
3680                    for cpu in cpus_to_alloc.iter() {
3681                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
3682                        layer.nr_node_cpus[n] += 1;
3683                    }
3684                    self.cpu_pool.mark_allocated(&cpus_to_alloc)?;
3685                    updated = true;
3686                }
3687            }
3688
3689            debug!(
3690                " apply: layer={} final cpus.weight()={} nr_cpus={}",
3691                layer.name,
3692                layer.cpus.weight(),
3693                layer.nr_cpus
3694            );
3695        }
3696
3697        Ok(updated)
3698    }
3699
3700    fn refresh_node_ctx(
3701        skel: &mut BpfSkel,
3702        topo: &Topology,
3703        node_empty_layers: &[Vec<u32>],
3704        init: bool,
3705    ) {
3706        for &nid in topo.nodes.keys() {
3707            let mut arg: bpf_intf::refresh_node_ctx_arg =
3708                unsafe { MaybeUninit::zeroed().assume_init() };
3709            arg.node_id = nid as u32;
3710            arg.init = init as u32;
3711
3712            let empty = &node_empty_layers[nid];
3713            arg.nr_empty_layer_ids = empty.len() as u32;
3714            for (i, &lid) in empty.iter().enumerate() {
3715                arg.empty_layer_ids[i] = lid;
3716            }
3717            for i in empty.len()..MAX_LAYERS {
3718                arg.empty_layer_ids[i] = MAX_LAYERS as u32;
3719            }
3720
3721            if init {
3722                // Per-node LLC list — static topology, only set during init.
3723                let node = &topo.nodes[&nid];
3724                let llcs: Vec<u32> = node.llcs.keys().map(|&id| id as u32).collect();
3725                arg.nr_llcs = llcs.len() as u32;
3726                for (i, &llc_id) in llcs.iter().enumerate() {
3727                    arg.llcs[i] = llc_id;
3728                }
3729            }
3730
3731            let input = ProgramInput {
3732                context_in: Some(unsafe { plain::as_mut_bytes(&mut arg) }),
3733                ..Default::default()
3734            };
3735            let _ = skel.progs.refresh_node_ctx.test_run(input);
3736        }
3737    }
3738
3739    fn refresh_cpumasks(&mut self) -> Result<()> {
3740        let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });
3741
3742        let mut updated = false;
3743        let raw_targets = self.calc_target_nr_cpus();
3744        let au = self.cpu_pool.alloc_unit();
3745        let total_cpus = self.cpu_pool.topo.all_cpus.len();
3746
3747        // Dampen shrink: only drop halfway per cycle to avoid unnecessary
3748        // changes. There's some dampening built into util metrics but slow
3749        // down freeing further. This is solely based on intuition. Drop or
3750        // update according to real-world behavior.
3751        let targets: Vec<(usize, usize)> = raw_targets
3752            .iter()
3753            .enumerate()
3754            .map(|(idx, &(target, min))| {
3755                let cur = self.layers[idx].nr_cpus;
3756                if target < cur {
3757                    let dampened = cur - (cur - target).div_ceil(2);
3758                    (dampened.max(min), min)
3759                } else {
3760                    (target, min)
3761                }
3762            })
3763            .collect();
3764
3765        // Build demands for unified_alloc and compute per-node allocations.
3766        let demands = self.calc_raw_demands(&targets);
3767        let nr_nodes = self.topo.nodes.len();
3768        let node_caps: Vec<usize> = self
3769            .topo
3770            .nodes
3771            .values()
3772            .map(|n| n.span.weight() / au)
3773            .collect();
3774        let all_layer_nodes: Vec<&[usize]> = self
3775            .layer_specs
3776            .iter()
3777            .map(|s| s.nodes().as_slice())
3778            .collect();
3779        let norders: Vec<Vec<usize>> = (0..self.layers.len())
3780            .map(|idx| {
3781                layer_core_growth::node_order(
3782                    self.layer_specs[idx].nodes(),
3783                    &self.topo,
3784                    idx,
3785                    &all_layer_nodes,
3786                )
3787            })
3788            .collect();
3789        let node_groups: Vec<Vec<Vec<usize>>> = (0..self.layers.len())
3790            .map(|idx| {
3791                layer_core_growth::node_groups(
3792                    self.layer_specs[idx].nodes(),
3793                    &self.topo,
3794                    idx,
3795                    &all_layer_nodes,
3796                    &self.layer_specs[idx].kind.common().growth_algo,
3797                )
3798            })
3799            .collect();
3800        let layer_allocs = unified_alloc(total_cpus / au, &node_caps, &demands, &node_groups);
3801
3802        // Convert allocations back to CPU counts. Shrink dampening is
3803        // already applied to the targets fed into unified_alloc above.
3804        let cpu_targets: Vec<usize> = layer_allocs.iter().map(|a| a.total() * au).collect();
3805
3806        // Snapshot per-layer CPU counts for ALLOC debug logging.
3807        let prev_nr_cpus: Vec<usize> = self.layers.iter().map(|l| l.nr_cpus).collect();
3808
3809        let mut ascending: Vec<(usize, usize)> = cpu_targets.iter().copied().enumerate().collect();
3810        ascending.sort_by(|a, b| a.1.cmp(&b.1));
3811
3812        // Snapshot per-layer per-node CPU counts before allocation changes.
3813        let prev_node_cpus: Vec<Vec<usize>> =
3814            self.layers.iter().map(|l| l.nr_node_cpus.clone()).collect();
3815
3816        // Per-node SD allocation requires multiple LLCs. On flat topologies
3817        // (single LLC, e.g. VMs with topology disabled), SD layers fall through
3818        // to the non-SD grow/shrink loops below.
3819        let use_sd_alloc = self.topo.all_llcs.len() > 1;
3820        let sticky_dynamic_updated = if use_sd_alloc {
3821            self.recompute_layer_core_order(&ascending, &layer_allocs, au)?
3822        } else {
3823            false
3824        };
3825        updated |= sticky_dynamic_updated;
3826
3827        // Update BPF cpumasks for StickyDynamic layers if they were updated
3828        if sticky_dynamic_updated {
3829            for (idx, layer) in self.layers.iter().enumerate() {
3830                if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
3831                    Self::update_bpf_layer_cpumask(
3832                        layer,
3833                        &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3834                    );
3835                }
3836            }
3837        }
3838
3839        // Shrink per-node: free excess CPUs from each node.
3840        for &(idx, _target) in ascending.iter().rev() {
3841            let layer = &mut self.layers[idx];
3842            if layer_is_open(layer) {
3843                continue;
3844            }
3845
3846            // Skip StickyDynamic layers when per-node SD allocation is active.
3847            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
3848                continue;
3849            }
3850
3851            let alloc = &layer_allocs[idx];
3852            let mut freed = false;
3853
3854            for n in 0..nr_nodes {
3855                let desired = alloc.node_target(n) * au;
3856                let mut to_free = layer.nr_node_cpus[n].saturating_sub(desired);
3857                let node_span = &self.topo.nodes[&n].span;
3858
3859                while to_free > 0 {
3860                    let node_cands = layer.cpus.and(node_span);
3861                    let cpus_to_free = match self
3862                        .cpu_pool
3863                        .next_to_free(&node_cands, layer.core_order[n].iter().rev())?
3864                    {
3865                        Some(ret) => ret,
3866                        None => break,
3867                    };
3868                    let nr = cpus_to_free.weight();
3869                    trace!(
3870                        "[{}] freeing CPUs on node {}: {}",
3871                        layer.name,
3872                        n,
3873                        &cpus_to_free
3874                    );
3875                    layer.cpus &= &cpus_to_free.not();
3876                    layer.nr_cpus -= nr;
3877                    for cpu in cpus_to_free.iter() {
3878                        let node_id = self.cpu_pool.topo.all_cpus[&cpu].node_id;
3879                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
3880                        layer.nr_node_cpus[node_id] -= 1;
3881                        layer.nr_pinned_cpus[node_id] =
3882                            layer.nr_pinned_cpus[node_id].min(layer.nr_node_cpus[node_id]);
3883                    }
3884                    self.cpu_pool.free(&cpus_to_free)?;
3885                    to_free = to_free.saturating_sub(nr);
3886                    freed = true;
3887                }
3888            }
3889
3890            if freed {
3891                Self::update_bpf_layer_cpumask(
3892                    layer,
3893                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3894                );
3895                updated = true;
3896            }
3897        }
3898
3899        // Grow layers per-node using allocations from unified_alloc.
3900        for &(idx, _target) in &ascending {
3901            let layer = &mut self.layers[idx];
3902
3903            if layer_is_open(layer) {
3904                continue;
3905            }
3906
3907            // Skip StickyDynamic layers when per-node SD allocation is active.
3908            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
3909                continue;
3910            }
3911
3912            let alloc = &layer_allocs[idx];
3913            let norder = &norders[idx];
3914            let mut alloced = false;
3915
3916            for &node_id in norder.iter() {
3917                let node_target = alloc.node_target(node_id) * au;
3918                let cur_node = layer.nr_node_cpus[node_id];
3919                if node_target <= cur_node {
3920                    continue;
3921                }
3922                let pinned_target = alloc.pinned[node_id] * au;
3923                let mut nr_to_alloc = node_target - cur_node;
3924                let node_span = &self.topo.nodes[&node_id].span;
3925                let node_allowed = layer.allowed_cpus.and(node_span);
3926
3927                while nr_to_alloc > 0 {
3928                    let nr_alloced = match self.cpu_pool.alloc_cpus(
3929                        &node_allowed,
3930                        &layer.core_order[node_id],
3931                        nr_to_alloc,
3932                    ) {
3933                        Some(new_cpus) => {
3934                            let nr = new_cpus.weight();
3935                            layer.cpus |= &new_cpus;
3936                            layer.nr_cpus += nr;
3937                            for cpu in new_cpus.iter() {
3938                                layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
3939                                let nid = self.cpu_pool.topo.all_cpus[&cpu].node_id;
3940                                layer.nr_node_cpus[nid] += 1;
3941                                if layer.nr_pinned_cpus[nid] < pinned_target {
3942                                    layer.nr_pinned_cpus[nid] += 1;
3943                                }
3944                            }
3945                            nr
3946                        }
3947                        None => 0,
3948                    };
3949                    if nr_alloced == 0 {
3950                        break;
3951                    }
3952                    alloced = true;
3953                    nr_to_alloc -= nr_alloced.min(nr_to_alloc);
3954                }
3955            }
3956
3957            if alloced {
3958                Self::update_bpf_layer_cpumask(
3959                    layer,
3960                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3961                );
3962                updated = true;
3963            }
3964        }
3965
3966        // Tell BPF whether all CPUs are allocated.  When the system is
3967        // fully allocated, idle_confined layers have nowhere to grow, so
3968        // they should be allowed to use open (unprotected) idle CPUs from
3969        // other layers.
3970        let total_allocated: usize = self.layers.iter().map(|l| l.nr_cpus).sum();
3971        let fully_allocated = total_allocated >= total_cpus;
3972        for (idx, layer) in self.layers.iter().enumerate() {
3973            if !layer_is_open(layer) {
3974                self.skel.maps.bss_data.as_mut().unwrap().layers[idx]
3975                    .fully_allocated
3976                    .write(fully_allocated);
3977            }
3978        }
3979
3980        // Recompute growth_denied: for each node, check whether the
3981        // unpinned portion of the layer warranted growth (unpinned util /
3982        // util_high > unpinned CPUs) but the node didn't gain CPUs.  Only
3983        // unpinned matters because pinned tasks can't migrate cross-node.
3984        let node_utils = &self.sched_stats.layer_node_utils;
3985        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
3986        for (idx, layer) in self.layers.iter().enumerate() {
3987            self.growth_denied[idx].fill(false);
3988            let util_high = match layer.kind.util_range() {
3989                Some((_, high)) => high,
3990                None => continue,
3991            };
3992            for n in 0..nr_nodes {
3993                let unpinned_util = (node_utils[idx][n] - pinned_utils[idx][n]).max(0.0);
3994                let unpinned_cpus_needed = unpinned_util / util_high;
3995                let unpinned_cpus_have =
3996                    layer.nr_node_cpus[n].saturating_sub(layer.nr_pinned_cpus[n]) as f64;
3997                let wanted = unpinned_cpus_needed > unpinned_cpus_have;
3998                let got = layer.nr_node_cpus[n] > prev_node_cpus[idx][n];
3999                if wanted && !got {
4000                    self.growth_denied[idx][n] = true;
4001                }
4002            }
4003        }
4004
4005        // Log per-layer allocation changes.
4006        if updated {
4007            for (idx, layer) in self.layers.iter().enumerate() {
4008                if layer_is_open(layer) {
4009                    continue;
4010                }
4011                let prev = prev_nr_cpus[idx];
4012                let cur = layer.nr_cpus;
4013                if prev != cur {
4014                    debug!(
4015                        "ALLOC {} algo={:?} cpus:{}→{} mask={:x}",
4016                        layer.name, layer.growth_algo, prev, cur, layer.cpus,
4017                    );
4018                }
4019            }
4020            debug!(
4021                "ALLOC pool_available={}",
4022                self.cpu_pool.available_cpus().weight()
4023            );
4024        }
4025
4026        // Log allocation changes.
4027        if updated {
4028            let nr_nodes = self.topo.nodes.len();
4029            for (idx, layer) in self.layers.iter().enumerate() {
4030                if layer_is_open(layer) {
4031                    continue;
4032                }
4033                let prev = &prev_node_cpus[idx];
4034                let cur = &layer.nr_node_cpus;
4035                if prev == cur {
4036                    continue;
4037                }
4038                let per_node: String = (0..nr_nodes)
4039                    .map(|n| format!("n{}:{}→{}", n, prev[n], cur[n]))
4040                    .collect::<Vec<_>>()
4041                    .join(" ");
4042                let prev_total: usize = prev.iter().sum();
4043                let cur_total: usize = cur[..nr_nodes].iter().sum();
4044                let target: String = (0..nr_nodes)
4045                    .map(|n| format!("n{}:{}", n, layer_allocs[idx].node_target(n) * au))
4046                    .collect::<Vec<_>>()
4047                    .join(" ");
4048                debug!(
4049                    "ALLOC {} algo={:?} {} total:{}→{} target:[{}] mask={:x}",
4050                    layer.name,
4051                    layer.growth_algo,
4052                    per_node,
4053                    prev_total,
4054                    cur_total,
4055                    target,
4056                    layer.cpus,
4057                );
4058            }
4059            debug!(
4060                "ALLOC pool_available={}",
4061                self.cpu_pool.available_cpus().weight()
4062            );
4063        }
4064
4065        // Give the rest to the open layers.
4066        if updated {
4067            for (idx, layer) in self.layers.iter_mut().enumerate() {
4068                if !layer_is_open(layer) {
4069                    continue;
4070                }
4071
4072                let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx];
4073                let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
4074                let nr_available_cpus = available_cpus.weight();
4075
4076                // Open layers need the intersection of allowed cpus and
4077                // available cpus. Recompute per-LLC and per-node counts
4078                // since open layers bypass alloc/free.
4079                layer.cpus = available_cpus;
4080                layer.nr_cpus = nr_available_cpus;
4081                for llc in self.cpu_pool.topo.all_llcs.values() {
4082                    layer.nr_llc_cpus[llc.id] = layer.cpus.and(&llc.span).weight();
4083                }
4084                for node in self.cpu_pool.topo.nodes.values() {
4085                    layer.nr_node_cpus[node.id] = layer.cpus.and(&node.span).weight();
4086                    layer.nr_pinned_cpus[node.id] = 0;
4087                }
4088                Self::update_bpf_layer_cpumask(layer, bpf_layer);
4089            }
4090
4091            for (&node_id, &cpu) in &self.cpu_pool.fallback_cpus {
4092                self.skel.maps.bss_data.as_mut().unwrap().fallback_cpus[node_id] = cpu as u32;
4093            }
4094
4095            for (lidx, layer) in self.layers.iter().enumerate() {
4096                self.nr_layer_cpus_ranges[lidx] = (
4097                    self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
4098                    self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
4099                );
4100            }
4101
4102            // Trigger updates on the BPF side.
4103            let input = ProgramInput {
4104                ..Default::default()
4105            };
4106            let prog = &mut self.skel.progs.refresh_layer_cpumasks;
4107            let _ = prog.test_run(input);
4108
4109            // Update per-node empty layer IDs via BPF prog.
4110            let nr_nodes = self.topo.nodes.len();
4111            let node_empty_layers: Vec<Vec<u32>> = (0..nr_nodes)
4112                .map(|nid| {
4113                    self.layers
4114                        .iter()
4115                        .enumerate()
4116                        .filter(|(_lidx, layer)| layer.nr_node_cpus[nid] == 0)
4117                        .map(|(lidx, _)| lidx as u32)
4118                        .collect()
4119                })
4120                .collect();
4121            Self::refresh_node_ctx(&mut self.skel, &self.topo, &node_empty_layers, false);
4122        }
4123
4124        if let Err(e) = self.update_netdev_cpumasks() {
4125            warn!("Failed to update netdev IRQ cpumasks: {:#}", e);
4126        }
4127        Ok(())
4128    }
4129
4130    fn refresh_idle_qos(&mut self) -> Result<()> {
4131        if !self.idle_qos_enabled {
4132            return Ok(());
4133        }
4134
4135        let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
4136        for layer in self.layers.iter() {
4137            let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
4138            for cpu in layer.cpus.iter() {
4139                cpu_idle_qos[cpu] = idle_resume_us;
4140            }
4141        }
4142
4143        for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
4144            update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
4145        }
4146
4147        Ok(())
4148    }
4149
4150    fn refresh_xnuma(&mut self) {
4151        let nr_nodes = self.topo.nodes.len();
4152        if nr_nodes <= 1 {
4153            return;
4154        }
4155
4156        let duty_sums = &self.sched_stats.layer_node_duty_sums;
4157
4158        for (layer_idx, spec) in self.layer_specs.iter().enumerate() {
4159            let common = spec.kind.common();
4160            let threshold = common.xnuma_threshold;
4161            let threshold_delta = common.xnuma_threshold_delta;
4162            let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[layer_idx];
4163
4164            if threshold.0 <= 0.0 && threshold.1 <= 0.0 {
4165                // Off — all open, infinite budget
4166                for src in 0..nr_nodes {
4167                    bpf_layer.node[src].xnuma_is_mig_src.write(true);
4168                    for dst in 0..nr_nodes {
4169                        bpf_layer.node[src].xnuma[dst].rate = u64::MAX;
4170                    }
4171                }
4172                self.xnuma_mig_src[layer_idx].fill(false);
4173                continue;
4174            }
4175
4176            let layer = &self.layers[layer_idx];
4177            let is_mig_src = xnuma_check_active(
4178                &duty_sums[layer_idx],
4179                &layer.nr_node_cpus,
4180                threshold,
4181                threshold_delta,
4182                &self.growth_denied[layer_idx],
4183                &self.xnuma_mig_src[layer_idx],
4184            );
4185
4186            self.xnuma_mig_src[layer_idx] = is_mig_src.clone();
4187
4188            let result = xnuma_compute_rates(&duty_sums[layer_idx], &layer.nr_node_cpus);
4189
4190            // Write rates before is_mig_src so BPF sees valid rates
4191            // when the gate activates.
4192            for src in 0..nr_nodes {
4193                for dst in 0..nr_nodes {
4194                    bpf_layer.node[src].xnuma[dst].rate = result.rates[src][dst];
4195                }
4196            }
4197            for (nid, is_src) in is_mig_src.iter().enumerate().take(nr_nodes) {
4198                bpf_layer.node[nid].xnuma_is_mig_src.write(*is_src);
4199            }
4200        }
4201    }
4202
4203    fn step(&mut self) -> Result<()> {
4204        let started_at = Instant::now();
4205        self.sched_stats.refresh(
4206            &mut self.skel,
4207            &self.proc_reader,
4208            started_at,
4209            self.processing_dur,
4210            &self.gpu_task_handler,
4211        )?;
4212
4213        // Update BPF with EWMA values
4214        self.skel
4215            .maps
4216            .bss_data
4217            .as_mut()
4218            .unwrap()
4219            .system_cpu_util_ewma = (self.sched_stats.system_cpu_util_ewma * 10000.0) as u64;
4220
4221        for layer_id in 0..self.sched_stats.nr_layers {
4222            self.skel
4223                .maps
4224                .bss_data
4225                .as_mut()
4226                .unwrap()
4227                .layer_dsq_insert_ewma[layer_id] =
4228                (self.sched_stats.layer_dsq_insert_ewma[layer_id] * 10000.0) as u64;
4229        }
4230
4231        self.refresh_cpumasks()?;
4232        self.refresh_xnuma();
4233        self.refresh_idle_qos()?;
4234        self.gpu_task_handler.maybe_affinitize();
4235        self.processing_dur += Instant::now().duration_since(started_at);
4236        Ok(())
4237    }
4238
4239    fn generate_sys_stats(
4240        &mut self,
4241        stats: &Stats,
4242        cpus_ranges: &mut [(usize, usize)],
4243    ) -> Result<SysStats> {
4244        let bstats = &stats.bpf_stats;
4245        let mut sys_stats = SysStats::new(stats, bstats, &self.cpu_pool.fallback_cpus)?;
4246
4247        for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
4248            let layer_stats = LayerStats::new(
4249                lidx,
4250                layer,
4251                stats,
4252                bstats,
4253                cpus_ranges[lidx],
4254                self.xnuma_mig_src[lidx].iter().any(|&a| a),
4255            );
4256            sys_stats.layers.insert(spec.name.to_string(), layer_stats);
4257            cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
4258        }
4259
4260        Ok(sys_stats)
4261    }
4262
4263    // Helper function to process a cgroup creation (common logic for walkdir and inotify)
4264    fn process_cgroup_creation(
4265        path: &Path,
4266        cgroup_regexes: &HashMap<u32, Regex>,
4267        cgroup_path_to_id: &mut HashMap<String, u64>,
4268        sender: &crossbeam::channel::Sender<CgroupEvent>,
4269    ) {
4270        let path_str = path.to_string_lossy().to_string();
4271
4272        // Get cgroup ID (inode number)
4273        let cgroup_id = std::fs::metadata(path)
4274            .map(|metadata| {
4275                use std::os::unix::fs::MetadataExt;
4276                metadata.ino()
4277            })
4278            .unwrap_or(0);
4279
4280        // Build match bitmap by testing against CgroupRegex rules
4281        let mut match_bitmap = 0u64;
4282        for (rule_id, regex) in cgroup_regexes {
4283            if regex.is_match(&path_str) {
4284                match_bitmap |= 1u64 << rule_id;
4285            }
4286        }
4287
4288        // Store in hash
4289        cgroup_path_to_id.insert(path_str.clone(), cgroup_id);
4290
4291        // Send event
4292        if let Err(e) = sender.send(CgroupEvent::Created {
4293            path: path_str,
4294            cgroup_id,
4295            match_bitmap,
4296        }) {
4297            error!("Failed to send cgroup creation event: {}", e);
4298        }
4299    }
4300
4301    fn start_cgroup_watcher(
4302        shutdown: Arc<AtomicBool>,
4303        cgroup_regexes: HashMap<u32, Regex>,
4304    ) -> Result<Receiver<CgroupEvent>> {
4305        let mut inotify = Inotify::init().context("Failed to initialize inotify")?;
4306        let mut wd_to_path = HashMap::new();
4307
4308        // Create crossbeam channel for cgroup events (bounded to prevent memory issues)
4309        let (sender, receiver) = crossbeam::channel::bounded::<CgroupEvent>(1024);
4310
4311        // Watch for directory creation and deletion events
4312        let root_wd = inotify
4313            .watches()
4314            .add("/sys/fs/cgroup", WatchMask::CREATE | WatchMask::DELETE)
4315            .context("Failed to add watch for /sys/fs/cgroup")?;
4316        wd_to_path.insert(root_wd, PathBuf::from("/sys/fs/cgroup"));
4317
4318        // Also recursively watch existing directories for new subdirectories
4319        Self::add_recursive_watches(&mut inotify, &mut wd_to_path, Path::new("/sys/fs/cgroup"))?;
4320
4321        // Spawn watcher thread
4322        std::thread::spawn(move || {
4323            let mut buffer = [0; 4096];
4324            let inotify_fd = inotify.as_raw_fd();
4325            // Maintain hash of cgroup path -> cgroup ID (inode number)
4326            let mut cgroup_path_to_id = HashMap::<String, u64>::new();
4327
4328            // Populate existing cgroups
4329            for entry in WalkDir::new("/sys/fs/cgroup")
4330                .into_iter()
4331                .filter_map(|e| e.ok())
4332                .filter(|e| e.file_type().is_dir())
4333            {
4334                let path = entry.path();
4335                Self::process_cgroup_creation(
4336                    path,
4337                    &cgroup_regexes,
4338                    &mut cgroup_path_to_id,
4339                    &sender,
4340                );
4341            }
4342
4343            while !shutdown.load(Ordering::Relaxed) {
4344                // Use select to wait for events with a 100ms timeout
4345                let ready = unsafe {
4346                    let mut read_fds: libc::fd_set = std::mem::zeroed();
4347                    libc::FD_ZERO(&mut read_fds);
4348                    libc::FD_SET(inotify_fd, &mut read_fds);
4349
4350                    let mut timeout = libc::timeval {
4351                        tv_sec: 0,
4352                        tv_usec: 100_000, // 100ms
4353                    };
4354
4355                    libc::select(
4356                        inotify_fd + 1,
4357                        &mut read_fds,
4358                        std::ptr::null_mut(),
4359                        std::ptr::null_mut(),
4360                        &mut timeout,
4361                    )
4362                };
4363
4364                if ready <= 0 {
4365                    // Timeout or error, continue loop to check shutdown
4366                    continue;
4367                }
4368
4369                // Read events non-blocking
4370                let events = match inotify.read_events(&mut buffer) {
4371                    Ok(events) => events,
4372                    Err(e) => {
4373                        error!("Error reading inotify events: {}", e);
4374                        break;
4375                    }
4376                };
4377
4378                for event in events {
4379                    if !event.mask.contains(inotify::EventMask::CREATE)
4380                        && !event.mask.contains(inotify::EventMask::DELETE)
4381                    {
4382                        continue;
4383                    }
4384
4385                    let name = match event.name {
4386                        Some(name) => name,
4387                        None => continue,
4388                    };
4389
4390                    let parent_path = match wd_to_path.get(&event.wd) {
4391                        Some(parent) => parent,
4392                        None => {
4393                            warn!("Unknown watch descriptor: {:?}", event.wd);
4394                            continue;
4395                        }
4396                    };
4397
4398                    let path = parent_path.join(name.to_string_lossy().as_ref());
4399
4400                    if event.mask.contains(inotify::EventMask::CREATE) {
4401                        if !path.is_dir() {
4402                            continue;
4403                        }
4404
4405                        Self::process_cgroup_creation(
4406                            &path,
4407                            &cgroup_regexes,
4408                            &mut cgroup_path_to_id,
4409                            &sender,
4410                        );
4411
4412                        // Add watch for this new directory
4413                        match inotify
4414                            .watches()
4415                            .add(&path, WatchMask::CREATE | WatchMask::DELETE)
4416                        {
4417                            Ok(wd) => {
4418                                wd_to_path.insert(wd, path.clone());
4419                            }
4420                            Err(e) => {
4421                                warn!(
4422                                    "Failed to add watch for new cgroup {}: {}",
4423                                    path.display(),
4424                                    e
4425                                );
4426                            }
4427                        }
4428                    } else if event.mask.contains(inotify::EventMask::DELETE) {
4429                        let path_str = path.to_string_lossy().to_string();
4430
4431                        // Get cgroup ID from our hash (since the directory is gone, we can't stat it)
4432                        let cgroup_id = cgroup_path_to_id.remove(&path_str).unwrap_or(0);
4433
4434                        // Send removal event to main thread
4435                        if let Err(e) = sender.send(CgroupEvent::Removed {
4436                            path: path_str,
4437                            cgroup_id,
4438                        }) {
4439                            error!("Failed to send cgroup removal event: {}", e);
4440                        }
4441
4442                        // Find and remove the watch descriptor for this path
4443                        let wd_to_remove = wd_to_path.iter().find_map(|(wd, watched_path)| {
4444                            if watched_path == &path {
4445                                Some(wd.clone())
4446                            } else {
4447                                None
4448                            }
4449                        });
4450                        if let Some(wd) = wd_to_remove {
4451                            wd_to_path.remove(&wd);
4452                        }
4453                    }
4454                }
4455            }
4456        });
4457
4458        Ok(receiver)
4459    }
4460
4461    fn add_recursive_watches(
4462        inotify: &mut Inotify,
4463        wd_to_path: &mut HashMap<inotify::WatchDescriptor, PathBuf>,
4464        path: &Path,
4465    ) -> Result<()> {
4466        for entry in WalkDir::new(path)
4467            .into_iter()
4468            .filter_map(|e| e.ok())
4469            .filter(|e| e.file_type().is_dir())
4470            .skip(1)
4471        {
4472            let entry_path = entry.path();
4473            // Add watch for this directory
4474            match inotify
4475                .watches()
4476                .add(entry_path, WatchMask::CREATE | WatchMask::DELETE)
4477            {
4478                Ok(wd) => {
4479                    wd_to_path.insert(wd, entry_path.to_path_buf());
4480                }
4481                Err(e) => {
4482                    debug!("Failed to add watch for {}: {}", entry_path.display(), e);
4483                }
4484            }
4485        }
4486        Ok(())
4487    }
4488
4489    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
4490        let (res_ch, req_ch) = self.stats_server.channels();
4491        let mut next_sched_at = Instant::now() + self.sched_intv;
4492        let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
4493        let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
4494        let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();
4495
4496        // Start the cgroup watcher only if there are CgroupRegex rules
4497        let cgroup_regexes = self.cgroup_regexes.take().unwrap();
4498        let cgroup_event_rx = if !cgroup_regexes.is_empty() {
4499            Some(Self::start_cgroup_watcher(
4500                shutdown.clone(),
4501                cgroup_regexes,
4502            )?)
4503        } else {
4504            None
4505        };
4506
4507        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
4508            let now = Instant::now();
4509
4510            if now >= next_sched_at {
4511                self.step()?;
4512                while next_sched_at < now {
4513                    next_sched_at += self.sched_intv;
4514                }
4515            }
4516
4517            if enable_layer_refresh && now >= next_layer_refresh_at {
4518                self.skel
4519                    .maps
4520                    .bss_data
4521                    .as_mut()
4522                    .unwrap()
4523                    .layer_refresh_seq_avgruntime += 1;
4524                while next_layer_refresh_at < now {
4525                    next_layer_refresh_at += self.layer_refresh_intv;
4526                }
4527            }
4528
4529            // Handle both stats requests and cgroup events with timeout
4530            let timeout_duration = next_sched_at.saturating_duration_since(Instant::now());
4531            let never_rx = crossbeam::channel::never();
4532            let cgroup_rx = cgroup_event_rx.as_ref().unwrap_or(&never_rx);
4533
4534            select! {
4535                recv(req_ch) -> msg => match msg {
4536                    Ok(StatsReq::Hello(tid)) => {
4537                        cpus_ranges.insert(
4538                            tid,
4539                            self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
4540                        );
4541                        let stats =
4542                            Stats::new(&mut self.skel, &self.proc_reader, self.topo.clone(), &self.gpu_task_handler, self.sched_stats.util_compensation)?;
4543                        res_ch.send(StatsRes::Hello(Box::new(stats)))?;
4544                    }
4545                    Ok(StatsReq::Refresh(tid, mut stats)) => {
4546                        // Propagate self's layer cpu ranges into each stat's.
4547                        for i in 0..self.nr_layer_cpus_ranges.len() {
4548                            for (_, ranges) in cpus_ranges.iter_mut() {
4549                                ranges[i] = (
4550                                    ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
4551                                    ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
4552                                );
4553                            }
4554                            self.nr_layer_cpus_ranges[i] =
4555                                (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
4556                        }
4557
4558                        stats.refresh(
4559                            &mut self.skel,
4560                            &self.proc_reader,
4561                            now,
4562                            self.processing_dur,
4563                            &self.gpu_task_handler,
4564                        )?;
4565                        let sys_stats =
4566                            self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
4567                        res_ch.send(StatsRes::Refreshed(Box::new((*stats, sys_stats))))?;
4568                    }
4569                    Ok(StatsReq::Bye(tid)) => {
4570                        cpus_ranges.remove(&tid);
4571                        res_ch.send(StatsRes::Bye)?;
4572                    }
4573                    Err(e) => Err(e)?,
4574                },
4575
4576                recv(cgroup_rx) -> event => match event {
4577                    Ok(CgroupEvent::Created { path, cgroup_id, match_bitmap }) => {
4578                        // Insert into BPF map
4579                        self.skel.maps.cgroup_match_bitmap.update(
4580                            &cgroup_id.to_ne_bytes(),
4581                            &match_bitmap.to_ne_bytes(),
4582                            libbpf_rs::MapFlags::ANY,
4583                        ).with_context(|| format!(
4584                            "Failed to insert cgroup {}({}) into BPF map. Cgroup map may be full \
4585                             (max 16384 entries). Aborting.",
4586                            cgroup_id, path
4587                        ))?;
4588
4589                        debug!("Added cgroup {} to BPF map with bitmap 0x{:x}", cgroup_id, match_bitmap);
4590                    }
4591                    Ok(CgroupEvent::Removed { path, cgroup_id }) => {
4592                        // Delete from BPF map
4593                        if let Err(e) = self.skel.maps.cgroup_match_bitmap.delete(&cgroup_id.to_ne_bytes()) {
4594                            warn!("Failed to delete cgroup {} from BPF map: {}", cgroup_id, e);
4595                        } else {
4596                            debug!("Removed cgroup {}({}) from BPF map", cgroup_id, path);
4597                        }
4598                    }
4599                    Err(e) => {
4600                        error!("Error receiving cgroup event: {}", e);
4601                    }
4602                },
4603
4604                recv(crossbeam::channel::after(timeout_duration)) -> _ => {
4605                    // Timeout - continue main loop
4606                }
4607            }
4608        }
4609
4610        let _ = self.struct_ops.take();
4611        uei_report!(&self.skel, uei)
4612    }
4613}
4614
4615impl Drop for Scheduler<'_> {
4616    fn drop(&mut self) {
4617        info!("Unregister {SCHEDULER_NAME} scheduler");
4618
4619        if !self.netdevs.is_empty() {
4620            for (iface, netdev) in &self.netdevs {
4621                if let Err(e) = netdev.restore_cpumasks() {
4622                    warn!("Failed to restore {iface} IRQ affinity: {e}");
4623                }
4624            }
4625            info!("Restored original netdev IRQ affinity");
4626        }
4627
4628        if let Some(struct_ops) = self.struct_ops.take() {
4629            drop(struct_ops);
4630        }
4631    }
4632}
4633
4634fn write_example_file(path: &str) -> Result<()> {
4635    let mut f = fs::OpenOptions::new()
4636        .create_new(true)
4637        .write(true)
4638        .open(path)?;
4639    Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
4640}
4641
/// Layer information associated with a single hint value, as produced by
/// `verify_layer_specs`.
struct HintLayerInfo {
    /// Index, within the verified spec list, of the layer that declared the
    /// `HintEquals` match for this hint.
    layer_id: usize,
    /// Threshold of the `SystemCpuUtilBelow` matcher found in the same AND
    /// block as the hint, if any. Validated to lie in `[0.0, 1.0]`.
    system_cpu_util_below: Option<f64>,
    /// Threshold of the `DsqInsertBelow` matcher found in the same AND block
    /// as the hint, if any. Validated to lie in `[0.0, 1.0]`.
    dsq_insert_below: Option<f64>,
}
4647
4648fn verify_layer_specs(specs: &[LayerSpec]) -> Result<HashMap<u64, HintLayerInfo>> {
4649    let mut hint_to_layer_map = HashMap::<u64, (usize, String, Option<f64>, Option<f64>)>::new();
4650
4651    let nr_specs = specs.len();
4652    if nr_specs == 0 {
4653        bail!("No layer spec");
4654    }
4655    if nr_specs > MAX_LAYERS {
4656        bail!("Too many layer specs");
4657    }
4658
4659    for (idx, spec) in specs.iter().enumerate() {
4660        if idx < nr_specs - 1 {
4661            if spec.matches.is_empty() {
4662                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
4663            }
4664        } else if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
4665            bail!("Terminal spec {:?} must have an empty match", spec.name);
4666        }
4667
4668        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
4669            bail!(
4670                "Spec {:?} has too many ({}) OR match blocks",
4671                spec.name,
4672                spec.matches.len()
4673            );
4674        }
4675
4676        for (ands_idx, ands) in spec.matches.iter().enumerate() {
4677            if ands.len() > NR_LAYER_MATCH_KINDS {
4678                bail!(
4679                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
4680                    spec.name,
4681                    ands_idx,
4682                    ands.len()
4683                );
4684            }
4685            let mut hint_equals_cnt = 0;
4686            let mut system_cpu_util_below_cnt = 0;
4687            let mut dsq_insert_below_cnt = 0;
4688            let mut hint_value: Option<u64> = None;
4689            let mut system_cpu_util_threshold: Option<f64> = None;
4690            let mut dsq_insert_threshold: Option<f64> = None;
4691            for one in ands.iter() {
4692                match one {
4693                    LayerMatch::CgroupPrefix(prefix) => {
4694                        if prefix.len() > MAX_PATH {
4695                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
4696                        }
4697                    }
4698                    LayerMatch::CgroupSuffix(suffix) => {
4699                        if suffix.len() > MAX_PATH {
4700                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
4701                        }
4702                    }
4703                    LayerMatch::CgroupContains(substr) => {
4704                        if substr.len() > MAX_PATH {
4705                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
4706                        }
4707                    }
4708                    LayerMatch::CommPrefix(prefix) => {
4709                        if prefix.len() > MAX_COMM {
4710                            bail!("Spec {:?} has too long a comm prefix", spec.name);
4711                        }
4712                    }
4713                    LayerMatch::PcommPrefix(prefix) => {
4714                        if prefix.len() > MAX_COMM {
4715                            bail!("Spec {:?} has too long a process name prefix", spec.name);
4716                        }
4717                    }
4718                    LayerMatch::SystemCpuUtilBelow(threshold) => {
4719                        if *threshold < 0.0 || *threshold > 1.0 {
4720                            bail!(
4721                                "Spec {:?} has SystemCpuUtilBelow threshold outside the range [0.0, 1.0]",
4722                                spec.name
4723                            );
4724                        }
4725                        system_cpu_util_threshold = Some(*threshold);
4726                        system_cpu_util_below_cnt += 1;
4727                    }
4728                    LayerMatch::DsqInsertBelow(threshold) => {
4729                        if *threshold < 0.0 || *threshold > 1.0 {
4730                            bail!(
4731                                "Spec {:?} has DsqInsertBelow threshold outside the range [0.0, 1.0]",
4732                                spec.name
4733                            );
4734                        }
4735                        dsq_insert_threshold = Some(*threshold);
4736                        dsq_insert_below_cnt += 1;
4737                    }
4738                    LayerMatch::HintEquals(hint) => {
4739                        if *hint > 1024 {
4740                            bail!(
4741                                "Spec {:?} has hint value outside the range [0, 1024]",
4742                                spec.name
4743                            );
4744                        }
4745                        hint_value = Some(*hint);
4746                        hint_equals_cnt += 1;
4747                    }
4748                    _ => {}
4749                }
4750            }
4751            if hint_equals_cnt > 1 {
4752                bail!("Only 1 HintEquals match permitted per AND block");
4753            }
4754            let high_freq_matcher_cnt = system_cpu_util_below_cnt + dsq_insert_below_cnt;
4755            if high_freq_matcher_cnt > 0 {
4756                if hint_equals_cnt != 1 {
4757                    bail!("High-frequency matchers (SystemCpuUtilBelow, DsqInsertBelow) must be used with one HintEquals");
4758                }
4759                if system_cpu_util_below_cnt > 1 {
4760                    bail!("Only 1 SystemCpuUtilBelow match permitted per AND block");
4761                }
4762                if dsq_insert_below_cnt > 1 {
4763                    bail!("Only 1 DsqInsertBelow match permitted per AND block");
4764                }
4765                if ands.len() != hint_equals_cnt + system_cpu_util_below_cnt + dsq_insert_below_cnt
4766                {
4767                    bail!("High-frequency matchers must be used only with HintEquals (no other matchers)");
4768                }
4769            } else if hint_equals_cnt == 1 && ands.len() != 1 {
4770                bail!("HintEquals match cannot be in conjunction with other matches");
4771            }
4772
4773            // Insert hint into map if present
4774            if let Some(hint) = hint_value {
4775                if let Some((layer_id, name, _, _)) = hint_to_layer_map.get(&hint) {
4776                    if *layer_id != idx {
4777                        bail!(
4778                            "Spec {:?} has hint value ({}) that is already mapped to Spec {:?}",
4779                            spec.name,
4780                            hint,
4781                            name
4782                        );
4783                    }
4784                } else {
4785                    hint_to_layer_map.insert(
4786                        hint,
4787                        (
4788                            idx,
4789                            spec.name.clone(),
4790                            system_cpu_util_threshold,
4791                            dsq_insert_threshold,
4792                        ),
4793                    );
4794                }
4795            }
4796        }
4797
4798        match spec.kind {
4799            LayerKind::Confined {
4800                cpus_range,
4801                util_range,
4802                ..
4803            }
4804            | LayerKind::Grouped {
4805                cpus_range,
4806                util_range,
4807                ..
4808            } => {
4809                if let Some((cpus_min, cpus_max)) = cpus_range {
4810                    if cpus_min > cpus_max {
4811                        bail!(
4812                            "Spec {:?} has invalid cpus_range({}, {})",
4813                            spec.name,
4814                            cpus_min,
4815                            cpus_max
4816                        );
4817                    }
4818                }
4819                if util_range.0 >= util_range.1 {
4820                    bail!(
4821                        "Spec {:?} has invalid util_range ({}, {})",
4822                        spec.name,
4823                        util_range.0,
4824                        util_range.1
4825                    );
4826                }
4827            }
4828            _ => {}
4829        }
4830    }
4831
4832    Ok(hint_to_layer_map
4833        .into_iter()
4834        .map(|(k, v)| {
4835            (
4836                k,
4837                HintLayerInfo {
4838                    layer_id: v.0,
4839                    system_cpu_util_below: v.2,
4840                    dsq_insert_below: v.3,
4841                },
4842            )
4843        })
4844        .collect())
4845}
4846
/// Returns the last `len` characters of `cgroup`, or all of `cgroup` when it
/// has `len` characters or fewer.
fn name_suffix(cgroup: &str, len: usize) -> String {
    if len == 0 {
        return String::new();
    }

    // Walk backwards to the `len`-th character from the end; its byte offset
    // is where the suffix starts. Running out of characters first means the
    // whole string is the suffix.
    match cgroup.char_indices().rev().nth(len - 1) {
        Some((start, _)) => cgroup[start..].to_string(),
        None => cgroup.to_string(),
    }
}
4853
4854fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
4855    let mut paths = vec![];
4856
4857    if !dir.is_dir() {
4858        panic!("path {:?} does not correspond to directory", dir);
4859    }
4860
4861    let direntries = fs::read_dir(dir)?;
4862
4863    for entry in direntries {
4864        let path = entry?.path();
4865        if path.is_dir() {
4866            paths.append(&mut traverse_sysfs(&path)?);
4867            paths.push(path);
4868        }
4869    }
4870
4871    Ok(paths)
4872}
4873
4874fn find_cpumask(cgroup: &str) -> Cpumask {
4875    let mut path = String::from(cgroup);
4876    path.push_str("/cpuset.cpus.effective");
4877
4878    let description = fs::read_to_string(&mut path).unwrap();
4879
4880    Cpumask::from_cpulist(&description).unwrap()
4881}
4882
4883fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
4884    match rule {
4885        LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4886            .into_iter()
4887            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4888            .filter(|cgroup| cgroup.ends_with(suffix))
4889            .map(|cgroup| {
4890                (
4891                    {
4892                        let mut slashterminated = cgroup.clone();
4893                        slashterminated.push('/');
4894                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4895                    },
4896                    find_cpumask(&cgroup),
4897                )
4898            })
4899            .collect()),
4900        LayerMatch::CgroupRegex(expr) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4901            .into_iter()
4902            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4903            .filter(|cgroup| {
4904                let re = Regex::new(expr).unwrap();
4905                re.is_match(cgroup)
4906            })
4907            .map(|cgroup| {
4908                (
4909                    // Here we convert the regex match into a suffix match because we still need to
4910                    // do the matching on the bpf side and doing a regex match in bpf isn't
4911                    // easily done.
4912                    {
4913                        let mut slashterminated = cgroup.clone();
4914                        slashterminated.push('/');
4915                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4916                    },
4917                    find_cpumask(&cgroup),
4918                )
4919            })
4920            .collect()),
4921        _ => panic!("Unimplemented template enum {:?}", rule),
4922    }
4923}
4924
4925fn create_perf_fds(skel: &mut BpfSkel, event: u64) -> Result<()> {
4926    let mut attr = perf::bindings::perf_event_attr {
4927        size: std::mem::size_of::<perf::bindings::perf_event_attr>() as u32,
4928        type_: perf::bindings::PERF_TYPE_RAW,
4929        config: event,
4930        sample_type: 0u64,
4931        ..Default::default()
4932    };
4933    attr.__bindgen_anon_1.sample_period = 0u64;
4934    attr.set_disabled(0);
4935
4936    let perf_events_map = &skel.maps.scx_pmu_map;
4937    let map_fd = unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
4938
4939    let mut failures = 0u64;
4940
4941    for cpu in 0..*NR_CPUS_POSSIBLE {
4942        let fd = unsafe { perf::perf_event_open(&mut attr as *mut _, -1, cpu as i32, -1, 0) };
4943        if fd < 0 {
4944            failures += 1;
4945            trace!(
4946                "perf_event_open failed cpu={cpu} errno={}",
4947                std::io::Error::last_os_error()
4948            );
4949            continue;
4950        }
4951
4952        let key = cpu as u32;
4953        let val = fd as u32;
4954        let ret = unsafe {
4955            libbpf_sys::bpf_map_update_elem(
4956                map_fd,
4957                &key as *const _ as *const _,
4958                &val as *const _ as *const _,
4959                0,
4960            )
4961        };
4962        if ret != 0 {
4963            trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
4964        } else {
4965            trace!("mapped cpu={cpu} -> fd={fd}");
4966        }
4967    }
4968
4969    if failures > 0 {
4970        println!("membw tracking: failed to install {failures} counters");
4971        // Keep going, do not fail the scheduler for this
4972    }
4973
4974    Ok(())
4975}
4976
4977// Set up the counters
4978fn setup_membw_tracking(skel: &mut OpenBpfSkel) -> Result<u64> {
4979    let pmumanager = PMUManager::new()?;
4980    let codename = &pmumanager.codename as &str;
4981
4982    let pmuspec = match codename {
4983        "amdzen1" | "amdzen2" | "amdzen3" => {
4984            trace!("found AMD codename {codename}");
4985            pmumanager.pmus.get("ls_any_fills_from_sys.mem_io_local")
4986        }
4987        "amdzen4" | "amdzen5" => {
4988            trace!("found AMD codename {codename}");
4989            pmumanager.pmus.get("ls_any_fills_from_sys.dram_io_all")
4990        }
4991
4992        "haswell" | "broadwell" | "broadwellde" | "broadwellx" | "skylake" | "skylakex"
4993        | "cascadelakex" | "arrowlake" | "meteorlake" | "sapphirerapids" | "emeraldrapids"
4994        | "graniterapids" => {
4995            trace!("found Intel codename {codename}");
4996            pmumanager.pmus.get("LONGEST_LAT_CACHE.MISS")
4997        }
4998
4999        _ => {
5000            trace!("found unknown codename {codename}");
5001            None
5002        }
5003    };
5004
5005    let spec = pmuspec.ok_or("not_found").unwrap();
5006    let config = (spec.umask << 8) | spec.event[0];
5007
5008    // Install the counter in the BPF map
5009    skel.maps.rodata_data.as_mut().unwrap().membw_event = config;
5010
5011    Ok(config)
5012}
5013
5014#[clap_main::clap_main]
5015fn main(opts: Opts) -> Result<()> {
5016    if opts.version {
5017        println!(
5018            "scx_layered {}",
5019            build_id::full_version(env!("CARGO_PKG_VERSION"))
5020        );
5021        return Ok(());
5022    }
5023
5024    if opts.help_stats {
5025        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
5026        return Ok(());
5027    }
5028
5029    let env_filter = EnvFilter::try_from_default_env()
5030        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
5031            Ok(filter) => Ok(filter),
5032            Err(e) => {
5033                eprintln!(
5034                    "invalid log envvar: {}, using info, err is: {}",
5035                    opts.log_level, e
5036                );
5037                EnvFilter::try_new("info")
5038            }
5039        })
5040        .unwrap_or_else(|_| EnvFilter::new("info"));
5041
5042    match tracing_subscriber::fmt()
5043        .with_env_filter(env_filter)
5044        .with_target(true)
5045        .with_thread_ids(true)
5046        .with_file(true)
5047        .with_line_number(true)
5048        .try_init()
5049    {
5050        Ok(()) => {}
5051        Err(e) => eprintln!("failed to init logger: {}", e),
5052    }
5053
5054    if opts.verbose > 0 {
5055        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
5056    }
5057
5058    if opts.no_load_frac_limit {
5059        warn!("--no-load-frac-limit is deprecated and noop");
5060    }
5061    if opts.layer_preempt_weight_disable != 0.0 {
5062        warn!("--layer-preempt-weight-disable is deprecated and noop");
5063    }
5064    if opts.layer_growth_weight_disable != 0.0 {
5065        warn!("--layer-growth-weight-disable is deprecated and noop");
5066    }
5067    if opts.local_llc_iteration {
5068        warn!("--local_llc_iteration is deprecated and noop");
5069    }
5070
5071    debug!("opts={:?}", &opts);
5072
5073    if let Some(run_id) = opts.run_id {
5074        info!("scx_layered run_id: {}", run_id);
5075    }
5076
5077    let shutdown = Arc::new(AtomicBool::new(false));
5078    let shutdown_clone = shutdown.clone();
5079    ctrlc::set_handler(move || {
5080        shutdown_clone.store(true, Ordering::Relaxed);
5081    })
5082    .context("Error setting Ctrl-C handler")?;
5083
5084    if let Some(intv) = opts.monitor.or(opts.stats) {
5085        let shutdown_copy = shutdown.clone();
5086        let stats_columns = opts.stats_columns;
5087        let stats_no_llc = opts.stats_no_llc;
5088        let jh = std::thread::spawn(move || {
5089            match stats::monitor(
5090                Duration::from_secs_f64(intv),
5091                shutdown_copy,
5092                stats_columns,
5093                stats_no_llc,
5094            ) {
5095                Ok(_) => {
5096                    debug!("stats monitor thread finished successfully")
5097                }
5098                Err(error_object) => {
5099                    warn!(
5100                        "stats monitor thread finished because of an error {}",
5101                        error_object
5102                    )
5103                }
5104            }
5105        });
5106        if opts.monitor.is_some() {
5107            let _ = jh.join();
5108            return Ok(());
5109        }
5110    }
5111
5112    if let Some(path) = &opts.example {
5113        write_example_file(path)?;
5114        return Ok(());
5115    }
5116
5117    let mut layer_config = match opts.run_example {
5118        true => EXAMPLE_CONFIG.clone(),
5119        false => LayerConfig { specs: vec![] },
5120    };
5121
5122    for (idx, input) in opts.specs.iter().enumerate() {
5123        let specs = LayerSpec::parse(input)
5124            .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;
5125
5126        for spec in specs {
5127            match spec.template {
5128                Some(ref rule) => {
5129                    let matches = expand_template(rule)?;
5130                    // in the absence of matching cgroups, have template layers
5131                    // behave as non-template layers do.
5132                    if matches.is_empty() {
5133                        layer_config.specs.push(spec);
5134                    } else {
5135                        for (mt, mask) in matches {
5136                            let mut genspec = spec.clone();
5137
5138                            genspec.cpuset = Some(mask);
5139
5140                            // Push the new "and" rule into each "or" term.
5141                            for orterm in &mut genspec.matches {
5142                                orterm.push(mt.clone());
5143                            }
5144
5145                            match &mt {
5146                                LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
5147                                _ => bail!("Template match has unexpected type"),
5148                            }
5149
5150                            // Push the generated layer into the config
5151                            layer_config.specs.push(genspec);
5152                        }
5153                    }
5154                }
5155
5156                None => {
5157                    layer_config.specs.push(spec);
5158                }
5159            }
5160        }
5161    }
5162
5163    for spec in layer_config.specs.iter_mut() {
5164        let common = spec.kind.common_mut();
5165
5166        if common.slice_us == 0 {
5167            common.slice_us = opts.slice_us;
5168        }
5169
5170        if common.weight == 0 {
5171            common.weight = DEFAULT_LAYER_WEIGHT;
5172        }
5173        common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);
5174
5175        if common.preempt {
5176            if common.disallow_open_after_us.is_some() {
5177                warn!(
5178                    "Preempt layer {} has non-null disallow_open_after_us, ignored",
5179                    &spec.name
5180                );
5181            }
5182            if common.disallow_preempt_after_us.is_some() {
5183                warn!(
5184                    "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
5185                    &spec.name
5186                );
5187            }
5188            common.disallow_open_after_us = Some(u64::MAX);
5189            common.disallow_preempt_after_us = Some(u64::MAX);
5190        } else {
5191            if common.disallow_open_after_us.is_none() {
5192                common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
5193            }
5194
5195            if common.disallow_preempt_after_us.is_none() {
5196                common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
5197            }
5198        }
5199
5200        if common.idle_smt.is_some() {
5201            warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
5202        }
5203
5204        if common.allow_node_aligned.is_some() {
5205            warn!("Layer {} has deprecated flag \"allow_node_aligned\", node-aligned tasks are now always dispatched on layer DSQs", &spec.name);
5206        }
5207    }
5208
5209    let membw_required = layer_config.specs.iter().any(|spec| match spec.kind {
5210        LayerKind::Confined { membw_gb, .. } | LayerKind::Grouped { membw_gb, .. } => {
5211            membw_gb.is_some()
5212        }
5213        LayerKind::Open { .. } => false,
5214    });
5215
5216    if opts.print_and_exit {
5217        println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
5218        return Ok(());
5219    }
5220
5221    debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
5222    let hint_to_layer_map = verify_layer_specs(&layer_config.specs)?;
5223
5224    let mut open_object = MaybeUninit::uninit();
5225    loop {
5226        let mut sched = Scheduler::init(
5227            &opts,
5228            &layer_config.specs,
5229            &mut open_object,
5230            &hint_to_layer_map,
5231            membw_required,
5232        )?;
5233        if !sched.run(shutdown.clone())?.should_restart() {
5234            break;
5235        }
5236    }
5237
5238    Ok(())
5239}
5240
#[cfg(test)]
mod peak_util_tests {
    use super::*;

    /// A burst followed by a few quiet samples must still dominate the peak.
    #[test]
    fn test_peak_pins_recent_burst() {
        let elapsed = Duration::from_millis(100);
        let half_life = Duration::from_secs(1);

        // Five quiet samples, one burst, five more quiet samples.
        let warmed = (0..5).fold(0.0, |p, _| update_peak_util(p, 0.1, half_life, elapsed));
        let burst = update_peak_util(warmed, 2.0, half_life, elapsed);
        let peak = (0..5).fold(burst, |p, _| update_peak_util(p, 0.1, half_life, elapsed));

        assert!(
            peak > 1.3,
            "peak should still hold the recent burst, got {peak}"
        );
    }

    /// Fifty idle samples must bring a high peak back down.
    #[test]
    fn test_peak_decays_after_quiet_period() {
        let elapsed = Duration::from_millis(100);
        let half_life = Duration::from_secs(1);

        let peak = (0..50).fold(2.0, |p, _| update_peak_util(p, 0.0, half_life, elapsed));

        assert!(
            peak < 0.1,
            "peak should decay after a long quiet period, got {peak}"
        );
    }

    /// A constant input must be reproduced as the peak.
    #[test]
    fn test_peak_matches_steady_input() {
        let elapsed = Duration::from_millis(100);
        let half_life = Duration::from_secs(1);

        let peak = (0..30).fold(0.0, |p, _| update_peak_util(p, 1.0, half_life, elapsed));

        assert!(
            (peak - 1.0).abs() < 1e-9,
            "peak should match steady-state input, got {peak}"
        );
    }

    /// A peak above the current utilization must move only the upper
    /// (shrink) bound of the target range, never the lower (growth) bound.
    #[test]
    fn test_peak_only_changes_shrink_target() {
        let util_range = (0.7, 0.9);
        let without_peak = calc_peak_aware_target_range(1.8, 1.8, util_range);
        let with_peak = calc_peak_aware_target_range(1.8, 3.1, util_range);

        assert_eq!(without_peak.0, 2);
        assert_eq!(
            with_peak.0, without_peak.0,
            "peak should not affect growth target"
        );
        assert_eq!(without_peak.1, 2);
        assert_eq!(
            with_peak.1, 4,
            "peak should only widen the shrink boundary"
        );
    }
}
5317
/// Unit tests for the cross-NUMA helpers `xnuma_check_active` and
/// `xnuma_compute_rates`.
///
/// Conventions used in the comments below (as encoded by the expectations in
/// these tests):
///   load    = duty / alloc for a node
///   eq      = total duty / total alloc (equilibrium ratio)
///   surplus = duty - eq * alloc
///   ratio   = surplus / alloc
///   gd      = growth denied
/// `THRESH` and `DELTA` are (low, high) pairs for load and surplus ratio.
#[cfg(test)]
mod xnuma_tests {
    use super::*;

    // Default thresholds for tests
    const THRESH: (f64, f64) = (0.6, 0.7);
    const DELTA: (f64, f64) = (0.2, 0.3);

    // =====================================================================
    // xnuma_check_active tests — per-(layer, node) two-threshold
    // =====================================================================

    #[test]
    fn test_activation_below_threshold() {
        // Both nodes below threshold — both closed
        let duty = vec![40.0, 40.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // load = 40/96 = 0.42 < 0.6 (low) → deactivate
        assert!(!result[0]);
        assert!(!result[1]);
    }

    #[test]
    fn test_activation_above_all_thresholds() {
        // N0 overloaded + imbalanced + growth denied → open
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // N0: load=0.94>0.7, eq=110/192=0.573, surplus=90-55=35, ratio=0.365>0.3, gd=true → open
        assert!(result[0]);
        // N1: load=0.21<0.6 → closed
        assert!(!result[1]);
    }

    #[test]
    fn test_activation_requires_growth_denied() {
        // N0 overloaded + imbalanced but growth NOT denied → closed
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![false, false]; // growth succeeded
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]); // !growth_denied → deactivate
    }

    #[test]
    fn test_symmetric_high_load_stays_closed() {
        // Both nodes above threshold but balanced (delta=0) → closed
        let duty = vec![80.0, 80.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // load=0.83>0.7 but surplus=0, surplus_ratio=0 < 0.3 → no activation
        assert!(!result[0]);
        assert!(!result[1]);
    }

    #[test]
    fn test_hysteresis_stays_active() {
        // N0 was active, surplus ratio now inside its hysteresis band → stays active
        let duty = vec![75.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![true, false]; // N0 was active
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // N0: load=75/96=0.78>0.7(hi), eq=95/192=0.495, surplus=75-47.5=27.5,
        //   ratio=27.5/96=0.286 — between 0.2(lo) and 0.3(hi), gd=true
        //   activate: 0.78>0.7 YES, 0.286>0.3 NO → no activate
        //   deactivate: 0.78<0.6 NO, 0.286<0.2 NO, !gd NO → no deactivate
        // → hysteresis preserves true
        assert!(result[0]);
    }

    #[test]
    fn test_hysteresis_stays_inactive() {
        // N0 was inactive, in hysteresis band → stays inactive
        let duty = vec![75.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![true, false];
        let cur = vec![false, false]; // N0 was inactive
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // Same conditions but currently inactive → stays inactive
        assert!(!result[0]);
    }

    #[test]
    fn test_deactivation_load_drops() {
        // N0 was active, load drops below low → closes
        let duty = vec![50.0, 50.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // N0: load=0.52 < 0.6(lo) → deactivate
        assert!(!result[0]);
    }

    #[test]
    fn test_deactivation_growth_succeeds() {
        // N0 was active, growth now succeeds → closes
        let duty = vec![90.0, 20.0];
        let allocs = vec![96, 96];
        let gd = vec![false, false]; // growth succeeded
        let cur = vec![true, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // !growth_denied → deactivate regardless of load/delta
        assert!(!result[0]);
    }

    #[test]
    fn test_zero_alloc_with_duty_and_growth_denied() {
        // Zero alloc but duty > 0 and growth denied → open
        let duty = vec![50.0, 0.0];
        let allocs = vec![0, 96];
        let gd = vec![true, false];
        let cur = vec![false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(result[0]); // zero alloc + duty + gd → source
    }

    #[test]
    fn test_all_zero_alloc() {
        // No allocation and no duty anywhere → everything closes, even
        // nodes that were previously active with growth denied.
        let duty = vec![0.0, 0.0];
        let allocs = vec![0, 0];
        let gd = vec![true, true];
        let cur = vec![true, true];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        assert!(!result[0]);
        assert!(!result[1]);
    }

    #[test]
    fn test_three_nodes_mixed() {
        // 3 nodes: N0 overloaded+imbalanced+gd, N1 moderate, N2 low
        let duty = vec![90.0, 40.0, 10.0];
        let allocs = vec![96, 96, 96];
        let gd = vec![true, true, false];
        let cur = vec![false, false, false];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // eq_ratio = 140/288 ≈ 0.486
        // N0: load=0.94>0.7, surplus=90-46.7=43.3, ratio=0.451>0.3, gd=true → open
        assert!(result[0]);
        // N1: load=0.42<0.6 → deactivate
        assert!(!result[1]);
        // N2: load=0.10<0.6 → deactivate
        assert!(!result[2]);
    }

    #[test]
    fn test_per_node_independence() {
        // N0 active, N1 deactivates independently
        let duty = vec![90.0, 50.0];
        let allocs = vec![96, 96];
        let gd = vec![true, true];
        let cur = vec![true, true];
        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
        // N0: load=0.94>0.7, eq=140/192=0.729, surplus=90-70=20, ratio=0.208>0.2(lo)
        //   activate: 0.94>0.7 YES, 0.208>0.3 NO → no activate
        //   deactivate: 0.94<0.6 NO, 0.208<0.2 NO, !gd NO → no deactivate → preserve true
        assert!(result[0]);
        // N1: load=0.52<0.6 → deactivate
        assert!(!result[1]);
    }

    // =====================================================================
    // xnuma_compute_rates tests — basic
    // =====================================================================

    #[test]
    fn test_rates_balanced_load() {
        // Equal load per CPU on both nodes — no migration needed
        let duty = vec![48.0, 48.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // No surplus/deficit → all rates zero
        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    #[test]
    fn test_rates_one_overloaded() {
        // N0 has 80 CPUs of duty, N1 has 40. Both have 96 CPUs.
        // eq_ratio = 120/192 = 0.625
        // N0 expected = 0.625 * 96 = 60, surplus = 80 - 60 = 20
        // N1 expected = 0.625 * 96 = 60, deficit = 60 - 40 = 20
        let duty = vec![80.0, 40.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // rate[0][1] should be positive (migrate from N0 to N1)
        assert!(result.rates[0][1] > 0);
        // rate[1][0] should be 0 (N1 has no surplus)
        assert_eq!(result.rates[1][0], 0);
        // Self-rates always 0
        assert_eq!(result.rates[0][0], 0);
        assert_eq!(result.rates[1][1], 0);

        // Verify the rate magnitude: migration = 20.0 * DAMPEN, scaled
        let expected_rate = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
    }

    #[test]
    fn test_rates_asymmetric_allocation() {
        // N0 has 48 CPUs, N1 has 144 CPUs. Duty 48 each.
        // eq_ratio = 96/192 = 0.5
        // N0 expected = 0.5 * 48 = 24, surplus = 48 - 24 = 24
        // N1 expected = 0.5 * 144 = 72, deficit = 72 - 48 = 24
        let duty = vec![48.0, 48.0];
        let allocs = vec![48, 144];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected_rate = (24.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
        assert_eq!(result.rates[1][0], 0);
    }

    // =====================================================================
    // xnuma_compute_rates tests — multi-node
    // =====================================================================

    #[test]
    fn test_rates_three_nodes_one_source() {
        // N0 overloaded, N1 and N2 are deficit
        // allocs: 96 each, duty: N0=120, N1=30, N2=30
        // eq_ratio = 180/288 = 0.625
        // N0 expected = 60, surplus = 60
        // N1 expected = 60, deficit = 30
        // N2 expected = 60, deficit = 30
        let duty = vec![120.0, 30.0, 30.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // Total deficit = 60. N1 gets 30/60 = 50%, N2 gets 30/60 = 50%
        // rate[0][1] = 60 * 30/60 * DAMPEN = 15
        // rate[0][2] = 60 * 30/60 * DAMPEN = 15
        let rate_01 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_02 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], rate_01);
        assert_eq!(result.rates[0][2], rate_02);

        // No reverse flow
        assert_eq!(result.rates[1][0], 0);
        assert_eq!(result.rates[2][0], 0);
        assert_eq!(result.rates[1][2], 0);
        assert_eq!(result.rates[2][1], 0);
    }

    #[test]
    fn test_rates_three_nodes_unequal_deficit() {
        // N0 overloaded. N1 slight deficit, N2 large deficit.
        // allocs: 96 each, duty: N0=120, N1=50, N2=10
        // eq_ratio = 180/288 = 0.625
        // N0: expected=60, surplus=60
        // N1: expected=60, deficit=10
        // N2: expected=60, deficit=50
        let duty = vec![120.0, 50.0, 10.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // Total deficit = 60. N1 share = 10/60, N2 share = 50/60
        // rate[0][1] = 60 * 10/60 * DAMPEN = 5
        // rate[0][2] = 60 * 50/60 * DAMPEN = 25
        let rate_01 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_02 = (50.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], rate_01);
        assert_eq!(result.rates[0][2], rate_02);
    }

    #[test]
    fn test_rates_two_sources_one_sink() {
        // N0 and N1 overloaded, N2 deficit
        // allocs: 96 each, duty: N0=80, N1=70, N2=30
        // total = 180, eq_ratio = 180/288 = 0.625
        // N0: expected=60, surplus=20
        // N1: expected=60, surplus=10
        // N2: expected=60, deficit=30
        let duty = vec![80.0, 70.0, 30.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // Total deficit = 30. Only N2 is deficit, so deficit share = 1.0
        // rate[0][2] = 20 * 1.0 * DAMPEN = 10
        // rate[1][2] = 10 * 1.0 * DAMPEN = 5
        let rate_02 = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        let rate_12 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][2], rate_02);
        assert_eq!(result.rates[1][2], rate_12);

        // No flow between sources
        assert_eq!(result.rates[0][1], 0);
        assert_eq!(result.rates[1][0], 0);
    }

    // =====================================================================
    // Conservation invariants
    // =====================================================================

    #[test]
    fn test_conservation_per_source_outbound() {
        // Each source's total outbound should equal its surplus * DAMPEN (scaled).
        // Total outbound from src = surplus[src] * DAMPEN * DUTY_CYCLE_SCALE
        let duty = vec![100.0, 30.0, 50.0, 20.0];
        let allocs = vec![96, 96, 96, 96];
        let nr = 4;
        let result = xnuma_compute_rates(&duty, &allocs);

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        for src in 0..nr {
            let expected = eq_ratio * allocs[src] as f64;
            let surplus = (duty[src] - expected).max(0.0);
            let total_outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
            let expected_rate = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
            assert_eq!(
                total_outbound, expected_rate,
                "node {} outbound mismatch",
                src
            );
        }
    }

    #[test]
    fn test_conservation_surplus_equals_deficit() {
        // Mathematical invariant: total surplus == total deficit in water-fill.
        // This checks the arithmetic only; it does not call into the
        // rate computation.
        let duty = vec![100.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96];

        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        let mut total_surplus = 0.0f64;
        let mut total_deficit = 0.0f64;
        for i in 0..3 {
            let expected = eq_ratio * allocs[i] as f64;
            let delta = duty[i] - expected;
            if delta > 0.0 {
                total_surplus += delta;
            } else {
                total_deficit += -delta;
            }
        }
        assert!((total_surplus - total_deficit).abs() < 1e-10);
    }

    #[test]
    fn test_self_rates_always_zero() {
        // The diagonal (node → itself) must never carry a rate.
        let duty = vec![100.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for nid in 0..3 {
            assert_eq!(result.rates[nid][nid], 0);
        }
    }

    #[test]
    fn test_deficit_nodes_have_zero_outbound() {
        // Deficit nodes should have zero outbound rates
        let duty = vec![100.0, 20.0, 30.0, 50.0];
        let allocs = vec![96, 96, 96, 96];
        let nr = 4;
        let result = xnuma_compute_rates(&duty, &allocs);

        // eq_ratio = 200/384 ≈ 0.521, expected = 50 per node.
        // N0: surplus=50, N3: at equilibrium (surplus=0)
        // N1: deficit=30, N2: deficit=20
        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;

        // Classify deficit nodes from the inputs rather than from the
        // computed rates; selecting on "outbound sum == 0" would make the
        // assertion below trivially true.
        let mut nr_deficit = 0;
        for nid in 0..nr {
            let expected = eq_ratio * allocs[nid] as f64;
            // The epsilon keeps N3 out: it sits at equilibrium and rounding
            // could nudge it to either side.
            if duty[nid] >= expected - 1e-9 {
                continue;
            }
            nr_deficit += 1;
            for dst in 0..nr {
                assert_eq!(
                    result.rates[nid][dst], 0,
                    "deficit node {} has non-zero rate to {}",
                    nid, dst
                );
            }
        }
        // Guard against the loop silently checking nothing.
        assert_eq!(
            nr_deficit, 2,
            "expected N1 and N2 to be classified as deficit nodes"
        );
    }

    // =====================================================================
    // Edge cases
    // =====================================================================

    #[test]
    fn test_rates_zero_duty_everywhere() {
        // Nothing running anywhere → nothing to migrate.
        let duty = vec![0.0, 0.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    #[test]
    fn test_rates_zero_alloc() {
        let duty = vec![50.0, 50.0];
        let allocs = vec![0, 0];
        let result = xnuma_compute_rates(&duty, &allocs);

        // total_alloc = 0 → early return with all zeros
        for src in 0..2 {
            for dst in 0..2 {
                assert_eq!(result.rates[src][dst], 0);
            }
        }
    }

    #[test]
    fn test_rates_all_load_one_node() {
        // All duty on N0, nothing on N1
        // allocs: 96 each, duty: N0=96, N1=0
        // eq_ratio = 96/192 = 0.5
        // N0: expected=48, surplus=48
        // N1: expected=48, deficit=48
        let duty = vec![96.0, 0.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        let expected_rate = (48.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected_rate);
    }

    #[test]
    fn test_rates_single_node() {
        // Single node — balanced by definition
        let duty = vec![96.0];
        let allocs = vec![96];
        let result = xnuma_compute_rates(&duty, &allocs);

        assert_eq!(result.rates[0][0], 0);
    }

    #[test]
    fn test_rates_one_node_zero_alloc() {
        // N0 has allocation, N1 has zero
        // eq_ratio = 80/96
        // N0: expected=80, surplus=0 → balanced
        // N1: expected=0, deficit=0 → zero alloc skipped effectively
        let duty = vec![80.0, 0.0];
        let allocs = vec![96, 0];
        let result = xnuma_compute_rates(&duty, &allocs);

        // N0: surplus = 80 - (80/96 * 96) = 0
        // N1: deficit = (80/96 * 0) - 0 = 0
        // All zeros — nothing to migrate
        assert_eq!(result.rates[0][1], 0);
        assert_eq!(result.rates[1][0], 0);
    }

    // =====================================================================
    // Rate magnitude and scaling
    // =====================================================================

    #[test]
    fn test_rate_scaling() {
        // Verify rates are in DUTY_CYCLE_SCALE units
        let duty = vec![80.0, 40.0];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // surplus = 20, deficit = 20 → migration = 20 * DAMPEN = 10
        // rate = 10 * (1 << 20)
        let expected = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
        assert_eq!(result.rates[0][1], expected);
        // Pins the constants themselves: DAMPEN * DUTY_CYCLE_SCALE == 2^19.
        assert_eq!(expected, 10 * (1 << 20));
    }

    #[test]
    fn test_rates_tiny_imbalance() {
        // Very small imbalance — should still produce non-zero rate
        let duty = vec![48.001, 47.999];
        let allocs = vec![96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // surplus ≈ 0.001, rate ≈ 0.001 * DAMPEN * 2^20 (≈ 524 with DAMPEN = 0.5)
        assert!(result.rates[0][1] > 0);
        assert!(result.rates[0][1] < (1 << 20)); // Less than 1.0 CPU worth
    }

    #[test]
    fn test_rates_large_values() {
        // Large system: 8 nodes, 96 CPUs each
        let duty = vec![300.0, 100.0, 50.0, 50.0, 50.0, 50.0, 50.0, 50.0];
        let allocs = vec![96, 96, 96, 96, 96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // eq_ratio = 700/768 ≈ 0.911. N0 surplus=212.5, N1 surplus=12.5
        // N2-N7 deficit=37.5 each. N0 and N1 have surplus.
        assert!(result.rates[0][2] > 0); // N0 → N2 (deficit node)
        assert!(result.rates[1][2] > 0); // N1 → N2 (N1 also has surplus)

        // Verify conservation: per-source outbound ≈ surplus * DAMPEN (within
        // truncation tolerance — each `as u64` can lose up to 1 per cell)
        let total_duty: f64 = duty.iter().sum();
        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
        let eq_ratio = total_duty / total_alloc;
        let nr = 8;
        for src in 0..nr {
            let surplus = (duty[src] - eq_ratio * allocs[src] as f64).max(0.0);
            let outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
            let expected = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
            let tolerance = nr as u64; // up to 1 per destination cell
            assert!(
                outbound.abs_diff(expected) <= tolerance,
                "node {} outbound {} vs expected {}, diff {}",
                src,
                outbound,
                expected,
                outbound.abs_diff(expected)
            );
        }
    }

    // =====================================================================
    // Hysteresis integration
    // =====================================================================

    #[test]
    fn test_hysteresis_cycle() {
        // Walks N0 through closed → open → held open → closed, feeding the
        // state expected from each step in as the next step's current state.
        let allocs = vec![96, 96];
        let gd = vec![true, false]; // N0 growth denied

        // Start: all closed, low load
        let active =
            xnuma_check_active(&[40.0, 40.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
        assert!(!active[0]); // load 0.42 < 0.6(lo) → deactivate

        // N0 overloaded + imbalanced + gd → opens
        let active =
            xnuma_check_active(&[90.0, 20.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
        assert!(active[0]); // load 0.94>0.7, surplus=35, ratio=0.365>0.3, gd=true

        // Load decreases; surplus ratio falls into its hysteresis band → stays open
        let active = xnuma_check_active(&[75.0, 20.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
        assert!(active[0]); // load 0.78 > 0.6(lo), ratio 0.286 between 0.2 and 0.3, gd → preserve

        // Load drops below low threshold → closes
        let active = xnuma_check_active(&[50.0, 50.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
        assert!(!active[0]); // load 0.52 < 0.6(lo) → deactivate
    }

    #[test]
    fn test_hysteresis_growth_toggle() {
        // N0 was open, then growth succeeds → closes
        let allocs = vec![96, 96];
        let gd_denied = vec![true, false];
        let gd_ok = vec![false, false];

        // Active with growth denied
        let active = xnuma_check_active(
            &[90.0, 20.0],
            &allocs,
            THRESH,
            DELTA,
            &gd_denied,
            &[false, false],
        );
        assert!(active[0]);

        // Growth succeeds → !gd → deactivate
        let active = xnuma_check_active(
            &[90.0, 20.0],
            &allocs,
            THRESH,
            DELTA,
            &gd_ok,
            &[true, false],
        );
        assert!(!active[0]); // !growth_denied → deactivate
    }

    // =====================================================================
    // Proportional distribution to multiple sinks
    // =====================================================================

    #[test]
    fn test_proportional_sink_distribution() {
        // 4 nodes: N0 source, N1-N3 sinks with different deficits
        // allocs: 96 each, duty: N0=180, N1=20, N2=40, N3=0
        // total = 240, eq_ratio = 240/384 = 0.625
        // N0: expected=60, surplus=120
        // N1: expected=60, deficit=40
        // N2: expected=60, deficit=20
        // N3: expected=60, deficit=60
        // total_deficit = 120
        let duty = vec![180.0, 20.0, 40.0, 0.0];
        let allocs = vec![96, 96, 96, 96];
        let result = xnuma_compute_rates(&duty, &allocs);

        // rate[0][1] = 120 * 40/120 * DAMPEN = 20
        // rate[0][2] = 120 * 20/120 * DAMPEN = 10
        // rate[0][3] = 120 * 60/120 * DAMPEN = 30
        assert_eq!(
            result.rates[0][1],
            (40.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
        assert_eq!(
            result.rates[0][2],
            (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
        assert_eq!(
            result.rates[0][3],
            (60.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );

        // Total outbound from N0 = 20 + 10 + 30 = 60 (half of surplus, dampened)
        let total_from_n0: u64 = (0..4).map(|dst| result.rates[0][dst]).sum();
        assert_eq!(
            total_from_n0,
            (120.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
        );
    }
}
5946
5947#[cfg(test)]
5948mod util_compensation_tests {
5949    /// Compute the scale factor: s = Δt / (Δt - irq - softirq - stolen)
5950    fn compute_scale(delta_total: u64, overhead: u64) -> f64 {
5951        let available = delta_total.saturating_sub(overhead);
5952        if available > 0 {
5953            (delta_total as f64 / available as f64).clamp(1.0, 20.0)
5954        } else {
5955            1.0
5956        }
5957    }
5958
5959    /// Simulate the per-CPU-scaled aggregation that refresh() does.
5960    fn scaled_aggregate(
5961        deltas: &[Vec<u64>],
5962        scales: &[f64],
5963        nr_layers: usize,
5964        elapsed: f64,
5965    ) -> Vec<f64> {
5966        (0..nr_layers)
5967            .map(|layer| {
5968                let mut sum = 0.0f64;
5969                for (cpu, cpu_deltas) in deltas.iter().enumerate() {
5970                    sum += cpu_deltas[layer] as f64 * scales[cpu];
5971                }
5972                sum / 1_000_000_000.0 / elapsed
5973            })
5974            .collect()
5975    }
5976
5977    #[test]
5978    fn test_scale_no_overhead() {
5979        // No irq/softirq/stolen → scale = 1.0
5980        assert!((compute_scale(1000, 0) - 1.0).abs() < 0.01);
5981    }
5982
5983    #[test]
5984    fn test_scale_half_overhead() {
5985        // 50% overhead → s = 1000/500 = 2.0
5986        assert!((compute_scale(1000, 500) - 2.0).abs() < 0.01);
5987    }
5988
5989    #[test]
5990    fn test_scale_high_overhead() {
5991        // 90% overhead → s = 1000/100 = 10.0
5992        assert!((compute_scale(1000, 900) - 10.0).abs() < 0.01);
5993    }
5994
5995    #[test]
5996    fn test_scale_very_high_overhead() {
5997        // 95% overhead → s = 1000/50 = 20.0 (at clamp boundary)
5998        assert!((compute_scale(1000, 950) - 20.0).abs() < 0.01);
5999    }
6000
6001    #[test]
6002    fn test_scale_clamped_at_max() {
6003        // 98% overhead → ratio = 50.0, clamped to 20.0
6004        assert!((compute_scale(1000, 980) - 20.0).abs() < 0.01);
6005    }
6006
6007    #[test]
6008    fn test_scale_all_overhead() {
6009        // 100% overhead → available = 0, returns 1.0
6010        assert!((compute_scale(1000, 1000) - 1.0).abs() < 0.01);
6011    }
6012
6013    #[test]
6014    fn test_scale_idle_cpu() {
6015        // Idle CPU: delta_total = 0 → returns 1.0
6016        assert!((compute_scale(0, 0) - 1.0).abs() < 0.01);
6017    }
6018
6019    #[test]
6020    fn test_scale_small_overhead_mostly_idle() {
6021        // 1% softirq + 1% user + 98% idle = total 10000, overhead 100
6022        // s = 10000 / 9900 ≈ 1.01 — nearly 1.0, not pathological
6023        let s = compute_scale(10000, 100);
6024        assert!((s - 1.01).abs() < 0.01, "expected ~1.01, got {}", s);
6025    }
6026
6027    #[test]
6028    fn test_uniform_scale_matches_unscaled() {
6029        let deltas = vec![vec![1_000_000_000u64; 2]; 4];
6030        let scales = vec![1.0; 4];
6031        let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6032        assert!((result[0] - 4.0).abs() < 0.01);
6033        assert!((result[1] - 4.0).abs() < 0.01);
6034    }
6035
6036    #[test]
6037    fn test_hot_cpu_weighted_more() {
6038        // CPU 0: scale=2.0, CPU 1: scale=1.0
6039        // Layer 0: 900ms on CPU 0, 100ms on CPU 1
6040        // Compensated: 900*2.0 + 100*1.0 = 1900ms = 1.9
6041        let deltas = vec![vec![900_000_000, 0], vec![100_000_000, 0]];
6042        let scales = vec![2.0, 1.0];
6043        let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6044        assert!(
6045            (result[0] - 1.9).abs() < 0.01,
6046            "expected 1.9, got {}",
6047            result[0]
6048        );
6049    }
6050
6051    #[test]
6052    fn test_cold_cpu_weighted_less() {
6053        let deltas = vec![vec![100_000_000, 0], vec![900_000_000, 0]];
6054        let scales = vec![2.0, 1.0];
6055        let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6056        assert!(
6057            (result[0] - 1.1).abs() < 0.01,
6058            "expected 1.1, got {}",
6059            result[0]
6060        );
6061    }
6062
6063    #[test]
6064    fn test_no_usage_no_compensation() {
6065        let deltas = vec![vec![0u64; 2]; 4];
6066        let scales = vec![5.0; 4];
6067        let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6068        assert_eq!(result[0], 0.0);
6069        assert_eq!(result[1], 0.0);
6070    }
6071
6072    #[test]
6073    fn test_multilayer_independent_scaling() {
6074        let deltas = vec![
6075            vec![800_000_000, 200_000_000],
6076            vec![200_000_000, 800_000_000],
6077        ];
6078        let scales = vec![3.0, 1.0];
6079        let result = scaled_aggregate(&deltas, &scales, 2, 1.0);
6080        assert!((result[0] - 2.6).abs() < 0.01);
6081        assert!((result[1] - 1.4).abs() < 0.01);
6082    }
6083
6084    #[test]
6085    fn test_elapsed_time_normalization() {
6086        let deltas = vec![vec![500_000_000u64; 1]; 1];
6087        let scales = vec![1.0];
6088        let result = scaled_aggregate(&deltas, &scales, 1, 2.0);
6089        assert!((result[0] - 0.25).abs() < 0.01);
6090    }
6091
6092    #[test]
6093    fn test_many_cpus_mixed_scales() {
6094        let deltas = vec![vec![1_000_000_000u64; 1]; 8];
6095        let scales = vec![2.5, 1.0, 2.5, 1.0, 2.5, 1.0, 2.5, 1.0];
6096        let result = scaled_aggregate(&deltas, &scales, 1, 1.0);
6097        assert!((result[0] - 14.0).abs() < 0.01);
6098    }
6099
6100    #[test]
6101    fn test_compensated_ge_raw() {
6102        // Since scale >= 1.0 always, compensated >= raw for any input.
6103        let deltas = vec![
6104            vec![500_000_000u64; 3],
6105            vec![300_000_000; 3],
6106            vec![200_000_000; 3],
6107        ];
6108        let scales_raw = vec![1.0; 3];
6109        let scales_comp = vec![1.5, 2.0, 1.0];
6110        let raw = scaled_aggregate(&deltas, &scales_raw, 3, 1.0);
6111        let comp = scaled_aggregate(&deltas, &scales_comp, 3, 1.0);
6112        for i in 0..3 {
6113            assert!(
6114                comp[i] >= raw[i] - 0.001,
6115                "layer {}: comp {} < raw {}",
6116                i,
6117                comp[i],
6118                raw[i]
6119            );
6120        }
6121    }
6122}