Skip to main content

scx_layered/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6mod stats;
7
8use std::collections::BTreeMap;
9use std::collections::BTreeSet;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::ffi::CString;
13use std::fs;
14use std::io::Write;
15use std::mem::MaybeUninit;
16use std::ops::Sub;
17use std::path::Path;
18use std::path::PathBuf;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::thread::ThreadId;
23use std::time::Duration;
24use std::time::Instant;
25
26use inotify::{Inotify, WatchMask};
27use std::os::unix::io::AsRawFd;
28
29use anyhow::anyhow;
30use anyhow::bail;
31use anyhow::Context;
32use anyhow::Result;
33pub use bpf_skel::*;
34use clap::Parser;
35use crossbeam::channel::Receiver;
36use crossbeam::select;
37use lazy_static::lazy_static;
38use libbpf_rs::libbpf_sys;
39use libbpf_rs::AsRawLibbpf;
40use libbpf_rs::MapCore as _;
41use libbpf_rs::OpenObject;
42use libbpf_rs::ProgramInput;
43use nix::sched::CpuSet;
44use nvml_wrapper::error::NvmlError;
45use nvml_wrapper::Nvml;
46use once_cell::sync::OnceCell;
47use regex::Regex;
48use scx_layered::alloc::{unified_alloc, LayerAlloc, LayerDemand};
49use scx_layered::*;
50use scx_raw_pmu::PMUManager;
51use scx_stats::prelude::*;
52use scx_utils::build_id;
53use scx_utils::compat;
54use scx_utils::init_libbpf_logging;
55use scx_utils::libbpf_clap_opts::LibbpfOpts;
56use scx_utils::perf;
57use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
58use scx_utils::read_netdevs;
59use scx_utils::scx_enums;
60use scx_utils::scx_ops_attach;
61use scx_utils::scx_ops_load;
62use scx_utils::scx_ops_open;
63use scx_utils::uei_exited;
64use scx_utils::uei_report;
65use scx_utils::CoreType;
66use scx_utils::Cpumask;
67use scx_utils::NetDev;
68use scx_utils::Topology;
69use scx_utils::TopologyArgs;
70use scx_utils::UserExitInfo;
71use scx_utils::NR_CPUS_POSSIBLE;
72use scx_utils::NR_CPU_IDS;
73use stats::LayerStats;
74use stats::StatsReq;
75use stats::StatsRes;
76use stats::SysStats;
77use std::collections::VecDeque;
78use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
79use tracing::{debug, error, info, trace, warn};
80use tracing_subscriber::filter::EnvFilter;
81use walkdir::WalkDir;
82
83const SCHEDULER_NAME: &str = "scx_layered";
84const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
85const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
86const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
87const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
88const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
89const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
90const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
91const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
92const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
93const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;
94
95const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
96const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
97const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
98const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
99const LAYER_USAGE_PROTECTED_PREEMPT: usize =
100    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
101const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;
102
103const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
104const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
105const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;
106
107const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;
108
109static NVML: OnceCell<Nvml> = OnceCell::new();
110
111fn nvml() -> Result<&'static Nvml, NvmlError> {
112    NVML.get_or_try_init(Nvml::init)
113}
114
115lazy_static! {
116    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
117    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
118    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
119    static ref EXAMPLE_CONFIG: LayerConfig = serde_json::from_str(
120        r#"[
121          {
122            "name": "batch",
123            "comment": "tasks under system.slice or tasks with nice value > 0",
124            "matches": [[{"CgroupPrefix": "system.slice/"}], [{"NiceAbove": 0}]],
125            "kind": {"Confined": {
126              "util_range": [0.8, 0.9], "cpus_range": [0, 16],
127              "min_exec_us": 1000, "slice_us": 20000, "weight": 100,
128              "xllc_mig_min_us": 1000.0, "perf": 1024
129            }}
130          },
131          {
132            "name": "immediate",
133            "comment": "tasks under workload.slice with nice value < 0",
134            "matches": [[{"CgroupPrefix": "workload.slice/"}, {"NiceBelow": 0}]],
135            "kind": {"Open": {
136              "min_exec_us": 100, "yield_ignore": 0.25, "slice_us": 20000,
137              "preempt": true, "exclusive": true,
138              "prev_over_idle_core": true,
139              "weight": 100, "perf": 1024
140            }}
141          },
142          {
143            "name": "stress-ng",
144            "comment": "stress-ng test layer",
145            "matches": [[{"CommPrefix": "stress-ng"}], [{"PcommPrefix": "stress-ng"}]],
146            "kind": {"Confined": {
147              "util_range": [0.2, 0.8],
148              "min_exec_us": 800, "preempt": true, "slice_us": 800,
149              "weight": 100, "growth_algo": "Topo", "perf": 1024
150            }}
151          },
152          {
153            "name": "normal",
154            "comment": "the rest",
155            "matches": [[]],
156            "kind": {"Grouped": {
157              "util_range": [0.5, 0.6], "util_includes_open_cputime": true,
158              "min_exec_us": 200, "slice_us": 20000, "weight": 100,
159              "xllc_mig_min_us": 100.0, "growth_algo": "Linear", "perf": 1024
160            }}
161          }
162        ]"#,
163    )
164    .unwrap();
165}
166
167/// scx_layered: A highly configurable multi-layer sched_ext scheduler
168///
169/// scx_layered allows classifying tasks into multiple layers and applying
170/// different scheduling policies to them. The configuration is specified in
171/// json and composed of two parts - matches and policies.
172///
173/// Matches
174/// =======
175///
176/// Whenever a task is forked or its attributes are changed, the task goes
177/// through a series of matches to determine the layer it belongs to. A
178/// match set is composed of OR groups of AND blocks. An example:
179///
180///   "matches": [
181///     [
182///       {
183///         "CgroupPrefix": "system.slice/"
184///       }
185///     ],
186///     [
187///       {
188///         "CommPrefix": "fbagent"
189///       },
190///       {
191///         "NiceAbove": 0
192///       }
193///     ]
194///   ],
195///
196/// The outer array contains the OR groups and the inner AND blocks, so the
197/// above matches:
198///
199/// - Tasks which are in the cgroup sub-hierarchy under "system.slice".
200///
201/// - Or tasks whose comm starts with "fbagent" and have a nice value > 0.
202///
203/// Currently, the following matches are supported:
204///
205/// - CgroupPrefix: Matches the prefix of the cgroup that the task belongs
206///   to. As this is a string match, whether the pattern has the trailing
207///   '/' makes a difference. For example, "TOP/CHILD/" only matches tasks
208///   which are under that particular cgroup while "TOP/CHILD" also matches
209///   tasks under "TOP/CHILD0/" or "TOP/CHILD1/".
210///
211/// - CommPrefix: Matches the task's comm prefix.
212///
213/// - PcommPrefix: Matches the task's thread group leader's comm prefix.
214///
215/// - NiceAbove: Matches if the task's nice value is greater than the
216///   pattern.
217///
218/// - NiceBelow: Matches if the task's nice value is smaller than the
219///   pattern.
220///
221/// - NiceEquals: Matches if the task's nice value is exactly equal to
222///   the pattern.
223///
/// - UIDEquals: Matches if the task's effective user id matches the value.
225///
226/// - GIDEquals: Matches if the task's effective group id matches the value.
227///
228/// - PIDEquals: Matches if the task's pid matches the value.
229///
230/// - PPIDEquals: Matches if the task's ppid matches the value.
231///
232/// - TGIDEquals: Matches if the task's tgid matches the value.
233///
234/// - NSPIDEquals: Matches if the task's namespace id and pid matches the values.
235///
236/// - NSEquals: Matches if the task's namespace id matches the values.
237///
238/// - IsGroupLeader: Bool. When true, matches if the task is group leader
239///   (i.e. PID == TGID), aka the thread from which other threads are made.
240///   When false, matches if the task is *not* the group leader (i.e. the rest).
241///
242/// - CmdJoin: Matches when the task uses pthread_setname_np to send a join/leave
243///   command to the scheduler. See examples/cmdjoin.c for more details.
244///
/// - UsedGpuTid: Bool. When true, matches tasks which have used GPUs,
///   identified by tid.
///
/// - UsedGpuPid: Bool. When true, matches tasks which have used GPUs,
///   identified by tgid/pid.
250///
251/// - [EXPERIMENTAL] AvgRuntime: (u64, u64). Match tasks whose average runtime
252///   is within the provided values [min, max).
253///
254/// - HintEquals: u64. Match tasks whose hint value equals this value.
255///   The value must be in the range [0, 1024].
256///
257/// - SystemCpuUtilBelow: f64. Match when the system CPU utilization fraction
258///   is below the specified threshold (a value in the range [0.0, 1.0]). This
259///   option can only be used in conjunction with HintEquals.
260///
261/// - DsqInsertBelow: f64. Match when the layer DSQ insertion fraction is below
262///   the specified threshold (a value in the range [0.0, 1.0]). This option can
263///   only be used in conjunction with HintEquals.
264///
265/// While there are complexity limitations as the matches are performed in
266/// BPF, it is straightforward to add more types of matches.
267///
268/// Templates
269/// ---------
270///
271/// Templates let us create a variable number of layers dynamically at initialization
272/// time out of a cgroup name suffix/prefix. Sometimes we know there are multiple
273/// applications running on a machine, each with their own cgroup but do not know the
274/// exact names of the applications or cgroups, e.g., in cloud computing contexts where
275/// workloads are placed on machines dynamically and run under cgroups whose name is
276/// autogenerated. In that case, we cannot hardcode the cgroup match rules when writing
277/// the configuration. We thus cannot easily prevent tasks from different cgroups from
278/// falling into the same layer and affecting each other's performance.
279///
280///
281/// Templates offer a solution to this problem by generating one layer for each such cgroup,
282/// provided these cgroups share a suffix, and that the suffix is unique to them. Templates
283/// have a cgroup suffix rule that we use to find the relevant cgroups in the system. For each
284/// such cgroup, we copy the layer config and add a matching rule that matches just this cgroup.
285///
286///
287/// Policies
288/// ========
289///
290/// The following is an example policy configuration for a layer.
291///
292///   "kind": {
293///     "Confined": {
294///       "cpus_range": [1, 8],
295///       "util_range": [0.8, 0.9]
296///     }
297///   }
298///
299/// It's of "Confined" kind, which tries to concentrate the layer's tasks
300/// into a limited number of CPUs. In the above case, the number of CPUs
301/// assigned to the layer is scaled between 1 and 8 so that the per-cpu
302/// utilization is kept between 80% and 90%. If the CPUs are loaded higher
303/// than 90%, more CPUs are allocated to the layer. If the utilization drops
304/// below 80%, the layer loses CPUs.
305///
306/// Currently, the following policy kinds are supported:
307///
308/// - Confined: Tasks are restricted to the allocated CPUs. The number of
309///   CPUs allocated is modulated to keep the per-CPU utilization in
310///   "util_range". The range can optionally be restricted with the
311///   "cpus_range" property.
312///
313/// - Grouped: Similar to Confined but tasks may spill outside if there are
314///   idle CPUs outside the allocated ones. The range can optionally be
315///   restricted with the "cpus_range" property. The optional
316///   "idle_confined" flag restricts idle selection to
317///   the layer's CPUs until the layer becomes saturated or the system
318///   is fully allocated, providing Confined-style cache locality under
319///   normal load with Grouped-style overflow under pressure.
320///
321/// - Open: Prefer the CPUs which are not occupied by Confined or Grouped
322///   layers. Tasks in this group will spill into occupied CPUs if there are
323///   no unoccupied idle CPUs.
324///
325/// All layers take the following options:
326///
327/// - min_exec_us: Minimum execution time in microseconds. Whenever a task
328///   is scheduled in, this is the minimum CPU time that it's charged no
329///   matter how short the actual execution time may be.
330///
331/// - yield_ignore: Yield ignore ratio. If 0.0, yield(2) forfeits a whole
332///   execution slice. 0.25 yields three quarters of an execution slice and
333///   so on. If 1.0, yield is completely ignored.
334///
335/// - slice_us: Scheduling slice duration in microseconds.
336///
337/// - fifo: Use FIFO queues within the layer instead of the default vtime.
338///
339/// - preempt: If true, tasks in the layer will preempt tasks which belong
340///   to other non-preempting layers when no idle CPUs are available.
341///
342/// - preempt_first: If true, tasks in the layer will try to preempt tasks
343///   in their previous CPUs before trying to find idle CPUs.
344///
345/// - exclusive: If true, tasks in the layer will occupy the whole core. The
346///   other logical CPUs sharing the same core will be kept idle. This isn't
347///   a hard guarantee, so don't depend on it for security purposes.
348///
349/// - allow_node_aligned: DEPRECATED. Node-aligned tasks are now always
350///   dispatched on layer DSQs. This field is ignored if specified.
351///
/// - prev_over_idle_core: On SMT enabled systems, prefer using the same CPU
///   when picking a CPU for tasks on this layer, even if that CPU's SMT
///   sibling is processing a task.
355///
356/// - weight: Weight of the layer, which is a range from 1 to 10000 with a
357///   default of 100. Layer weights are used during contention to prevent
358///   starvation across layers. Weights are used in combination with
359///   utilization to determine the infeasible adjusted weight with higher
360///   weights having a larger adjustment in adjusted utilization.
361///
362/// - disallow_open_after_us: Duration to wait after machine reaches saturation
363///   before confining tasks in Open layers.
364///
365/// - cpus_range_frac: Array of 2 floats between 0 and 1.0. Lower and upper
366///   bound fractions of all CPUs to give to a layer. Mutually exclusive
367///   with cpus_range.
368///
/// - disallow_preempt_after_us: Duration to wait after machine reaches saturation
///   before disallowing tasks in the layer from preempting.
371///
372/// - xllc_mig_min_us: Skip cross-LLC migrations if they are likely to run on
373///   their existing LLC sooner than this.
374///
/// - idle_smt: ***DEPRECATED***
376///
377/// - growth_algo: Determines the order in which CPUs are allocated to the
378///   layer as it grows. All algorithms are NUMA-aware and produce per-node
379///   core orderings. Three placement classes exist:
380///
381///   * Locality (most algorithms): prefer the layer's home NUMA node and
382///     spill to remote nodes only when local capacity is exhausted.
383///   * Balanced (RoundRobin): distribute proportionally across all NUMA
384///     nodes via cap-weighted water_fill. A congested node reduces its
385///     share but does not cap the total — freed capacity flows to less
386///     loaded nodes.
387///   * Strict spread (NodeSpread*): enforce equal CPU counts across all
388///     NUMA nodes, capped at the least available node. Use when the
389///     workload needs strictly equal per-node CPU counts (memory-
390///     bandwidth-bound or replicated work).
391///
392///   Default: Sticky.
393///
394/// - perf: CPU performance target. 0 means no configuration. A value
395///   between 1 and 1024 indicates the performance level CPUs running tasks
396///   in this layer are configured to using scx_bpf_cpuperf_set().
397///
398/// - idle_resume_us: Sets the idle resume QoS value. CPU idle time governors are expected to
399///   regard the minimum of the global (effective) CPU latency limit and the effective resume
400///   latency constraint for the given CPU as the upper limit for the exit latency of the idle
401///   states. See the latest kernel docs for more details:
402///   https://www.kernel.org/doc/html/latest/admin-guide/pm/cpuidle.html
403///
404/// - nodes: If set the layer will use the set of NUMA nodes for scheduling
405///   decisions. If unset then all available NUMA nodes will be used. If the
406///   llcs value is set the cpuset of NUMA nodes will be or'ed with the LLC
407///   config.
408///
409/// - llcs: If set the layer will use the set of LLCs (last level caches)
410///   for scheduling decisions. If unset then all LLCs will be used. If
411///   the nodes value is set the cpuset of LLCs will be or'ed with the nodes
412///   config.
413///
414///
415/// Similar to matches, adding new policies and extending existing ones
416/// should be relatively straightforward.
417///
418/// Configuration example and running scx_layered
419/// =============================================
420///
421/// An scx_layered config is composed of layer configs. A layer config is
422/// composed of a name, a set of matches, and a policy block. Running the
423/// following will write an example configuration into example.json.
424///
425///   $ scx_layered -e example.json
426///
427/// Note that the last layer in the configuration must have an empty match set
428/// as a catch-all for tasks which haven't been matched into previous layers.
429///
430/// The configuration can be specified in multiple json files and
431/// command line arguments, which are concatenated in the specified
432/// order. Each must contain valid layer configurations.
433///
434/// By default, an argument to scx_layered is interpreted as a JSON string. If
435/// the argument is a pointer to a JSON file, it should be prefixed with file:
436/// or f: as follows:
437///
438///   $ scx_layered file:example.json
439///   ...
440///   $ scx_layered f:example.json
441///
442/// Monitoring Statistics
443/// =====================
444///
/// Run with `--stats INTERVAL` to enable stats monitoring. There is
/// also an scx_stats server listening on /var/run/scx/root/stat that can
/// be monitored by running `scx_layered --monitor INTERVAL` separately.
448///
449///   ```bash
450///   $ scx_layered --monitor 1
451///   tot= 117909 local=86.20 open_idle= 0.21 affn_viol= 1.37 proc=6ms
452///   busy= 34.2 util= 1733.6 load=  21744.1 fb_cpus=[n0:1]
453///     batch    : util/frac=   11.8/  0.7 load/frac=     29.7:  0.1 tasks=  2597
454///                tot=   3478 local=67.80 open_idle= 0.00 preempt= 0.00 affn_viol= 0.00
455///                cpus=  2 [  2,  2] 04000001 00000000
456///     immediate: util/frac= 1218.8/ 70.3 load/frac=  21399.9: 98.4 tasks=  1107
457///                tot=  68997 local=90.57 open_idle= 0.26 preempt= 9.36 affn_viol= 0.00
458///                cpus= 50 [ 50, 50] fbfffffe 000fffff
459///     normal   : util/frac=  502.9/ 29.0 load/frac=    314.5:  1.4 tasks=  3512
460///                tot=  45434 local=80.97 open_idle= 0.16 preempt= 0.00 affn_viol= 3.56
461///                cpus= 50 [ 50, 50] fbfffffe 000fffff
462///   ```
463///
464/// Global statistics: see [`SysStats`]
465///
466/// Per-layer statistics: see [`LayerStats`]
467///
468#[derive(Debug, Parser)]
469#[command(verbatim_doc_comment)]
470struct Opts {
471    /// Deprecated, noop, use RUST_LOG or --log-level instead.
472    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
473    verbose: u8,
474
475    /// Scheduling slice duration in microseconds.
476    #[clap(short = 's', long, default_value = "20000")]
477    slice_us: u64,
478
479    /// Maximum consecutive execution time in microseconds. A task may be
480    /// allowed to keep executing on a CPU for this long. Note that this is
481    /// the upper limit and a task may have to moved off the CPU earlier. 0
482    /// indicates default - 20 * slice_us.
483    #[clap(short = 'M', long, default_value = "0")]
484    max_exec_us: u64,
485
486    /// Scheduling interval in seconds.
487    #[clap(short = 'i', long, default_value = "0.1")]
488    interval: f64,
489
490    /// ***DEPRECATED*** Disable load-fraction based max layer CPU limit.
491    /// recommended.
492    #[clap(short = 'n', long, default_value = "false")]
493    no_load_frac_limit: bool,
494
495    /// Exit debug dump buffer length. 0 indicates default.
496    #[clap(long, default_value = "0")]
497    exit_dump_len: u32,
498
499    /// Specify the logging level. Accepts rust's envfilter syntax for modular
500    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
501    #[clap(long, default_value = "info")]
502    log_level: String,
503
504    /// Disable topology awareness. When enabled, the "nodes" and "llcs" settings on
505    /// a layer are ignored. Defaults to false on topologies with multiple NUMA nodes
506    /// or LLCs, and true otherwise.
507    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
508    disable_topology: Option<bool>,
509
510    /// Disable monitor
511    #[clap(long)]
512    monitor_disable: bool,
513
514    /// Write example layer specifications into the file and exit.
515    #[clap(short = 'e', long)]
516    example: Option<String>,
517
518    /// ***DEPRECATED*** Disables preemption if the weighted load fraction
519    /// of a layer (load_frac_adj) exceeds the threshold. The default is
520    /// disabled (0.0).
521    #[clap(long, default_value = "0.0")]
522    layer_preempt_weight_disable: f64,
523
524    /// ***DEPRECATED*** Disables layer growth if the weighted load fraction
525    /// of a layer (load_frac_adj) exceeds the threshold. The default is
526    /// disabled (0.0).
527    #[clap(long, default_value = "0.0")]
528    layer_growth_weight_disable: f64,
529
530    /// Enable stats monitoring with the specified interval.
531    #[clap(long)]
532    stats: Option<f64>,
533
534    /// Run in stats monitoring mode with the specified interval. Scheduler
535    /// is not launched.
536    #[clap(long)]
537    monitor: Option<f64>,
538
539    /// Column limit for stats monitor output.
540    #[clap(long, default_value = "95")]
541    stats_columns: usize,
542
543    /// Disable per-LLC stats in monitor output.
544    #[clap(long)]
545    stats_no_llc: bool,
546
547    /// Run with example layer specifications (useful for e.g. CI pipelines)
548    #[clap(long)]
549    run_example: bool,
550
551    /// Allocate CPUs at an SMT granularity (not core)
552    #[clap(long)]
553    allow_partial_core: bool,
554
555    /// ***DEPRECATED *** Enables iteration over local LLCs first for
556    /// dispatch.
557    #[clap(long, default_value = "false")]
558    local_llc_iteration: bool,
559
560    /// Low priority fallback DSQs are used to execute tasks with custom CPU
561    /// affinities. These DSQs are immediately executed iff a CPU is
562    /// otherwise idle. However, after the specified wait, they are
563    /// guaranteed upto --lo-fb-share fraction of each CPU.
564    #[clap(long, default_value = "10000")]
565    lo_fb_wait_us: u64,
566
567    /// The fraction of CPU time guaranteed to low priority fallback DSQs.
568    /// See --lo-fb-wait-us.
569    #[clap(long, default_value = ".05")]
570    lo_fb_share: f64,
571
572    /// Disable antistall
573    #[clap(long, default_value = "false")]
574    disable_antistall: bool,
575
576    /// Enable numa topology based gpu task affinitization.
577    #[clap(long, default_value = "false")]
578    enable_gpu_affinitize: bool,
579
580    /// Interval at which to reaffinitize gpu tasks to numa nodes.
581    /// Defaults to 900s
582    #[clap(long, default_value = "900")]
583    gpu_affinitize_secs: u64,
584
585    /// Enable match debug
586    /// This stores a mapping of task tid
587    /// to layer id such that bpftool map dump
588    /// can be used to debug layer matches.
589    #[clap(long, default_value = "false")]
590    enable_match_debug: bool,
591
592    /// Maximum task runnable_at delay (in seconds) before antistall turns on
593    #[clap(long, default_value = "3")]
594    antistall_sec: u64,
595
596    /// Enable gpu support
597    #[clap(long, default_value = "false")]
598    enable_gpu_support: bool,
599
600    /// Gpu Kprobe Level
601    /// The value set here determines how aggressive
602    /// the kprobes enabled on gpu driver functions are.
603    /// Higher values are more aggressive, incurring more system overhead
604    /// and more accurately identifying PIDs using GPUs in a more timely manner.
605    /// Lower values incur less system overhead, at the cost of less accurately
606    /// identifying GPU pids and taking longer to do so.
607    #[clap(long, default_value = "3")]
608    gpu_kprobe_level: u64,
609
610    /// Enable utilization compensation for unattributed CPU work (irq, softirq, stolen). When
611    /// enabled, each CPU's layer usage is scaled by the inverse of available capacity to account
612    /// for time lost to interrupts.
613    #[clap(long, default_value = "false")]
614    util_compensation: bool,
615
616    /// Enable netdev IRQ balancing. This is experimental and should be used with caution.
617    #[clap(long, default_value = "false")]
618    netdev_irq_balance: bool,
619
620    /// Disable queued wakeup optimization.
621    #[clap(long, default_value = "false")]
622    disable_queued_wakeup: bool,
623
624    /// Per-cpu kthreads are preempting by default. Make it not so.
625    #[clap(long, default_value = "false")]
626    disable_percpu_kthread_preempt: bool,
627
628    /// Only highpri (nice < 0) per-cpu kthreads are preempting by default.
629    /// Make every per-cpu kthread preempting. Meaningful only if
630    /// --disable-percpu-kthread-preempt is not set.
631    #[clap(long, default_value = "false")]
632    percpu_kthread_preempt_all: bool,
633
634    /// Print scheduler version and exit.
635    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
636    version: bool,
637
638    /// Optional run ID for tracking scheduler instances.
639    #[clap(long)]
640    run_id: Option<u64>,
641
642    /// Show descriptions for statistics.
643    #[clap(long)]
644    help_stats: bool,
645
646    /// Layer specification. See --help.
647    specs: Vec<String>,
648
649    /// Periodically force tasks in layers using the AvgRuntime match rule to reevaluate which layer they belong to. Default period of 2s.
650    /// turns this off.
651    #[clap(long, default_value = "2000")]
652    layer_refresh_ms_avgruntime: u64,
653
654    /// Set the path for pinning the task hint map.
655    #[clap(long, default_value = "")]
656    task_hint_map: String,
657
658    /// Print the config (after template expansion) and exit.
659    #[clap(long, default_value = "false")]
660    print_and_exit: bool,
661
662    /// Enable affinitized task to use hi fallback queue to get more CPU time.
663    #[clap(long, default_value = "")]
664    hi_fb_thread_name: String,
665
666    #[clap(flatten, next_help_heading = "Topology Options")]
667    topology: TopologyArgs,
668
669    #[clap(flatten, next_help_heading = "Libbpf Options")]
670    pub libbpf: LibbpfOpts,
671}
672
/// Cgroup event types for inter-thread communication.
#[derive(Debug, Clone)]
enum CgroupEvent {
    /// A cgroup was created.
    Created {
        /// Path of the cgroup.
        path: String,
        /// Cgroup id, which is the inode number.
        cgroup_id: u64,
        /// Bitmap of matched regex rules.
        match_bitmap: u64,
    },
    /// A cgroup was removed.
    Removed {
        /// Path of the cgroup.
        path: String,
        /// Cgroup id, which is the inode number.
        cgroup_id: u64,
    },
}
686
687fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
688    reader
689        .read_stat()
690        .context("Failed to read procfs")?
691        .total_cpu
692        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
693}
694
695fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
696    match (curr, prev) {
697        (
698            fb_procfs::CpuStat {
699                user_usec: Some(curr_user),
700                nice_usec: Some(curr_nice),
701                system_usec: Some(curr_system),
702                idle_usec: Some(curr_idle),
703                iowait_usec: Some(curr_iowait),
704                irq_usec: Some(curr_irq),
705                softirq_usec: Some(curr_softirq),
706                stolen_usec: Some(curr_stolen),
707                ..
708            },
709            fb_procfs::CpuStat {
710                user_usec: Some(prev_user),
711                nice_usec: Some(prev_nice),
712                system_usec: Some(prev_system),
713                idle_usec: Some(prev_idle),
714                iowait_usec: Some(prev_iowait),
715                irq_usec: Some(prev_irq),
716                softirq_usec: Some(prev_softirq),
717                stolen_usec: Some(prev_stolen),
718                ..
719            },
720        ) => {
721            let idle_usec = curr_idle.saturating_sub(*prev_idle);
722            let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
723            let user_usec = curr_user.saturating_sub(*prev_user);
724            let system_usec = curr_system.saturating_sub(*prev_system);
725            let nice_usec = curr_nice.saturating_sub(*prev_nice);
726            let irq_usec = curr_irq.saturating_sub(*prev_irq);
727            let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
728            let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);
729
730            let busy_usec =
731                user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
732            let total_usec = idle_usec + busy_usec + iowait_usec;
733            if total_usec > 0 {
734                Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
735            } else {
736                Ok(1.0)
737            }
738        }
739        _ => bail!("Missing stats in cpustat"),
740    }
741}
742
/// Copies `src` into `dst` as a NUL-terminated C string.
///
/// The bytes of `src` followed by a single terminating NUL are written to the
/// start of `dst`. Any bytes of `dst` past the terminator are left untouched.
///
/// # Panics
///
/// Panics if `src` contains an interior NUL byte, or if `dst` is too small to
/// hold `src` plus the terminating NUL.
fn copy_into_cstr(dst: &mut [i8], src: &str) {
    let cstr = CString::new(src).expect("copy_into_cstr: source contains an interior NUL byte");
    let bytes = cstr.as_bytes_with_nul();
    assert!(
        bytes.len() <= dst.len(),
        "copy_into_cstr: {} bytes (including NUL) do not fit into a {}-byte destination",
        bytes.len(),
        dst.len()
    );
    // `u8 as i8` keeps the bit pattern, so no unsafe reinterpretation of the
    // slice is needed.
    for (slot, &byte) in dst.iter_mut().zip(bytes) {
        *slot = byte as i8;
    }
}
748
749#[allow(clippy::needless_range_loop)]
750fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
751    let mut cpu_ctxs = vec![];
752    let cpu_ctxs_vec = skel
753        .maps
754        .cpu_ctxs
755        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
756        .context("Failed to lookup cpu_ctx")?
757        .unwrap();
758    for cpu in 0..*NR_CPUS_POSSIBLE {
759        cpu_ctxs.push(
760            *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
761                .expect("cpu_ctx: short or misaligned buffer"),
762        );
763    }
764    Ok(cpu_ctxs)
765}
766
/// Counter snapshot assembled by `BpfStats::read`.
#[derive(Debug, Clone)]
struct BpfStats {
    /// Global stats summed over all possible CPUs, indexed `[stat]`.
    gstats: Vec<u64>,
    /// Per-layer stats summed over all possible CPUs, indexed `[layer][stat]`.
    lstats: Vec<Vec<u64>>,
    /// Element-wise sum of `lstats` over all layers, indexed `[stat]`.
    lstats_sums: Vec<u64>,
    /// Per-LLC layer stats copied from the `llc_data` map, indexed `[layer][llc][stat]`.
    llc_lstats: Vec<Vec<Vec<u64>>>,
}
774
775impl BpfStats {
776    #[allow(clippy::needless_range_loop)]
777    fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
778        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
779        let nr_llcs = skel.maps.rodata_data.as_ref().unwrap().nr_llcs as usize;
780        let mut gstats = vec![0u64; NR_GSTATS];
781        let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
782        let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];
783
784        for cpu in 0..*NR_CPUS_POSSIBLE {
785            for (stat, value) in gstats.iter_mut().enumerate() {
786                *value += cpu_ctxs[cpu].gstats[stat];
787            }
788            for (layer, layer_stats) in lstats.iter_mut().enumerate() {
789                for (stat, value) in layer_stats.iter_mut().enumerate() {
790                    *value += cpu_ctxs[cpu].lstats[layer][stat];
791                }
792            }
793        }
794
795        let mut lstats_sums = vec![0u64; NR_LSTATS];
796        for layer_stats in lstats.iter() {
797            for (stat, value) in lstats_sums.iter_mut().enumerate() {
798                *value += layer_stats[stat];
799            }
800        }
801
802        for llc_id in 0..nr_llcs {
803            // XXX - This would be a lot easier if llc_ctx were in
804            // the bss. Unfortunately, kernel < v6.12 crashes and
805            // kernel >= v6.12 fails verification after such
806            // conversion due to seemingly verifier bugs. Convert to
807            // bss maps later.
808            let v = skel
809                .maps
810                .llc_data
811                .lookup(&(llc_id as u32).to_ne_bytes(), libbpf_rs::MapFlags::ANY)
812                .unwrap()
813                .unwrap();
814            let llcc: &bpf_intf::llc_ctx =
815                plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
816
817            for (layer_id, layer_llc_stats) in llc_lstats.iter_mut().enumerate() {
818                for (stat_id, stat_value) in layer_llc_stats[llc_id].iter_mut().enumerate() {
819                    *stat_value = llcc.lstats[layer_id][stat_id];
820                }
821            }
822        }
823
824        Self {
825            gstats,
826            lstats,
827            lstats_sums,
828            llc_lstats,
829        }
830    }
831}
832
833impl<'b> Sub<&'b BpfStats> for &BpfStats {
834    type Output = BpfStats;
835
836    fn sub(self, rhs: &'b BpfStats) -> BpfStats {
837        let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
838        BpfStats {
839            gstats: vec_sub(&self.gstats, &rhs.gstats),
840            lstats: self
841                .lstats
842                .iter()
843                .zip(rhs.lstats.iter())
844                .map(|(l, r)| vec_sub(l, r))
845                .collect(),
846            lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
847            llc_lstats: self
848                .llc_lstats
849                .iter()
850                .zip(rhs.llc_lstats.iter())
851                .map(|(l_layer, r_layer)| {
852                    l_layer
853                        .iter()
854                        .zip(r_layer.iter())
855                        .map(|(l_llc, r_llc)| {
856                            let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
857                            // Lat is not subtractable, take L side.
858                            r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
859                            vec_sub(&l_llc, &r_llc)
860                        })
861                        .collect()
862                })
863                .collect(),
864        }
865    }
866}
867
868#[derive(Clone, Debug)]
869struct Stats {
870    at: Instant,
871    elapsed: Duration,
872    topo: Arc<Topology>,
873    nr_layers: usize,
874    nr_layer_tasks: Vec<usize>,
875    layer_nr_node_pinned_tasks: Vec<Vec<u64>>,
876
877    total_util: f64, // Running AVG of sum of layer_utils
878    layer_utils: Vec<Vec<f64>>,
879    layer_node_pinned_utils: Vec<Vec<f64>>,
880    prev_layer_node_pinned_usages: Vec<Vec<u64>>,
881    layer_node_utils: Vec<Vec<f64>>,
882    prev_layer_node_usages: Vec<Vec<u64>>,
883    layer_node_duty_sums: Vec<Vec<f64>>, // Per-node per-layer duty in CPU units (EWMA)
884    prev_layer_node_duty_raw: Vec<Vec<u64>>, // Raw accumulated layer_duty_sum values
885
886    layer_membws: Vec<Vec<f64>>, // Estimated memory bandsidth consumption
887    prev_layer_membw_agg: Vec<Vec<u64>>, // Estimated aggregate membw consumption
888
889    cpu_busy: f64, // Read from /proc, maybe higher than total_util
890    prev_total_cpu: fb_procfs::CpuStat,
891    prev_pmu_resctrl_membw: (u64, u64), // (PMU-reported membw, resctrl-reported membw)
892
893    util_compensation: bool,
894    layer_utils_compensated: Vec<Vec<f64>>, // EWMA of per-CPU-scaled layer utils
895    prev_cpu_layer_usages: Vec<u64>,        // Per-CPU per-layer usages for computing deltas
896    prev_per_cpu_stats: BTreeMap<u32, fb_procfs::CpuStat>,
897
898    system_cpu_util_ewma: f64,       // 10s EWMA of system CPU utilization
899    layer_dsq_insert_ewma: Vec<f64>, // 10s EWMA of per-layer DSQ insertion ratio
900
901    bpf_stats: BpfStats,
902    prev_bpf_stats: BpfStats,
903
904    processing_dur: Duration,
905    prev_processing_dur: Duration,
906
907    layer_slice_us: Vec<u64>,
908
909    gpu_tasks_affinitized: u64,
910    gpu_task_affinitization_ms: u64,
911}
912
913impl Stats {
914    #[allow(clippy::needless_range_loop)]
915    fn read_layer_membw_agg(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
916        let mut layer_membw_agg = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
917
918        for cpu in 0..*NR_CPUS_POSSIBLE {
919            for (layer, layer_membw) in layer_membw_agg.iter_mut().enumerate() {
920                for (usage, value) in layer_membw.iter_mut().enumerate() {
921                    *value += cpu_ctxs[cpu].layer_membw_agg[layer][usage];
922                }
923            }
924        }
925
926        layer_membw_agg
927    }
928
929    #[allow(clippy::needless_range_loop)]
930    fn read_layer_node_pinned_usages(
931        cpu_ctxs: &[bpf_intf::cpu_ctx],
932        topo: &Topology,
933        nr_layers: usize,
934        nr_nodes: usize,
935    ) -> Vec<Vec<u64>> {
936        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
937
938        for cpu in 0..*NR_CPUS_POSSIBLE {
939            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
940            for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
941                layer_usages[node] += cpu_ctxs[cpu].node_pinned_usage[layer];
942            }
943        }
944
945        usages
946    }
947
948    #[allow(clippy::needless_range_loop)]
949    fn read_layer_node_usages(
950        cpu_ctxs: &[bpf_intf::cpu_ctx],
951        topo: &Topology,
952        nr_layers: usize,
953        nr_nodes: usize,
954    ) -> Vec<Vec<u64>> {
955        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
956
957        for cpu in 0..*NR_CPUS_POSSIBLE {
958            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
959            for (layer, layer_usages) in usages.iter_mut().enumerate().take(nr_layers) {
960                for usage in 0..=LAYER_USAGE_SUM_UPTO {
961                    layer_usages[node] += cpu_ctxs[cpu].layer_usages[layer][usage];
962                }
963            }
964        }
965
966        usages
967    }
968
969    #[allow(clippy::needless_range_loop)]
970    fn read_layer_node_duty_raw(
971        cpu_ctxs: &[bpf_intf::cpu_ctx],
972        topo: &Topology,
973        nr_layers: usize,
974        nr_nodes: usize,
975    ) -> Vec<Vec<u64>> {
976        let mut sums = vec![vec![0u64; nr_nodes]; nr_layers];
977
978        for cpu in 0..*NR_CPUS_POSSIBLE {
979            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
980            for (layer, layer_sums) in sums.iter_mut().enumerate().take(nr_layers) {
981                layer_sums[node] += cpu_ctxs[cpu].layer_duty_sum[layer];
982            }
983        }
984
985        sums
986    }
987
988    #[allow(clippy::needless_range_loop)]
989    fn read_per_cpu_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<u64> {
990        let stride = nr_layers * NR_LAYER_USAGES;
991        let mut flat = vec![0u64; *NR_CPUS_POSSIBLE * stride];
992
993        for cpu in 0..*NR_CPUS_POSSIBLE {
994            let base = cpu * stride;
995            for layer in 0..nr_layers {
996                for usage in 0..NR_LAYER_USAGES {
997                    flat[base + layer * NR_LAYER_USAGES + usage] =
998                        cpu_ctxs[cpu].layer_usages[layer][usage];
999                }
1000            }
1001        }
1002
1003        flat
1004    }
1005
1006    /// Use the membw reported by resctrl to normalize the values reported by hw counters.
1007    /// We have the following problem:
1008    /// 1) We want per-task memory bandwidth reporting. We cannot do this with resctrl, much
1009    ///    less transparently, since we would require different RMID for each task.
1010    /// 2) We want to directly use perf counters for tracking per-task memory bandwidth, but
1011    ///    we can't: Non-resctrl counters do not measure the right thing (e.g., they only measure
1012    ///    proxies like load operations),
1013    /// 3) Resctrl counters are not accessible directly so we cannot read them from the BPF side.
1014    ///
1015    /// Approximate per-task memory bandwidth using perf counters to measure _relative_ memory
1016    /// bandwidth usage.
1017    fn resctrl_read_total_membw() -> Result<u64> {
1018        let mut total_membw = 0u64;
1019        for entry in WalkDir::new("/sys/fs/resctrl/mon_data")
1020            .min_depth(1)
1021            .into_iter()
1022            .filter_map(Result::ok)
1023            .filter(|x| x.path().is_dir())
1024        {
1025            let mut path = entry.path().to_path_buf();
1026            path.push("mbm_total_bytes");
1027            total_membw += fs::read_to_string(path)?.trim().parse::<u64>()?;
1028        }
1029
1030        Ok(total_membw)
1031    }
1032
1033    fn new(
1034        skel: &mut BpfSkel,
1035        proc_reader: &fb_procfs::ProcReader,
1036        topo: Arc<Topology>,
1037        gpu_task_affinitizer: &GpuTaskAffinitizer,
1038        util_compensation: bool,
1039    ) -> Result<Self> {
1040        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
1041        let nr_nodes = topo.nodes.len();
1042        let cpu_ctxs = read_cpu_ctxs(skel)?;
1043        let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1044        let pmu_membw = Self::read_layer_membw_agg(&cpu_ctxs, nr_layers);
1045
1046        Ok(Self {
1047            at: Instant::now(),
1048            elapsed: Default::default(),
1049
1050            topo: topo.clone(),
1051            nr_layers,
1052            nr_layer_tasks: vec![0; nr_layers],
1053            layer_nr_node_pinned_tasks: vec![vec![0; nr_nodes]; nr_layers],
1054
1055            total_util: 0.0,
1056            layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1057            layer_node_pinned_utils: vec![vec![0.0; nr_nodes]; nr_layers],
1058            prev_layer_node_pinned_usages: Self::read_layer_node_pinned_usages(
1059                &cpu_ctxs, &topo, nr_layers, nr_nodes,
1060            ),
1061            layer_node_utils: vec![vec![0.0; nr_nodes]; nr_layers],
1062            prev_layer_node_usages: Self::read_layer_node_usages(
1063                &cpu_ctxs, &topo, nr_layers, nr_nodes,
1064            ),
1065            layer_node_duty_sums: vec![vec![0.0; nr_nodes]; nr_layers],
1066            prev_layer_node_duty_raw: Self::read_layer_node_duty_raw(
1067                &cpu_ctxs, &topo, nr_layers, nr_nodes,
1068            ),
1069            layer_membws: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1070            // This is not normalized because we don't have enough history to do so.
1071            // It should not matter too much, since the value is dropped on the first
1072            // iteration.
1073            prev_layer_membw_agg: pmu_membw,
1074            prev_pmu_resctrl_membw: (0, 0),
1075
1076            cpu_busy: 0.0,
1077            prev_total_cpu: read_total_cpu(proc_reader)?,
1078            util_compensation,
1079            layer_utils_compensated: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1080            prev_cpu_layer_usages: Self::read_per_cpu_layer_usages(&cpu_ctxs, nr_layers),
1081            prev_per_cpu_stats: BTreeMap::new(),
1082            system_cpu_util_ewma: 0.0,
1083            layer_dsq_insert_ewma: vec![0.0; nr_layers],
1084
1085            bpf_stats: bpf_stats.clone(),
1086            prev_bpf_stats: bpf_stats,
1087
1088            processing_dur: Default::default(),
1089            prev_processing_dur: Default::default(),
1090
1091            layer_slice_us: vec![0; nr_layers],
1092            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1093            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1094        })
1095    }
1096
1097    fn refresh(
1098        &mut self,
1099        skel: &mut BpfSkel,
1100        proc_reader: &fb_procfs::ProcReader,
1101        now: Instant,
1102        cur_processing_dur: Duration,
1103        gpu_task_affinitizer: &GpuTaskAffinitizer,
1104    ) -> Result<()> {
1105        let elapsed = now.duration_since(self.at);
1106        let elapsed_f64 = elapsed.as_secs_f64();
1107        let cpu_ctxs = read_cpu_ctxs(skel)?;
1108
1109        let layers = &skel.maps.bss_data.as_ref().unwrap().layers;
1110        let nr_layer_tasks: Vec<usize> = layers
1111            .iter()
1112            .take(self.nr_layers)
1113            .map(|layer| layer.nr_tasks as usize)
1114            .collect();
1115        let layer_nr_node_pinned_tasks: Vec<Vec<u64>> = layers
1116            .iter()
1117            .take(self.nr_layers)
1118            .map(|layer| {
1119                layer.node[..self.topo.nodes.len()]
1120                    .iter()
1121                    .map(|n| n.nr_pinned_tasks)
1122                    .collect()
1123            })
1124            .collect();
1125        let layer_slice_us: Vec<u64> = layers
1126            .iter()
1127            .take(self.nr_layers)
1128            .map(|layer| layer.slice_ns / 1000_u64)
1129            .collect();
1130
1131        let cur_layer_node_pinned_usages = Self::read_layer_node_pinned_usages(
1132            &cpu_ctxs,
1133            &self.topo,
1134            self.nr_layers,
1135            self.topo.nodes.len(),
1136        );
1137        let cur_layer_node_usages = Self::read_layer_node_usages(
1138            &cpu_ctxs,
1139            &self.topo,
1140            self.nr_layers,
1141            self.topo.nodes.len(),
1142        );
1143        let cur_layer_node_duty_raw = Self::read_layer_node_duty_raw(
1144            &cpu_ctxs,
1145            &self.topo,
1146            self.nr_layers,
1147            self.topo.nodes.len(),
1148        );
1149        let cur_layer_membw_agg = Self::read_layer_membw_agg(&cpu_ctxs, self.nr_layers);
1150
1151        // Memory BW normalization. It requires finding the delta according to perf, the delta
1152        // according to resctl, and finding the factor between them. This also helps in
1153        // determining whether this delta is stable.
1154        //
1155        // We scale only the raw PMC delta - the part of the counter incremented between two
1156        // periods. This is because the scaling factor is only valid for that time period.
1157        // Non-scaled samples are not comparable, while scaled ones are.
1158        let (pmu_prev, resctrl_prev) = self.prev_pmu_resctrl_membw;
1159        let pmu_cur: u64 = cur_layer_membw_agg
1160            .iter()
1161            .map(|membw_agg| membw_agg[LAYER_USAGE_OPEN] + membw_agg[LAYER_USAGE_OWNED])
1162            .sum();
1163        let resctrl_cur = Self::resctrl_read_total_membw()?;
1164        let factor = (resctrl_cur - resctrl_prev) as f64 / (pmu_cur - pmu_prev) as f64;
1165
1166        // Computes the runtime deltas and converts them from ns to s.
1167        let compute_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1168            cur_agg
1169                .iter()
1170                .zip(prev_agg.iter())
1171                .map(|(cur, prev)| {
1172                    cur.iter()
1173                        .zip(prev.iter())
1174                        .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
1175                        .collect()
1176                })
1177                .collect()
1178        };
1179
1180        // Computes the total memory traffic done since last computation and normalizes to GBs.
1181        // We derive the rate of consumption elsewhere.
1182        let compute_mem_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1183            cur_agg
1184                .iter()
1185                .zip(prev_agg.iter())
1186                .map(|(cur, prev)| {
1187                    cur.iter()
1188                        .zip(prev.iter())
1189                        .map(|(c, p)| (*c as i64 - *p as i64) as f64 / 1024_f64.powf(3.0))
1190                        .collect()
1191                })
1192                .collect()
1193        };
1194
1195        // Scale the raw value delta by the resctrl/pmc computed factor.
1196        let cur_layer_membw: Vec<Vec<f64>> =
1197            compute_mem_diff(&cur_layer_membw_agg, &self.prev_layer_membw_agg);
1198
1199        let cur_layer_membw: Vec<Vec<f64>> = cur_layer_membw
1200            .iter()
1201            .map(|x| x.iter().map(|x| *x * factor).collect())
1202            .collect();
1203
1204        let metric_decay =
1205            |cur_metric: Vec<Vec<f64>>, prev_metric: &Vec<Vec<f64>>, decay_rate: f64| {
1206                cur_metric
1207                    .iter()
1208                    .zip(prev_metric.iter())
1209                    .map(|(cur, prev)| {
1210                        cur.iter()
1211                            .zip(prev.iter())
1212                            .map(|(c, p)| {
1213                                let decay = decay_rate.powf(elapsed_f64);
1214                                p * decay + c * (1.0 - decay)
1215                            })
1216                            .collect()
1217                    })
1218                    .collect()
1219            };
1220
1221        let cur_node_pinned_utils: Vec<Vec<f64>> = compute_diff(
1222            &cur_layer_node_pinned_usages,
1223            &self.prev_layer_node_pinned_usages,
1224        );
1225        let layer_node_pinned_utils: Vec<Vec<f64>> = metric_decay(
1226            cur_node_pinned_utils,
1227            &self.layer_node_pinned_utils,
1228            *USAGE_DECAY,
1229        );
1230        let cur_node_utils: Vec<Vec<f64>> =
1231            compute_diff(&cur_layer_node_usages, &self.prev_layer_node_usages);
1232        let layer_node_utils: Vec<Vec<f64>> =
1233            metric_decay(cur_node_utils, &self.layer_node_utils, *USAGE_DECAY);
1234        let cur_node_duty: Vec<Vec<f64>> =
1235            compute_diff(&cur_layer_node_duty_raw, &self.prev_layer_node_duty_raw);
1236        let layer_node_duty_sums: Vec<Vec<f64>> =
1237            metric_decay(cur_node_duty, &self.layer_node_duty_sums, *USAGE_DECAY);
1238
1239        let layer_membws: Vec<Vec<f64>> = metric_decay(cur_layer_membw, &self.layer_membws, 0.0);
1240
1241        let proc_stat = proc_reader
1242            .read_stat()
1243            .context("Failed to read /proc/stat")?;
1244        let cur_total_cpu = proc_stat
1245            .total_cpu
1246            .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;
1247        let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;
1248
1249        // Calculate system CPU utilization EWMA (10 second window)
1250        const SYS_CPU_UTIL_EWMA_SECS: f64 = 10.0;
1251        let elapsed_f64 = elapsed.as_secs_f64();
1252        let alpha = elapsed_f64 / SYS_CPU_UTIL_EWMA_SECS.max(elapsed_f64);
1253        let system_cpu_util_ewma = alpha * cpu_busy + (1.0 - alpha) * self.system_cpu_util_ewma;
1254
1255        // Per-CPU scale factors: s[c] = Δt / (Δt - irq - softirq - stolen).
1256        // When compensation is off, all scales stay 1.0.
1257        let cur_per_cpu_stats = proc_stat.cpus_map.unwrap_or_default();
1258        let mut cpu_scales = vec![1.0f64; *NR_CPUS_POSSIBLE];
1259        if self.util_compensation {
1260            for (&cpu_id, cur_cpu_stat) in &cur_per_cpu_stats {
1261                let cpu = cpu_id as usize;
1262                if let Some(prev_cpu_stat) = self.prev_per_cpu_stats.get(&cpu_id) {
1263                    if let (
1264                        fb_procfs::CpuStat {
1265                            user_usec: Some(cu),
1266                            nice_usec: Some(cn),
1267                            system_usec: Some(cs),
1268                            idle_usec: Some(ci),
1269                            iowait_usec: Some(cw),
1270                            irq_usec: Some(cq),
1271                            softirq_usec: Some(cf),
1272                            stolen_usec: Some(ct),
1273                            ..
1274                        },
1275                        fb_procfs::CpuStat {
1276                            user_usec: Some(pu),
1277                            nice_usec: Some(pn),
1278                            system_usec: Some(ps),
1279                            idle_usec: Some(pi),
1280                            iowait_usec: Some(pw),
1281                            irq_usec: Some(pq),
1282                            softirq_usec: Some(pf),
1283                            stolen_usec: Some(pt),
1284                            ..
1285                        },
1286                    ) = (cur_cpu_stat, prev_cpu_stat)
1287                    {
1288                        let delta_total = cu.saturating_sub(*pu)
1289                            + cn.saturating_sub(*pn)
1290                            + cs.saturating_sub(*ps)
1291                            + ci.saturating_sub(*pi)
1292                            + cw.saturating_sub(*pw)
1293                            + cq.saturating_sub(*pq)
1294                            + cf.saturating_sub(*pf)
1295                            + ct.saturating_sub(*pt);
1296                        let overhead = cq.saturating_sub(*pq)
1297                            + cf.saturating_sub(*pf)
1298                            + ct.saturating_sub(*pt);
1299                        let available = delta_total.saturating_sub(overhead);
1300                        cpu_scales[cpu] = if available > 0 {
1301                            (delta_total as f64 / available as f64).clamp(1.0, 20.0)
1302                        } else {
1303                            1.0
1304                        };
1305                    }
1306                }
1307            }
1308        }
1309
1310        // Single pass over all CPUs: build both raw and compensated layer
1311        // util streams. When compensation is off, all scales are 1.0 so
1312        // comp == raw naturally.
1313        let cur_cpu_layer_usages = Self::read_per_cpu_layer_usages(&cpu_ctxs, self.nr_layers);
1314        let stride = self.nr_layers * NR_LAYER_USAGES;
1315        let mut raw_sums = vec![vec![0.0f64; NR_LAYER_USAGES]; self.nr_layers];
1316        let mut scaled_sums = vec![vec![0.0f64; NR_LAYER_USAGES]; self.nr_layers];
1317        #[allow(clippy::needless_range_loop)]
1318        for cpu in 0..*NR_CPUS_POSSIBLE {
1319            let scale = cpu_scales[cpu];
1320            let base = cpu * stride;
1321            for layer in 0..self.nr_layers {
1322                for usage in 0..NR_LAYER_USAGES {
1323                    let idx = base + layer * NR_LAYER_USAGES + usage;
1324                    let delta =
1325                        cur_cpu_layer_usages[idx].saturating_sub(self.prev_cpu_layer_usages[idx]);
1326                    if delta > 0 {
1327                        let delta_f = delta as f64;
1328                        raw_sums[layer][usage] += delta_f;
1329                        scaled_sums[layer][usage] += delta_f * scale;
1330                    }
1331                }
1332            }
1333        }
1334        let normalize = |sums: Vec<Vec<f64>>| -> Vec<Vec<f64>> {
1335            sums.into_iter()
1336                .map(|layer_sums| {
1337                    layer_sums
1338                        .into_iter()
1339                        .map(|s| s / 1_000_000_000.0 / elapsed_f64)
1340                        .collect()
1341                })
1342                .collect()
1343        };
1344        let layer_utils: Vec<Vec<f64>> =
1345            metric_decay(normalize(raw_sums), &self.layer_utils, *USAGE_DECAY);
1346        let layer_utils_compensated: Vec<Vec<f64>> = metric_decay(
1347            normalize(scaled_sums),
1348            &self.layer_utils_compensated,
1349            *USAGE_DECAY,
1350        );
1351
1352        let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1353        let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;
1354
1355        // Calculate per-layer DSQ insertion EWMA (10 second window)
1356        const DSQ_INSERT_EWMA_SECS: f64 = 10.0;
1357        let dsq_alpha = elapsed_f64 / DSQ_INSERT_EWMA_SECS.max(elapsed_f64);
1358        let layer_dsq_insert_ewma: Vec<f64> = (0..self.nr_layers)
1359            .map(|layer_id| {
1360                let sel_local = bpf_stats.lstats[layer_id]
1361                    [bpf_intf::layer_stat_id_LSTAT_SEL_LOCAL as usize]
1362                    as f64;
1363                let enq_local = bpf_stats.lstats[layer_id]
1364                    [bpf_intf::layer_stat_id_LSTAT_ENQ_LOCAL as usize]
1365                    as f64;
1366                let enq_dsq = bpf_stats.lstats[layer_id]
1367                    [bpf_intf::layer_stat_id_LSTAT_ENQ_DSQ as usize]
1368                    as f64;
1369                let total_dispatches = sel_local + enq_local + enq_dsq;
1370
1371                let cur_ratio = if total_dispatches > 0.0 {
1372                    enq_dsq / total_dispatches
1373                } else {
1374                    0.0
1375                };
1376
1377                dsq_alpha * cur_ratio + (1.0 - dsq_alpha) * self.layer_dsq_insert_ewma[layer_id]
1378            })
1379            .collect();
1380
1381        let processing_dur = cur_processing_dur
1382            .checked_sub(self.prev_processing_dur)
1383            .unwrap();
1384
1385        *self = Self {
1386            at: now,
1387            elapsed,
1388            topo: self.topo.clone(),
1389            nr_layers: self.nr_layers,
1390            nr_layer_tasks,
1391            layer_nr_node_pinned_tasks,
1392
1393            total_util: layer_utils
1394                .iter()
1395                .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
1396                .sum(),
1397            layer_utils,
1398            layer_node_pinned_utils,
1399            prev_layer_node_pinned_usages: cur_layer_node_pinned_usages,
1400            layer_node_utils,
1401            prev_layer_node_usages: cur_layer_node_usages,
1402            layer_node_duty_sums,
1403            prev_layer_node_duty_raw: cur_layer_node_duty_raw,
1404
1405            layer_membws,
1406            prev_layer_membw_agg: cur_layer_membw_agg,
1407            // Was updated during normalization.
1408            prev_pmu_resctrl_membw: (pmu_cur, resctrl_cur),
1409
1410            cpu_busy,
1411            prev_total_cpu: cur_total_cpu,
1412            util_compensation: self.util_compensation,
1413            layer_utils_compensated,
1414            prev_cpu_layer_usages: cur_cpu_layer_usages,
1415            prev_per_cpu_stats: cur_per_cpu_stats,
1416            system_cpu_util_ewma,
1417            layer_dsq_insert_ewma,
1418
1419            bpf_stats,
1420            prev_bpf_stats: cur_bpf_stats,
1421
1422            processing_dur,
1423            prev_processing_dur: cur_processing_dur,
1424
1425            layer_slice_us,
1426            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1427            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1428        };
1429        Ok(())
1430    }
1431}
1432
1433#[derive(Debug)]
1434struct Layer {
1435    name: String,
1436    kind: LayerKind,
1437    growth_algo: LayerGrowthAlgo,
1438    core_order: Vec<Vec<usize>>,
1439
1440    assigned_llcs: Vec<Vec<usize>>,
1441
1442    nr_cpus: usize,
1443    nr_llc_cpus: Vec<usize>,
1444    nr_node_cpus: Vec<usize>,
1445    cpus: Cpumask,
1446    allowed_cpus: Cpumask,
1447
1448    /// Per-node count of CPUs allocated for pinned demand.
1449    nr_pinned_cpus: Vec<usize>,
1450}
1451
1452fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
1453    fs::read_to_string("/proc/kallsyms")?
1454        .lines()
1455        .find(|line| line.contains(sym_name))
1456        .and_then(|line| line.split_whitespace().next())
1457        .and_then(|addr| u64::from_str_radix(addr, 16).ok())
1458        .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
1459}
1460
1461fn resolve_cpus_pct_range(
1462    cpus_range: &Option<(usize, usize)>,
1463    cpus_range_frac: &Option<(f64, f64)>,
1464    max_cpus: usize,
1465) -> Result<(usize, usize)> {
1466    match (cpus_range, cpus_range_frac) {
1467        (Some(_x), Some(_y)) => {
1468            bail!("cpus_range cannot be used with cpus_pct.");
1469        }
1470        (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
1471        (None, Some((cpus_frac_min, cpus_frac_max))) => {
1472            if *cpus_frac_min < 0_f64
1473                || *cpus_frac_min > 1_f64
1474                || *cpus_frac_max < 0_f64
1475                || *cpus_frac_max > 1_f64
1476            {
1477                bail!("cpus_range_frac values must be between 0.0 and 1.0");
1478            }
1479            let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
1480            let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
1481            Ok((
1482                std::cmp::max(cpus_min_count, 1),
1483                std::cmp::min(std::cmp::max(cpus_max_count, 1), max_cpus),
1484            ))
1485        }
1486        (None, None) => Ok((0, max_cpus)),
1487    }
1488}
1489
1490impl Layer {
1491    fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<Vec<usize>>) -> Result<Self> {
1492        let name = &spec.name;
1493        let kind = spec.kind.clone();
1494        let mut allowed_cpus = Cpumask::new();
1495        match &kind {
1496            LayerKind::Confined {
1497                cpus_range,
1498                cpus_range_frac,
1499                common: LayerCommon { nodes, llcs, .. },
1500                ..
1501            } => {
1502                let cpus_range =
1503                    resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
1504                if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
1505                    bail!("invalid cpus_range {:?}", cpus_range);
1506                }
1507                if nodes.is_empty() && llcs.is_empty() {
1508                    allowed_cpus.set_all();
1509                } else {
1510                    // build up the cpus bitset
1511                    for (node_id, node) in &topo.nodes {
1512                        // first do the matching for nodes
1513                        if nodes.contains(node_id) {
1514                            for &id in node.all_cpus.keys() {
1515                                allowed_cpus.set_cpu(id)?;
1516                            }
1517                        }
1518                        // next match on any LLCs
1519                        for (llc_id, llc) in &node.llcs {
1520                            if llcs.contains(llc_id) {
1521                                for &id in llc.all_cpus.keys() {
1522                                    allowed_cpus.set_cpu(id)?;
1523                                }
1524                            }
1525                        }
1526                    }
1527                }
1528            }
1529            LayerKind::Grouped {
1530                common: LayerCommon { nodes, llcs, .. },
1531                ..
1532            }
1533            | LayerKind::Open {
1534                common: LayerCommon { nodes, llcs, .. },
1535                ..
1536            } => {
1537                if nodes.is_empty() && llcs.is_empty() {
1538                    allowed_cpus.set_all();
1539                } else {
1540                    // build up the cpus bitset
1541                    for (node_id, node) in &topo.nodes {
1542                        // first do the matching for nodes
1543                        if nodes.contains(node_id) {
1544                            for &id in node.all_cpus.keys() {
1545                                allowed_cpus.set_cpu(id)?;
1546                            }
1547                        }
1548                        // next match on any LLCs
1549                        for (llc_id, llc) in &node.llcs {
1550                            if llcs.contains(llc_id) {
1551                                for &id in llc.all_cpus.keys() {
1552                                    allowed_cpus.set_cpu(id)?;
1553                                }
1554                            }
1555                        }
1556                    }
1557                }
1558            }
1559        }
1560
1561        // Util can be above 1.0 for grouped layers if
1562        // util_includes_open_cputime is set.
1563        if let Some(util_range) = kind.util_range() {
1564            if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
1565                bail!("invalid util_range {:?}", util_range);
1566            }
1567        }
1568
1569        let layer_growth_algo = kind.common().growth_algo.clone();
1570
1571        debug!(
1572            "layer: {} algo: {:?} core order: {:?}",
1573            name, &layer_growth_algo, core_order
1574        );
1575
1576        Ok(Self {
1577            name: name.into(),
1578            kind,
1579            growth_algo: layer_growth_algo,
1580            core_order: core_order.clone(),
1581
1582            assigned_llcs: vec![vec![]; topo.nodes.len()],
1583
1584            nr_cpus: 0,
1585            nr_llc_cpus: vec![0; topo.all_llcs.len()],
1586            nr_node_cpus: vec![0; topo.nodes.len()],
1587            cpus: Cpumask::new(),
1588            allowed_cpus,
1589
1590            nr_pinned_cpus: vec![0; topo.nodes.len()],
1591        })
1592    }
1593}
/// NUMA node information recorded for a GPU device by
/// `GpuTaskAffinitizer::init_dev_node_map`.
#[derive(Debug, Clone)]
struct NodeInfo {
    /// CPU set containing every CPU of the node; passed to
    /// `sched_setaffinity` when affinitizing GPU tasks.
    node_mask: nix::sched::CpuSet,
    /// ID of the node the mask was built from. Underscore-prefixed: it is
    /// stored but not read by the code in this part of the file.
    _node_id: usize,
}
1599
#[derive(Debug)]
struct GpuTaskAffinitizer {
    // This struct tracks information necessary to numa affinitize
    // gpu tasks periodically when needed.
    /// NVML device index -> node information, filled by `init_dev_node_map`.
    gpu_devs_to_node_info: HashMap<u32, NodeInfo>,
    /// PID of a process NVML reported as running on a device -> that device's
    /// index. `update_gpu_pids` only inserts entries; nothing in this impl
    /// removes them.
    gpu_pids_to_devs: HashMap<Pid, u32>,
    /// Start time of the most recent pass of `maybe_affinitize`; `None`
    /// until the first pass has run.
    last_process_time: Option<Instant>,
    /// Process table handle refreshed by `update_process_info` and queried
    /// for processes and their tasks (presumably `sysinfo::System` — the
    /// import is not visible here).
    sys: System,
    /// Parent PID -> child PIDs, rebuilt on every `update_process_info`.
    pid_map: HashMap<Pid, Vec<Pid>>,
    /// Minimum time between two passes of `maybe_affinitize`.
    poll_interval: Duration,
    /// When false, `init`, `maybe_affinitize` and `affinitize_gpu_pids`
    /// return immediately.
    enable: bool,
    /// Running count of successful `sched_setaffinity` calls.
    tasks_affinitized: u64,
    /// Wall-clock duration, in milliseconds, of the most recent pass.
    last_task_affinitization_ms: u64,
}
1614
1615impl GpuTaskAffinitizer {
1616    pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
1617        GpuTaskAffinitizer {
1618            gpu_devs_to_node_info: HashMap::new(),
1619            gpu_pids_to_devs: HashMap::new(),
1620            last_process_time: None,
1621            sys: System::default(),
1622            pid_map: HashMap::new(),
1623            poll_interval: Duration::from_secs(poll_interval),
1624            enable,
1625            tasks_affinitized: 0,
1626            last_task_affinitization_ms: 0,
1627        }
1628    }
1629
1630    fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
1631        for (chunk, &mask) in affinity.iter().enumerate() {
1632            let mut inner_offset: u64 = 1;
1633            for _ in 0..64 {
1634                if (mask & inner_offset) != 0 {
1635                    return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
1636                }
1637                inner_offset <<= 1;
1638            }
1639        }
1640        anyhow::bail!("unable to get CPU from NVML bitmask");
1641    }
1642
1643    fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
1644        let mut cpuset = CpuSet::new();
1645        for cpu_id in node.all_cpus.keys() {
1646            cpuset.set(*cpu_id)?;
1647        }
1648        Ok(cpuset)
1649    }
1650
1651    fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
1652        let nvml = nvml()?;
1653        let device_count = nvml.device_count()?;
1654
1655        for idx in 0..device_count {
1656            let dev = nvml.device_by_index(idx)?;
1657            // For machines w/ up to 1024 CPUs.
1658            let cpu = dev.cpu_affinity(16)?;
1659            let ideal_cpu = self.find_one_cpu(cpu)?;
1660            if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
1661                self.gpu_devs_to_node_info.insert(
1662                    idx,
1663                    NodeInfo {
1664                        node_mask: self.node_to_cpuset(
1665                            topo.nodes.get(&cpu.node_id).expect("topo missing node"),
1666                        )?,
1667                        _node_id: cpu.node_id,
1668                    },
1669                );
1670            }
1671        }
1672        Ok(())
1673    }
1674
1675    fn update_gpu_pids(&mut self) -> Result<()> {
1676        let nvml = nvml()?;
1677        for i in 0..nvml.device_count()? {
1678            let device = nvml.device_by_index(i)?;
1679            for proc in device
1680                .running_compute_processes()?
1681                .into_iter()
1682                .chain(device.running_graphics_processes()?.into_iter())
1683            {
1684                self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
1685            }
1686        }
1687        Ok(())
1688    }
1689
1690    fn update_process_info(&mut self) -> Result<()> {
1691        self.sys.refresh_processes_specifics(
1692            ProcessesToUpdate::All,
1693            true,
1694            ProcessRefreshKind::nothing(),
1695        );
1696        self.pid_map.clear();
1697        for (pid, proc_) in self.sys.processes() {
1698            if let Some(ppid) = proc_.parent() {
1699                self.pid_map.entry(ppid).or_default().push(*pid);
1700            }
1701        }
1702        Ok(())
1703    }
1704
1705    fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
1706        let mut work = VecDeque::from([root_pid]);
1707        let mut pids_and_tids: HashSet<Pid> = HashSet::new();
1708
1709        while let Some(pid) = work.pop_front() {
1710            if pids_and_tids.insert(pid) {
1711                if let Some(kids) = self.pid_map.get(&pid) {
1712                    work.extend(kids);
1713                }
1714                if let Some(proc_) = self.sys.process(pid) {
1715                    if let Some(tasks) = proc_.tasks() {
1716                        pids_and_tids.extend(tasks.iter().copied());
1717                    }
1718                }
1719            }
1720        }
1721        pids_and_tids
1722    }
1723
1724    fn affinitize_gpu_pids(&mut self) -> Result<()> {
1725        if !self.enable {
1726            return Ok(());
1727        }
1728        for (pid, dev) in &self.gpu_pids_to_devs {
1729            let node_info = self
1730                .gpu_devs_to_node_info
1731                .get(dev)
1732                .expect("Unable to get gpu pid node mask");
1733            for child in self.get_child_pids_and_tids(*pid) {
1734                match nix::sched::sched_setaffinity(
1735                    nix::unistd::Pid::from_raw(child.as_u32() as i32),
1736                    &node_info.node_mask,
1737                ) {
1738                    Ok(_) => {
1739                        // Increment the global counter for successful affinitization
1740                        self.tasks_affinitized += 1;
1741                    }
1742                    Err(_) => {
1743                        debug!(
1744                            "Error affinitizing gpu pid {} to node {:#?}",
1745                            child.as_u32(),
1746                            node_info
1747                        );
1748                    }
1749                };
1750            }
1751        }
1752        Ok(())
1753    }
1754
1755    pub fn maybe_affinitize(&mut self) {
1756        if !self.enable {
1757            return;
1758        }
1759        let now = Instant::now();
1760
1761        if let Some(last_process_time) = self.last_process_time {
1762            if (now - last_process_time) < self.poll_interval {
1763                return;
1764            }
1765        }
1766
1767        match self.update_gpu_pids() {
1768            Ok(_) => {}
1769            Err(e) => {
1770                error!("Error updating GPU PIDs: {}", e);
1771            }
1772        };
1773        match self.update_process_info() {
1774            Ok(_) => {}
1775            Err(e) => {
1776                error!("Error updating process info to affinitize GPU PIDs: {}", e);
1777            }
1778        };
1779        match self.affinitize_gpu_pids() {
1780            Ok(_) => {}
1781            Err(e) => {
1782                error!("Error updating GPU PIDs: {}", e);
1783            }
1784        };
1785        self.last_process_time = Some(now);
1786        self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
1787    }
1788
1789    pub fn init(&mut self, topo: Arc<Topology>) {
1790        if !self.enable || self.last_process_time.is_some() {
1791            return;
1792        }
1793
1794        match self.init_dev_node_map(topo) {
1795            Ok(_) => {}
1796            Err(e) => {
1797                error!("Error initializing gpu node dev map: {}", e);
1798            }
1799        };
1800        self.sys = System::new_all();
1801    }
1802}
1803
/// State held by the scheduler's userspace side: the BPF skeleton plus the
/// layer, topology, statistics and GPU-affinity bookkeeping listed below.
struct Scheduler<'a> {
    /// BPF skeleton; `'a` is the lifetime of the object it borrows.
    skel: BpfSkel<'a>,
    /// libbpf link handle (presumably the attached struct_ops — confirm
    /// against where it is set).
    struct_ops: Option<libbpf_rs::Link>,
    /// Layer specifications the scheduler was configured with.
    layer_specs: Vec<LayerSpec>,

    // NOTE(review): the two intervals below look like the scheduling-loop
    // period and the layer-refresh period — confirm at the use sites.
    sched_intv: Duration,
    layer_refresh_intv: Duration,

    cpu_pool: CpuPool,
    /// Userspace per-layer state, see `Layer::new`.
    layers: Vec<Layer>,
    idle_qos_enabled: bool,

    proc_reader: fb_procfs::ProcReader,
    sched_stats: Stats,

    /// Regex id -> compiled cgroup regex, in the shape returned by
    /// `init_layers`. NOTE(review): when this is `None` is not visible here.
    cgroup_regexes: Option<HashMap<u32, Regex>>,

    nr_layer_cpus_ranges: Vec<(usize, usize)>,
    // NOTE(review): the two matrices below are presumably indexed
    // [layer][node], matching the per-node slices taken by
    // `xnuma_check_active` — confirm.
    xnuma_mig_src: Vec<Vec<bool>>,
    growth_denied: Vec<Vec<bool>>,
    processing_dur: Duration,

    /// Shared system topology.
    topo: Arc<Topology>,
    /// Network devices keyed by a string (presumably the interface name —
    /// confirm).
    netdevs: BTreeMap<String, NetDev>,
    stats_server: StatsServer<StatsReq, StatsRes>,
    /// Periodic NUMA affinitization of GPU-using tasks.
    gpu_task_handler: GpuTaskAffinitizer,
}
1831
/// Fixed-point scale (2^20) by which `xnuma_compute_rates` multiplies a
/// fractional migration rate before truncating it to `u64`.
const DUTY_CYCLE_SCALE: f64 = (1u64 << 20) as f64;
/// Fraction of a node's surplus that `xnuma_compute_rates` hands out per
/// computation, so that balancing converges gradually.
const XNUMA_RATE_DAMPEN: f64 = 0.5;
1834
/// Result of xnuma water-fill computation for a single layer.
struct XnumaRates {
    /// rates[src][dst]: migration rate in duty-cycle-scaled units, i.e. the
    /// fractional rate multiplied by `DUTY_CYCLE_SCALE` and truncated to an
    /// integer. The matrix is square (one row and one column per node) and
    /// entries with `src == dst` are always 0.
    rates: Vec<Vec<u64>>,
}
1840
/// Decide, node by node, whether a layer should treat the node as a
/// cross-node migration source, applying two-threshold hysteresis.
///
/// All slices are indexed by node and must be at least as long as
/// `duty_sums`. `threshold` and `threshold_delta` are `(low, high)` pairs.
///
/// For a node with a non-zero allocation:
///   * it becomes (or stays) a source when load/alloc exceeds `threshold.1`,
///     surplus/alloc exceeds `threshold_delta.1` and growth was denied;
///   * otherwise it stops being a source when load/alloc is below
///     `threshold.0`, or surplus/alloc is below `threshold_delta.0`, or
///     growth was not denied;
///   * otherwise it keeps its state from `currently_active`.
///
/// Surplus is the node's duty minus what it would carry if the layer's total
/// duty were spread over all nodes in proportion to their allocation.
///
/// A node with zero allocation is a source exactly when it has load and
/// growth was denied.
fn xnuma_check_active(
    duty_sums: &[f64],
    allocs: &[usize],
    threshold: (f64, f64),
    threshold_delta: (f64, f64),
    growth_denied: &[bool],
    currently_active: &[bool],
) -> Vec<bool> {
    let duty_total: f64 = duty_sums.iter().sum();
    let alloc_total: f64 = allocs.iter().map(|&a| a as f64).sum();
    // Duty per allocated CPU if the load were perfectly balanced.
    let balanced_ratio = if alloc_total > 0.0 {
        duty_total / alloc_total
    } else {
        0.0
    };

    (0..duty_sums.len())
        .map(|node| {
            let cpus = allocs[node] as f64;
            let duty = duty_sums[node];

            if cpus <= 0.0 {
                return duty > 0.0 && growth_denied[node];
            }

            let load = duty / cpus;
            let excess = duty - balanced_ratio * cpus;
            let excess_ratio = excess / cpus;

            if load > threshold.1 && excess_ratio > threshold_delta.1 && growth_denied[node] {
                true
            } else if load < threshold.0
                || excess_ratio < threshold_delta.0
                || !growth_denied[node]
            {
                false
            } else {
                // Inside the hysteresis band: keep whatever state we had.
                currently_active[node]
            }
        })
        .collect()
}
1902
1903/// Compute water-fill migration rates for a single layer.
1904///
1905/// Finds the equalization ratio (water line) across all nodes, then
1906/// computes per-(src, dst) migration rates proportional to each source's
1907/// surplus and each destination's share of total deficit.
1908fn xnuma_compute_rates(duty_sums: &[f64], allocs: &[usize]) -> XnumaRates {
1909    let nr_nodes = duty_sums.len();
1910    let total_duty: f64 = duty_sums.iter().sum();
1911    let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
1912
1913    if total_alloc <= 0.0 {
1914        return XnumaRates {
1915            rates: vec![vec![0u64; nr_nodes]; nr_nodes],
1916        };
1917    }
1918
1919    let eq_ratio = total_duty / total_alloc;
1920
1921    let mut surpluses = vec![0.0f64; nr_nodes];
1922    let mut deficits = vec![0.0f64; nr_nodes];
1923    for nid in 0..nr_nodes {
1924        let expected = eq_ratio * allocs[nid] as f64;
1925        let delta = duty_sums[nid] - expected;
1926        if delta > 0.0 {
1927            surpluses[nid] = delta;
1928        } else {
1929            deficits[nid] = -delta;
1930        }
1931    }
1932
1933    let total_deficit: f64 = deficits.iter().sum();
1934
1935    let mut rates = vec![vec![0u64; nr_nodes]; nr_nodes];
1936    for src in 0..nr_nodes {
1937        for dst in 0..nr_nodes {
1938            if src == dst || total_deficit <= 0.0 || surpluses[src] <= 0.0 {
1939                continue;
1940            }
1941            // Dampen: transfer half the surplus per cycle so convergence
1942            // is gradual rather than a single-step overcorrection.
1943            let migration = surpluses[src] * deficits[dst] / total_deficit * XNUMA_RATE_DAMPEN;
1944            rates[src][dst] = (migration * DUTY_CYCLE_SCALE) as u64;
1945        }
1946    }
1947
1948    XnumaRates { rates }
1949}
1950
1951impl<'a> Scheduler<'a> {
    /// Writes the layer specs into the opened BPF skeleton.
    ///
    /// For every spec, in order, this fills the matching `layers[i]` entry of
    /// the skeleton's bss data: the OR-of-ANDs match rules, the common layer
    /// knobs (slice, weight, preemption flags, placement, ...), the
    /// protection / idle-confinement flags and the cpuset. It also stores the
    /// number of layers and `layer_iteration_order` (layer indices sorted by
    /// ascending weight) in the skeleton's rodata.
    ///
    /// Returns the compiled cgroup regexes keyed by the id that was written
    /// into the corresponding match rule.
    ///
    /// # Errors
    ///
    /// Fails when more cgroup regex rules are given than
    /// `consts_MAX_CGROUP_REGEXES`, when a regex does not compile, when a
    /// `NumaNode` rule names a node id that is not below the number of nodes
    /// in `topo`, when `perf` does not fit in `u32`, or when the kernel
    /// symbol lookup itself fails.
    ///
    /// # Panics
    ///
    /// Panics if the skeleton's rodata/bss are absent, if
    /// `disallow_open_after_us` / `disallow_preempt_after_us` are `None`, if
    /// the layer weight does not convert to `usize`, or if an index
    /// (layer, OR group, AND rule) exceeds the fixed-size BPF arrays.
    fn init_layers(
        skel: &mut OpenBpfSkel,
        specs: &[LayerSpec],
        topo: &Topology,
    ) -> Result<HashMap<u32, Regex>> {
        skel.maps.rodata_data.as_mut().unwrap().nr_layers = specs.len() as u32;
        let mut perf_set = false;

        let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
        let mut layer_weights: Vec<usize> = vec![];
        // Next id to hand out to a CgroupRegex rule; ids are global across layers.
        let mut cgroup_regex_id = 0;
        let mut cgroup_regexes = HashMap::new();

        for (spec_i, spec) in specs.iter().enumerate() {
            let layer = &mut skel.maps.bss_data.as_mut().unwrap().layers[spec_i];

            // spec.matches is a list of OR groups, each a list of AND-ed rules.
            for (or_i, or) in spec.matches.iter().enumerate() {
                for (and_i, and) in or.iter().enumerate() {
                    let mt = &mut layer.matches[or_i].matches[and_i];

                    // Rules are allowlist-based by default
                    mt.exclude.write(false);

                    match and {
                        LayerMatch::CgroupPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
                            copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
                        }
                        LayerMatch::CgroupSuffix(suffix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
                            copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
                        }
                        LayerMatch::CgroupRegex(regex_str) => {
                            if cgroup_regex_id >= bpf_intf::consts_MAX_CGROUP_REGEXES {
                                bail!(
                                    "Too many cgroup regex rules. Maximum allowed: {}",
                                    bpf_intf::consts_MAX_CGROUP_REGEXES
                                );
                            }

                            // CgroupRegex matching handled in userspace via cgroup watcher
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_REGEX as i32;
                            mt.cgroup_regex_id = cgroup_regex_id;

                            let regex = Regex::new(regex_str).with_context(|| {
                                format!("Invalid regex '{}' in layer '{}'", regex_str, spec.name)
                            })?;
                            cgroup_regexes.insert(cgroup_regex_id, regex);
                            cgroup_regex_id += 1;
                        }
                        LayerMatch::CgroupContains(substr) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
                            copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
                        }
                        LayerMatch::CommPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                        }
                        // Exclude variants reuse the positive match kind and
                        // only flip the `exclude` flag.
                        LayerMatch::CommPrefixExclude(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                            mt.exclude.write(true);
                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                        }
                        LayerMatch::PcommPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                        }
                        LayerMatch::PcommPrefixExclude(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                            mt.exclude.write(true);
                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                        }
                        LayerMatch::NiceAbove(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::NiceBelow(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::NiceEquals(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::UIDEquals(user_id) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
                            mt.user_id = *user_id;
                        }
                        LayerMatch::GIDEquals(group_id) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
                            mt.group_id = *group_id;
                        }
                        LayerMatch::PIDEquals(pid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
                            mt.pid = *pid;
                        }
                        LayerMatch::PPIDEquals(ppid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
                            mt.ppid = *ppid;
                        }
                        LayerMatch::TGIDEquals(tgid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
                            mt.tgid = *tgid;
                        }
                        LayerMatch::NSPIDEquals(nsid, pid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
                            mt.nsid = *nsid;
                            mt.pid = *pid;
                        }
                        LayerMatch::NSEquals(nsid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
                            mt.nsid = *nsid as u64;
                        }
                        // The join command string is stored in the
                        // `comm_prefix` buffer of the rule.
                        LayerMatch::CmdJoin(joincmd) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
                            copy_into_cstr(&mut mt.comm_prefix, joincmd);
                        }
                        LayerMatch::IsGroupLeader(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
                            mt.is_group_leader.write(*polarity);
                        }
                        LayerMatch::IsKthread(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
                            mt.is_kthread.write(*polarity);
                        }
                        LayerMatch::UsedGpuTid(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
                            mt.used_gpu_tid.write(*polarity);
                        }
                        LayerMatch::UsedGpuPid(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
                            mt.used_gpu_pid.write(*polarity);
                        }
                        LayerMatch::AvgRuntime(min, max) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
                            mt.min_avg_runtime_us = *min;
                            mt.max_avg_runtime_us = *max;
                        }
                        LayerMatch::HintEquals(hint) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_HINT_EQUALS as i32;
                            mt.hint = *hint;
                        }
                        // The two thresholds below are passed to BPF as
                        // integers: the float is scaled by 10000 and truncated.
                        LayerMatch::SystemCpuUtilBelow(threshold) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_SYSTEM_CPU_UTIL_BELOW as i32;
                            mt.system_cpu_util_below = (*threshold * 10000.0) as u64;
                        }
                        LayerMatch::DsqInsertBelow(threshold) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_DSQ_INSERT_BELOW as i32;
                            mt.dsq_insert_below = (*threshold * 10000.0) as u64;
                        }
                        LayerMatch::NumaNode(node_id) => {
                            // Valid ids are taken to be 0..topo.nodes.len().
                            if *node_id as usize >= topo.nodes.len() {
                                bail!(
                                    "Spec {:?} has invalid NUMA node ID {} (available nodes: 0-{})",
                                    spec.name,
                                    node_id,
                                    topo.nodes.len() - 1
                                );
                            }
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NUMA_NODE as i32;
                            mt.numa_node_id = *node_id;
                        }
                    }
                }
                layer.matches[or_i].nr_match_ands = or.len() as i32;
            }

            layer.nr_match_ors = spec.matches.len() as u32;
            layer.kind = spec.kind.as_bpf_enum();

            {
                let LayerCommon {
                    min_exec_us,
                    yield_ignore,
                    perf,
                    preempt,
                    preempt_first,
                    exclusive,
                    skip_remote_node,
                    prev_over_idle_core,
                    growth_algo,
                    slice_us,
                    fifo,
                    weight,
                    disallow_open_after_us,
                    disallow_preempt_after_us,
                    xllc_mig_min_us,
                    placement,
                    member_expire_ms,
                    ..
                } = spec.kind.common();

                // Microsecond settings are converted to nanoseconds for BPF.
                layer.slice_ns = *slice_us * 1000;
                layer.fifo.write(*fifo);
                layer.min_exec_ns = min_exec_us * 1000;
                // yield_ignore close to 1 disables the yield step, close to 0
                // makes it a full slice, anything else scales the slice by
                // (1 - yield_ignore).
                layer.yield_step_ns = if *yield_ignore > 0.999 {
                    0
                } else if *yield_ignore < 0.001 {
                    layer.slice_ns
                } else {
                    (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
                };
                // NOTE(review): String::truncate panics when the cut falls
                // inside a multi-byte character — confirm layer names are
                // ASCII or shorter than MAX_LAYER_NAME.
                let mut layer_name: String = spec.name.clone();
                layer_name.truncate(MAX_LAYER_NAME);
                copy_into_cstr(&mut layer.name, layer_name.as_str());
                layer.preempt.write(*preempt);
                layer.preempt_first.write(*preempt_first);
                layer.excl.write(*exclusive);
                layer.skip_remote_node.write(*skip_remote_node);
                layer.prev_over_idle_core.write(*prev_over_idle_core);
                layer.growth_algo = growth_algo.as_bpf_enum();
                layer.weight = *weight;
                layer.member_expire_ms = *member_expire_ms;
                // u64::MAX is passed through unchanged instead of being
                // converted from microseconds to nanoseconds.
                layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
                    v if v == u64::MAX => v,
                    v => v * 1000,
                };
                layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
                    v if v == u64::MAX => v,
                    v => v * 1000,
                };
                layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
                layer_weights.push(layer.weight.try_into().unwrap());
                layer.perf = u32::try_from(*perf)?;

                let task_place = |place: u32| crate::types::layer_task_place(place);
                layer.task_place = match placement {
                    LayerPlacement::Standard => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_STD)
                    }
                    LayerPlacement::Sticky => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_STICK)
                    }
                    LayerPlacement::Floating => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT)
                    }
                };
            }

            // Open layers are never marked protected.
            layer.is_protected.write(match spec.kind {
                LayerKind::Open { .. } => false,
                LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
                    protected
                }
            });

            // idle_confined only exists on grouped layers.
            layer.idle_confined.write(match spec.kind {
                LayerKind::Grouped { idle_confined, .. } => idle_confined,
                _ => false,
            });

            match &spec.cpuset {
                Some(mask) => {
                    Self::update_cpumask(mask, &mut layer.cpuset);
                    layer.has_cpuset.write(true);
                }
                None => {
                    // No cpuset configured: set every bit of the mask.
                    for i in 0..layer.cpuset.len() {
                        layer.cpuset[i] = u8::MAX;
                    }
                    layer.has_cpuset.write(false);
                }
            };

            perf_set |= layer.perf > 0;
        }

        // Layer indices ordered by ascending weight (stable for equal weights).
        layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
        for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
            skel.maps
                .rodata_data
                .as_mut()
                .unwrap()
                .layer_iteration_order[idx] = *layer_idx as u32;
        }

        // A non-zero perf setting without the kernel helper only warns; the
        // values written above are left in place.
        if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
            warn!("cpufreq support not available, ignoring perf configurations");
        }

        Ok(cgroup_regexes)
    }
2234
2235    fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
2236        skel.maps.rodata_data.as_mut().unwrap().nr_nodes = topo.nodes.len() as u32;
2237        skel.maps.rodata_data.as_mut().unwrap().nr_llcs = 0;
2238
2239        for (&node_id, node) in &topo.nodes {
2240            debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
2241            skel.maps.rodata_data.as_mut().unwrap().nr_llcs += node.llcs.len() as u32;
2242            let raw_numa_slice = node.span.as_raw_slice();
2243            let node_cpumask_slice =
2244                &mut skel.maps.rodata_data.as_mut().unwrap().numa_cpumasks[node_id];
2245            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
2246            left.clone_from_slice(raw_numa_slice);
2247            debug!(
2248                "node {} mask: {:?}",
2249                node_id,
2250                skel.maps.rodata_data.as_ref().unwrap().numa_cpumasks[node_id]
2251            );
2252
2253            for llc in node.llcs.values() {
2254                debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
2255                skel.maps.rodata_data.as_mut().unwrap().llc_numa_id_map[llc.id] = node_id as u32;
2256            }
2257        }
2258
2259        for cpu in topo.all_cpus.values() {
2260            skel.maps.rodata_data.as_mut().unwrap().cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
2261        }
2262    }
2263
2264    fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
2265        let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
2266            vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
2267            vec
2268        };
2269        let radiate_cpu =
2270            |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
2271                vec.sort_by_key(|&id| {
2272                    (
2273                        (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
2274                        (center_cpu as i32 - id as i32).abs(),
2275                    )
2276                });
2277                vec
2278            };
2279
2280        for (&cpu_id, cpu) in &topo.all_cpus {
2281            // Collect the spans.
2282            let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
2283            let llc_span = &topo.all_llcs[&cpu.llc_id].span;
2284            let node_span = &topo.nodes[&cpu.node_id].span;
2285            let sys_span = &topo.span;
2286
2287            // Make the spans exclusive.
2288            let sys_span = sys_span.and(&node_span.not());
2289            let node_span = node_span.and(&llc_span.not());
2290            let llc_span = llc_span.and(&core_span.not());
2291            core_span.clear_cpu(cpu_id).unwrap();
2292
2293            // Convert them into arrays.
2294            let mut sys_order: Vec<usize> = sys_span.iter().collect();
2295            let mut node_order: Vec<usize> = node_span.iter().collect();
2296            let mut llc_order: Vec<usize> = llc_span.iter().collect();
2297            let mut core_order: Vec<usize> = core_span.iter().collect();
2298
2299            // Shuffle them so that different CPUs follow different orders.
2300            // Each CPU radiates in both directions based on the cpu id and
2301            // radiates out to the closest cores based on core ids.
2302
2303            sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
2304            node_order = radiate(node_order, cpu.node_id);
2305            llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
2306            core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);
2307
2308            // Concatenate and record the topology boundaries.
2309            let mut order: Vec<usize> = vec![];
2310            let mut idx: usize = 0;
2311
2312            idx += 1;
2313            order.push(cpu_id);
2314
2315            idx += core_order.len();
2316            order.append(&mut core_order);
2317            let core_end = idx;
2318
2319            idx += llc_order.len();
2320            order.append(&mut llc_order);
2321            let llc_end = idx;
2322
2323            idx += node_order.len();
2324            order.append(&mut node_order);
2325            let node_end = idx;
2326
2327            idx += sys_order.len();
2328            order.append(&mut sys_order);
2329            let sys_end = idx;
2330
2331            debug!(
2332                "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
2333                cpu_id, core_end, llc_end, node_end, sys_end, &order
2334            );
2335
2336            // Record in cpu_ctx.
2337            let pmap = &mut cpu_ctxs[cpu_id].prox_map;
2338            for (i, &cpu) in order.iter().enumerate() {
2339                pmap.cpus[i] = cpu as u16;
2340            }
2341            pmap.core_end = core_end as u32;
2342            pmap.llc_end = llc_end as u32;
2343            pmap.node_end = node_end as u32;
2344            pmap.sys_end = sys_end as u32;
2345        }
2346    }
2347
2348    fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
2349        cpu_ctxs
2350            .iter()
2351            .map(|cpu_ctx| unsafe { plain::as_bytes(cpu_ctx) }.to_vec())
2352            .collect()
2353    }
2354
2355    fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
2356        let key = (0_u32).to_ne_bytes();
2357        let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
2358        let cpu_ctxs_vec = skel
2359            .maps
2360            .cpu_ctxs
2361            .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
2362            .context("Failed to lookup cpu_ctx")?
2363            .unwrap();
2364
2365        let op_layers: Vec<u32> = layer_specs
2366            .iter()
2367            .enumerate()
2368            .filter(|(_idx, spec)| match &spec.kind {
2369                LayerKind::Open { .. } => spec.kind.common().preempt,
2370                _ => false,
2371            })
2372            .map(|(idx, _)| idx as u32)
2373            .collect();
2374        let on_layers: Vec<u32> = layer_specs
2375            .iter()
2376            .enumerate()
2377            .filter(|(_idx, spec)| match &spec.kind {
2378                LayerKind::Open { .. } => !spec.kind.common().preempt,
2379                _ => false,
2380            })
2381            .map(|(idx, _)| idx as u32)
2382            .collect();
2383        let gp_layers: Vec<u32> = layer_specs
2384            .iter()
2385            .enumerate()
2386            .filter(|(_idx, spec)| match &spec.kind {
2387                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2388                _ => false,
2389            })
2390            .map(|(idx, _)| idx as u32)
2391            .collect();
2392        let gn_layers: Vec<u32> = layer_specs
2393            .iter()
2394            .enumerate()
2395            .filter(|(_idx, spec)| match &spec.kind {
2396                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2397                _ => false,
2398            })
2399            .map(|(idx, _)| idx as u32)
2400            .collect();
2401
2402        // FIXME - this incorrectly assumes all possible CPUs are consecutive.
2403        for cpu in 0..*NR_CPUS_POSSIBLE {
2404            cpu_ctxs.push(
2405                *plain::from_bytes(cpu_ctxs_vec[cpu].as_slice())
2406                    .expect("cpu_ctx: short or misaligned buffer"),
2407            );
2408
2409            let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
2410            let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
2411            cpu_ctxs[cpu].cpu = cpu as i32;
2412            cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
2413            cpu_ctxs[cpu].is_big = is_big;
2414
2415            fastrand::seed(cpu as u64);
2416
2417            let mut ogp_order = op_layers.clone();
2418            ogp_order.append(&mut gp_layers.clone());
2419            fastrand::shuffle(&mut ogp_order);
2420
2421            let mut ogn_order = on_layers.clone();
2422            ogn_order.append(&mut gn_layers.clone());
2423            fastrand::shuffle(&mut ogn_order);
2424
2425            let mut op_order = op_layers.clone();
2426            fastrand::shuffle(&mut op_order);
2427
2428            let mut on_order = on_layers.clone();
2429            fastrand::shuffle(&mut on_order);
2430
2431            let mut gp_order = gp_layers.clone();
2432            fastrand::shuffle(&mut gp_order);
2433
2434            let mut gn_order = gn_layers.clone();
2435            fastrand::shuffle(&mut gn_order);
2436
2437            for i in 0..MAX_LAYERS {
2438                cpu_ctxs[cpu].ogp_layer_order[i] =
2439                    ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2440                cpu_ctxs[cpu].ogn_layer_order[i] =
2441                    ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2442
2443                cpu_ctxs[cpu].op_layer_order[i] =
2444                    op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2445                cpu_ctxs[cpu].on_layer_order[i] =
2446                    on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2447                cpu_ctxs[cpu].gp_layer_order[i] =
2448                    gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2449                cpu_ctxs[cpu].gn_layer_order[i] =
2450                    gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2451            }
2452        }
2453
2454        Self::init_cpu_prox_map(topo, &mut cpu_ctxs);
2455
2456        skel.maps
2457            .cpu_ctxs
2458            .update_percpu(
2459                &key,
2460                &Self::convert_cpu_ctxs(cpu_ctxs),
2461                libbpf_rs::MapFlags::ANY,
2462            )
2463            .context("Failed to update cpu_ctx")?;
2464
2465        Ok(())
2466    }
2467
2468    fn init_single_prox_map_per_llc(
2469        skel: &mut BpfSkel,
2470        topo: &Topology,
2471        prox_map_idx: &usize,
2472    ) -> Result<()> {
2473        for (&llc_id, llc) in &topo.all_llcs {
2474            // Collect the orders.
2475            let mut node_order: Vec<usize> =
2476                topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
2477            let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();
2478
2479            // Make the orders exclusive.
2480            sys_order.retain(|id| !node_order.contains(id));
2481            node_order.retain(|&id| id != llc_id);
2482
2483            // Shuffle so that different LLCs follow different orders. See
2484            // init_cpu_prox_map().
2485            fastrand::seed((*prox_map_idx as u64) << 32 | llc_id as u64);
2486            fastrand::shuffle(&mut sys_order);
2487            fastrand::shuffle(&mut node_order);
2488
2489            // Concatenate and record the node boundary.
2490            let mut order: Vec<usize> = vec![];
2491            let mut idx: usize = 0;
2492
2493            idx += 1;
2494            order.push(llc_id);
2495
2496            idx += node_order.len();
2497            order.append(&mut node_order);
2498            let node_end = idx;
2499
2500            idx += sys_order.len();
2501            order.append(&mut sys_order);
2502            let sys_end = idx;
2503
2504            debug!(
2505                "LLC[{}] proximity map {}[{}/{}]: {:?}",
2506                llc_id, prox_map_idx, node_end, sys_end, &order
2507            );
2508
2509            // Record in llc_ctx.
2510            //
2511            // XXX - This would be a lot easier if llc_ctx were in the bss.
2512            // See BpfStats::read().
2513            let key = (llc_id as u32).to_ne_bytes();
2514            let v = skel
2515                .maps
2516                .llc_data
2517                .lookup(&key, libbpf_rs::MapFlags::ANY)
2518                .unwrap()
2519                .unwrap();
2520            let mut llcc: bpf_intf::llc_ctx =
2521                *plain::from_bytes(v.as_slice()).expect("llc_ctx: short or misaligned buffer");
2522
2523            let pmap = &mut llcc.prox_maps[*prox_map_idx];
2524            for (i, &llc_id) in order.iter().enumerate() {
2525                pmap.llcs[i] = llc_id as u16;
2526            }
2527            pmap.node_end = node_end as u32;
2528            pmap.sys_end = sys_end as u32;
2529
2530            skel.maps.llc_data.update(
2531                &key,
2532                unsafe { plain::as_bytes(&llcc) },
2533                libbpf_rs::MapFlags::ANY,
2534            )?
2535        }
2536
2537        Ok(())
2538    }
2539
2540    fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2541        let num_proximity_maps = bpf_intf::consts_NUM_PROXIMITY_MAPS as usize;
2542        for prox_map_idx in 0..num_proximity_maps {
2543            Self::init_single_prox_map_per_llc(skel, topo, &prox_map_idx)?;
2544        }
2545
2546        Ok(())
2547    }
2548
2549    fn init_node_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2550        for (&node_id, node) in &topo.nodes {
2551            let mut order: Vec<(usize, usize)> = node
2552                .distance
2553                .iter()
2554                .enumerate()
2555                .filter(|&(nid, _)| nid != node_id)
2556                .map(|(nid, &dist)| (nid, dist))
2557                .collect();
2558            order.sort_by_key(|&(_, dist)| dist);
2559
2560            let key = (node_id as u32).to_ne_bytes();
2561
2562            // The map entry may not exist yet — create a zeroed one.
2563            let v = skel.maps.node_data.lookup(&key, libbpf_rs::MapFlags::ANY);
2564            let mut nodec: bpf_intf::node_ctx = match v {
2565                Ok(Some(v)) => {
2566                    *plain::from_bytes(v.as_slice()).expect("node_ctx: short or misaligned buffer")
2567                }
2568                _ => unsafe { MaybeUninit::zeroed().assume_init() },
2569            };
2570
2571            let pmap = &mut nodec.prox_map;
2572            for (i, &(nid, _)) in order.iter().enumerate() {
2573                pmap.nodes[i] = nid as u16;
2574            }
2575            pmap.sys_end = order.len() as u32;
2576
2577            debug!(
2578                "NODE[{}] prox_map[{}]: {:?}",
2579                node_id,
2580                pmap.sys_end,
2581                &order.iter().map(|(n, d)| (*n, *d)).collect::<Vec<_>>()
2582            );
2583
2584            skel.maps.node_data.update(
2585                &key,
2586                unsafe { plain::as_bytes(&nodec) },
2587                libbpf_rs::MapFlags::ANY,
2588            )?;
2589        }
2590        Ok(())
2591    }
2592
2593    fn init_node_ctx(skel: &mut BpfSkel, topo: &Topology, nr_layers: usize) -> Result<()> {
2594        let all_layers: Vec<u32> = (0..nr_layers as u32).collect();
2595        let node_empty_layers: Vec<Vec<u32>> =
2596            (0..topo.nodes.len()).map(|_| all_layers.clone()).collect();
2597        Self::refresh_node_ctx(skel, topo, &node_empty_layers, true);
2598        Ok(())
2599    }
2600
2601    fn init(
2602        opts: &'a Opts,
2603        layer_specs: &[LayerSpec],
2604        open_object: &'a mut MaybeUninit<OpenObject>,
2605        hint_to_layer_map: &HashMap<u64, HintLayerInfo>,
2606        membw_tracking: bool,
2607    ) -> Result<Self> {
2608        let nr_layers = layer_specs.len();
2609        let mut disable_topology = opts.disable_topology.unwrap_or(false);
2610
2611        let topo = Arc::new(if disable_topology {
2612            Topology::with_flattened_llc_node()?
2613        } else if opts.topology.virt_llc.is_some() {
2614            Topology::with_args(&opts.topology)?
2615        } else {
2616            Topology::new()?
2617        });
2618
2619        /*
2620         * FIXME: scx_layered incorrectly assumes that node, LLC and CPU IDs
2621         * are consecutive. Verify that they are on this system and bail if
2622         * not. It's lucky that core ID is not used anywhere as core IDs are
2623         * not consecutive on some Ryzen CPUs.
2624         */
2625        if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
2626            bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
2627        }
2628        if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
2629            bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
2630        }
2631        if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
2632            bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
2633        }
2634
2635        let netdevs = if opts.netdev_irq_balance {
2636            let devs = read_netdevs()?;
2637            let total_irqs: usize = devs.values().map(|d| d.irqs.len()).sum();
2638            let breakdown = devs
2639                .iter()
2640                .map(|(iface, d)| format!("{iface}={}", d.irqs.len()))
2641                .collect::<Vec<_>>()
2642                .join(", ");
2643            info!(
2644                "Netdev IRQ balancing enabled: overriding {total_irqs} IRQ{} \
2645                 across {} interface{} [{breakdown}]",
2646                if total_irqs == 1 { "" } else { "s" },
2647                devs.len(),
2648                if devs.len() == 1 { "" } else { "s" },
2649            );
2650            devs
2651        } else {
2652            BTreeMap::new()
2653        };
2654
2655        if !disable_topology {
2656            if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
2657                disable_topology = true;
2658            };
2659            info!(
2660                "Topology awareness not specified, selecting {} based on hardware",
2661                if disable_topology {
2662                    "disabled"
2663                } else {
2664                    "enabled"
2665                }
2666            );
2667        };
2668
2669        let cpu_pool = CpuPool::new(topo.clone(), opts.allow_partial_core)?;
2670
2671        // If disabling topology awareness clear out any set NUMA/LLC configs and
2672        // it will fallback to using all cores.
2673        let layer_specs: Vec<_> = if disable_topology {
2674            info!("Disabling topology awareness");
2675            layer_specs
2676                .iter()
2677                .cloned()
2678                .map(|mut s| {
2679                    s.kind.common_mut().nodes.clear();
2680                    s.kind.common_mut().llcs.clear();
2681                    s
2682                })
2683                .collect()
2684        } else {
2685            layer_specs.to_vec()
2686        };
2687
2688        // Validate that spec node/LLC references exist in the topology.
2689        for spec in layer_specs.iter() {
2690            let mut seen = BTreeSet::new();
2691            for &node_id in spec.nodes().iter() {
2692                if !topo.nodes.contains_key(&node_id) {
2693                    bail!(
2694                        "layer {:?}: nodes references node {} which does not \
2695                         exist in the topology (available: {:?})",
2696                        spec.name,
2697                        node_id,
2698                        topo.nodes.keys().collect::<Vec<_>>()
2699                    );
2700                }
2701                if !seen.insert(node_id) {
2702                    bail!(
2703                        "layer {:?}: nodes contains duplicate node {}",
2704                        spec.name,
2705                        node_id
2706                    );
2707                }
2708            }
2709
2710            seen.clear();
2711            for &llc_id in spec.llcs().iter() {
2712                if !topo.all_llcs.contains_key(&llc_id) {
2713                    bail!(
2714                        "layer {:?}: llcs references LLC {} which does not \
2715                         exist in the topology (available: {:?})",
2716                        spec.name,
2717                        llc_id,
2718                        topo.all_llcs.keys().collect::<Vec<_>>()
2719                    );
2720                }
2721                if !seen.insert(llc_id) {
2722                    bail!(
2723                        "layer {:?}: llcs contains duplicate LLC {}",
2724                        spec.name,
2725                        llc_id
2726                    );
2727                }
2728            }
2729        }
2730
2731        for spec in layer_specs.iter() {
2732            let has_numa_node_match = spec
2733                .matches
2734                .iter()
2735                .flatten()
2736                .any(|m| matches!(m, LayerMatch::NumaNode(_)));
2737            let has_node_spread_algo = matches!(
2738                spec.kind.common().growth_algo,
2739                LayerGrowthAlgo::NodeSpread
2740                    | LayerGrowthAlgo::NodeSpreadReverse
2741                    | LayerGrowthAlgo::NodeSpreadRandom
2742            );
2743            if has_numa_node_match && has_node_spread_algo {
2744                bail!(
2745                    "layer {:?}: NumaNode matcher cannot be combined with {:?} \
2746                     growth algorithm. NodeSpread* allocates CPUs equally across \
2747                     ALL NUMA nodes, but NumaNode restricts tasks to one node's \
2748                     CPUs — CPUs on other nodes are wasted and utilization \
2749                     will never exceed 1/numa_nodes. Use a non-spread algorithm \
2750                     (e.g. Linear, Topo) instead.",
2751                    spec.name,
2752                    spec.kind.common().growth_algo
2753                );
2754            }
2755        }
2756
2757        // Check kernel features
2758        init_libbpf_logging(None);
2759        let kfuncs_in_syscall = scx_bpf_compat::kfuncs_supported_in_syscall()?;
2760        if !kfuncs_in_syscall {
2761            warn!("Using slow path: kfuncs not supported in syscall programs (a8e03b6bbb2c ∉ ker)");
2762        }
2763
2764        // Open the BPF prog first for verification.
2765        let debug_level = if opts.log_level.contains("trace") {
2766            2
2767        } else if opts.log_level.contains("debug") {
2768            1
2769        } else {
2770            0
2771        };
2772        let mut skel_builder = BpfSkelBuilder::default();
2773        skel_builder.obj_builder.debug(debug_level > 1);
2774
2775        info!(
2776            "Running scx_layered (build ID: {})",
2777            build_id::full_version(env!("CARGO_PKG_VERSION"))
2778        );
2779        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
2780        let mut skel = scx_ops_open!(skel_builder, open_object, layered, open_opts)?;
2781
2782        // No memory BW tracking by default
2783        skel.progs.scx_pmu_switch_tc.set_autoload(membw_tracking);
2784        skel.progs.scx_pmu_tick_tc.set_autoload(membw_tracking);
2785
2786        let mut loaded_kprobes = HashSet::new();
2787
2788        // enable autoloads for conditionally loaded things
2789        // immediately after creating skel (because this is always before loading)
2790        if opts.enable_gpu_support {
2791            // by default, enable open if gpu support is enabled.
2792            // open has been observed to be relatively cheap to kprobe.
2793            if opts.gpu_kprobe_level >= 1 {
2794                compat::cond_kprobe_load("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
2795                loaded_kprobes.insert("nvidia_open");
2796            }
2797            // enable the rest progressively based upon how often they are called
2798            // for observed workloads
2799            if opts.gpu_kprobe_level >= 2 {
2800                compat::cond_kprobe_load("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
2801                loaded_kprobes.insert("nvidia_mmap");
2802            }
2803            if opts.gpu_kprobe_level >= 3 {
2804                compat::cond_kprobe_load("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
2805                loaded_kprobes.insert("nvidia_poll");
2806            }
2807        }
2808
2809        let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
2810        let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");
2811
2812        let event = if membw_tracking {
2813            setup_membw_tracking(&mut skel)?
2814        } else {
2815            0
2816        };
2817
2818        let rodata = skel.maps.rodata_data.as_mut().unwrap();
2819
2820        if let (Ok(ext_addr), Ok(idle_addr)) = (ext_sched_class_addr, idle_sched_class_addr) {
2821            rodata.ext_sched_class_addr = ext_addr;
2822            rodata.idle_sched_class_addr = idle_addr;
2823        } else {
2824            warn!(
2825                "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
2826            );
2827        }
2828
2829        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
2830        rodata.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;
2831
2832        // Initialize skel according to @opts.
2833        skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;
2834
2835        if !opts.disable_queued_wakeup {
2836            match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
2837                0 => info!("Kernel does not support queued wakeup optimization"),
2838                v => skel.struct_ops.layered_mut().flags |= v,
2839            }
2840        }
2841
2842        rodata.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
2843        rodata.percpu_kthread_preempt_all =
2844            !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
2845        rodata.debug = debug_level as u32;
2846        rodata.slice_ns = opts.slice_us * 1000;
2847        rodata.max_exec_ns = if opts.max_exec_us > 0 {
2848            opts.max_exec_us * 1000
2849        } else {
2850            opts.slice_us * 1000 * 20
2851        };
2852        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;
2853        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
2854        rodata.smt_enabled = topo.smt_enabled;
2855        rodata.has_little_cores = topo.has_little_cores();
2856        rodata.antistall_sec = opts.antistall_sec;
2857        rodata.monitor_disable = opts.monitor_disable;
2858        rodata.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
2859        rodata.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
2860        rodata.enable_antistall = !opts.disable_antistall;
2861        rodata.enable_match_debug = opts.enable_match_debug;
2862        rodata.enable_gpu_support = opts.enable_gpu_support;
2863        rodata.kfuncs_supported_in_syscall = kfuncs_in_syscall;
2864
2865        for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
2866            rodata.__sibling_cpu[cpu] = *sib;
2867        }
2868        for cpu in topo.all_cpus.keys() {
2869            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
2870        }
2871
2872        rodata.nr_op_layers = layer_specs
2873            .iter()
2874            .filter(|spec| match &spec.kind {
2875                LayerKind::Open { .. } => spec.kind.common().preempt,
2876                _ => false,
2877            })
2878            .count() as u32;
2879        rodata.nr_on_layers = layer_specs
2880            .iter()
2881            .filter(|spec| match &spec.kind {
2882                LayerKind::Open { .. } => !spec.kind.common().preempt,
2883                _ => false,
2884            })
2885            .count() as u32;
2886        rodata.nr_gp_layers = layer_specs
2887            .iter()
2888            .filter(|spec| match &spec.kind {
2889                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2890                _ => false,
2891            })
2892            .count() as u32;
2893        rodata.nr_gn_layers = layer_specs
2894            .iter()
2895            .filter(|spec| match &spec.kind {
2896                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2897                _ => false,
2898            })
2899            .count() as u32;
2900        rodata.nr_excl_layers = layer_specs
2901            .iter()
2902            .filter(|spec| spec.kind.common().exclusive)
2903            .count() as u32;
2904
2905        let mut min_open = u64::MAX;
2906        let mut min_preempt = u64::MAX;
2907
2908        for spec in layer_specs.iter() {
2909            if let LayerKind::Open { common, .. } = &spec.kind {
2910                min_open = min_open.min(common.disallow_open_after_us.unwrap());
2911                min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
2912            }
2913        }
2914
2915        rodata.min_open_layer_disallow_open_after_ns = match min_open {
2916            u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
2917            v => v,
2918        };
2919        rodata.min_open_layer_disallow_preempt_after_ns = match min_preempt {
2920            u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
2921            v => v,
2922        };
2923
2924        // We set the pin path before loading the skeleton. This will ensure
2925        // libbpf creates and pins the map, or reuses the pinned map fd for us,
2926        // so that we can keep reusing the older map already pinned on scheduler
2927        // restarts.
2928        let layered_task_hint_map_path = &opts.task_hint_map;
2929        let hint_map = &mut skel.maps.scx_layered_task_hint_map;
2930        // Only set pin path if a path is provided.
2931        if !layered_task_hint_map_path.is_empty() {
2932            hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
2933            rodata.task_hint_map_enabled = true;
2934        }
2935
2936        if !opts.hi_fb_thread_name.is_empty() {
2937            let bpf_hi_fb_thread_name = &mut rodata.hi_fb_thread_name;
2938            copy_into_cstr(bpf_hi_fb_thread_name, opts.hi_fb_thread_name.as_str());
2939            rodata.enable_hi_fb_thread_name_match = true;
2940        }
2941
2942        let cgroup_regexes = Self::init_layers(&mut skel, &layer_specs, &topo)?;
2943        skel.maps.rodata_data.as_mut().unwrap().nr_cgroup_regexes = cgroup_regexes.len() as u32;
2944        Self::init_nodes(&mut skel, opts, &topo);
2945
2946        let mut skel = scx_ops_load!(skel, layered, uei)?;
2947
2948        // Populate the mapping of hints to layer IDs for faster lookups
2949        if !hint_to_layer_map.is_empty() {
2950            for (k, v) in hint_to_layer_map.iter() {
2951                let key: u32 = *k as u32;
2952
2953                // Create hint_layer_info struct
2954                let mut info_bytes = vec![0u8; std::mem::size_of::<bpf_intf::hint_layer_info>()];
2955                let info_ptr = info_bytes.as_mut_ptr() as *mut bpf_intf::hint_layer_info;
2956                unsafe {
2957                    (*info_ptr).layer_id = v.layer_id as u32;
2958                    (*info_ptr).system_cpu_util_below = match v.system_cpu_util_below {
2959                        Some(threshold) => (threshold * 10000.0) as u64,
2960                        None => u64::MAX, // disabled sentinel
2961                    };
2962                    (*info_ptr).dsq_insert_below = match v.dsq_insert_below {
2963                        Some(threshold) => (threshold * 10000.0) as u64,
2964                        None => u64::MAX, // disabled sentinel
2965                    };
2966                }
2967
2968                skel.maps.hint_to_layer_id_map.update(
2969                    &key.to_ne_bytes(),
2970                    &info_bytes,
2971                    libbpf_rs::MapFlags::ANY,
2972                )?;
2973            }
2974        }
2975
2976        if membw_tracking {
2977            create_perf_fds(&mut skel, event)?;
2978        }
2979
2980        let mut layers = vec![];
2981        let layer_growth_orders =
2982            LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
2983        for (idx, spec) in layer_specs.iter().enumerate() {
2984            let growth_order = layer_growth_orders
2985                .get(&idx)
2986                .with_context(|| "layer has no growth order".to_string())?;
2987            layers.push(Layer::new(spec, &topo, growth_order)?);
2988        }
2989
2990        let mut idle_qos_enabled = layers
2991            .iter()
2992            .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
2993        if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
2994            warn!("idle_resume_us not supported, ignoring");
2995            idle_qos_enabled = false;
2996        }
2997
2998        Self::init_cpus(&skel, &layer_specs, &topo)?;
2999        Self::init_llc_prox_map(&mut skel, &topo)?;
3000        Self::init_node_prox_map(&mut skel, &topo)?;
3001        Self::init_node_ctx(&mut skel, &topo, nr_layers)?;
3002
3003        // Other stuff.
3004        let proc_reader = fb_procfs::ProcReader::new();
3005
3006        // Handle setup if layered is running in a pid namespace.
3007        let input = ProgramInput {
3008            ..Default::default()
3009        };
3010        let prog = &mut skel.progs.initialize_pid_namespace;
3011
3012        let _ = prog.test_run(input);
3013
3014        // XXX If we try to refresh the cpumasks here before attaching, we
3015        // sometimes (non-deterministically) don't see the updated values in
3016        // BPF. It would be better to update the cpumasks here before we
3017        // attach, but the value will quickly converge anyways so it's not a
3018        // huge problem in the interim until we figure it out.
3019
3020        // Allow all tasks to open and write to BPF task hint map, now that
3021        // we should have it pinned at the desired location.
3022        if !layered_task_hint_map_path.is_empty() {
3023            let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
3024            let mode: libc::mode_t = 0o666;
3025            unsafe {
3026                if libc::chmod(path.as_ptr(), mode) != 0 {
3027                    trace!("'chmod' to 666 of task hint map failed, continuing...");
3028                }
3029            }
3030        }
3031
3032        // Attach.
3033        let struct_ops = scx_ops_attach!(skel, layered)?;
3034
3035        // Turn on installed kprobes
3036        if opts.enable_gpu_support {
3037            if loaded_kprobes.contains("nvidia_open") {
3038                compat::cond_kprobe_attach("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
3039            }
3040            if loaded_kprobes.contains("nvidia_mmap") {
3041                compat::cond_kprobe_attach("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
3042            }
3043            if loaded_kprobes.contains("nvidia_poll") {
3044                compat::cond_kprobe_attach("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
3045            }
3046        }
3047
3048        let stats_server = StatsServer::new(stats::server_data()).launch()?;
3049        let mut gpu_task_handler =
3050            GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
3051        gpu_task_handler.init(topo.clone());
3052
3053        let sched = Self {
3054            struct_ops: Some(struct_ops),
3055            layer_specs,
3056
3057            sched_intv: Duration::from_secs_f64(opts.interval),
3058            layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),
3059
3060            cpu_pool,
3061            layers,
3062            idle_qos_enabled,
3063
3064            sched_stats: Stats::new(
3065                &mut skel,
3066                &proc_reader,
3067                topo.clone(),
3068                &gpu_task_handler,
3069                opts.util_compensation,
3070            )?,
3071
3072            cgroup_regexes: Some(cgroup_regexes),
3073            nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
3074            xnuma_mig_src: vec![vec![false; topo.nodes.len()]; nr_layers],
3075            growth_denied: vec![vec![false; topo.nodes.len()]; nr_layers],
3076            processing_dur: Default::default(),
3077
3078            proc_reader,
3079            skel,
3080
3081            topo,
3082            netdevs,
3083            stats_server,
3084            gpu_task_handler,
3085        };
3086
3087        info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");
3088
3089        Ok(sched)
3090    }
3091
3092    fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
3093        for cpu in 0..mask.len() {
3094            if mask.test_cpu(cpu) {
3095                bpfmask[cpu / 8] |= 1 << (cpu % 8);
3096            } else {
3097                bpfmask[cpu / 8] &= !(1 << (cpu % 8));
3098            }
3099        }
3100    }
3101
3102    fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
3103        trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
3104        Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);
3105
3106        bpf_layer.nr_cpus = layer.nr_cpus as u32;
3107        for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
3108            bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
3109        }
3110        for (node_id, &nr_node_cpus) in layer.nr_node_cpus.iter().enumerate() {
3111            bpf_layer.node[node_id].nr_cpus = nr_node_cpus as u32;
3112        }
3113
3114        bpf_layer.refresh_cpus = 1;
3115    }
3116
3117    fn update_netdev_cpumasks(&mut self) -> Result<()> {
3118        let available_cpus = self.cpu_pool.available_cpus();
3119        if available_cpus.is_empty() {
3120            return Ok(());
3121        }
3122
3123        for (iface, netdev) in self.netdevs.iter_mut() {
3124            let node = self
3125                .topo
3126                .nodes
3127                .values()
3128                .find(|n| n.id == netdev.node())
3129                .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
3130            let node_cpus = node.span.clone();
3131            for (irq, irqmask) in netdev.irqs.iter_mut() {
3132                irqmask.clear_all();
3133                for cpu in available_cpus.iter() {
3134                    if !node_cpus.test_cpu(cpu) {
3135                        continue;
3136                    }
3137                    let _ = irqmask.set_cpu(cpu);
3138                }
3139                // If no CPUs are available in the node then spread the load across the node
3140                if irqmask.weight() == 0 {
3141                    for cpu in node_cpus.iter() {
3142                        let _ = irqmask.set_cpu(cpu);
3143                    }
3144                }
3145                trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
3146            }
3147            netdev.apply_cpumasks()?;
3148            debug!(
3149                "{iface}: applied affinity override to {} IRQ{}",
3150                netdev.irqs.len(),
3151                if netdev.irqs.len() == 1 { "" } else { "s" },
3152            );
3153        }
3154
3155        Ok(())
3156    }
3157
3158    fn clamp_target_by_membw(
3159        &self,
3160        layer: &Layer,
3161        membw_limit: f64,
3162        membw: f64,
3163        curtarget: u64,
3164    ) -> usize {
3165        let ncpu: u64 = layer.cpus.weight() as u64;
3166        let membw = (membw * 1024_f64.powf(3.0)).round() as u64;
3167        let membw_limit = (membw_limit * 1024_f64.powf(3.0)).round() as u64;
3168        let last_membw_percpu = if ncpu > 0 { membw / ncpu } else { 0 };
3169
3170        // Either there is no memory bandwidth limit set, or the counters
3171        // are not fully initialized yet. Just return the current target.
3172        if membw_limit == 0 || last_membw_percpu == 0 {
3173            return curtarget as usize;
3174        }
3175
3176        (membw_limit / last_membw_percpu) as usize
3177    }
3178
3179    /// Decompose per-layer CPU targets into per-node pinned demand and
3180    /// unpinned demand for unified_alloc(). Uses layer_node_pinned_utils
3181    /// to split each layer's target. All outputs are in alloc units.
3182    fn calc_raw_demands(&self, targets: &[(usize, usize)]) -> Vec<LayerDemand> {
3183        let au = self.cpu_pool.alloc_unit();
3184        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
3185        let nr_nodes = self.topo.nodes.len();
3186
3187        targets
3188            .iter()
3189            .enumerate()
3190            .map(|(idx, &(target, _min))| {
3191                let layer = &self.layers[idx];
3192                let weight = layer.kind.common().weight as usize;
3193
3194                // Open layers don't participate in allocation.
3195                if matches!(layer.kind, LayerKind::Open { .. }) {
3196                    return LayerDemand {
3197                        raw_pinned: vec![0; nr_nodes],
3198                        raw_unpinned: 0,
3199                        weight,
3200                        spread: false,
3201                    };
3202                }
3203
3204                // Only NodeSpread* keeps strict equal-per-node placement
3205                // via the spread:bool path.  RoundRobin migrated to a
3206                // single-tier node_groups (balanced via cap-weighted
3207                // water_fill in place_unpinned, no bottleneck cap).
3208                let spread = matches!(
3209                    layer.growth_algo,
3210                    LayerGrowthAlgo::NodeSpread
3211                        | LayerGrowthAlgo::NodeSpreadReverse
3212                        | LayerGrowthAlgo::NodeSpreadRandom
3213                );
3214
3215                let util_high = match &layer.kind {
3216                    LayerKind::Confined { util_range, .. }
3217                    | LayerKind::Grouped { util_range, .. } => util_range.1,
3218                    _ => 1.0,
3219                };
3220
3221                // Convert per-node pinned utilization to CPU demand.
3222                let mut raw_pinned = vec![0usize; nr_nodes];
3223                for n in 0..nr_nodes {
3224                    let pu = pinned_utils[idx][n];
3225                    if pu < 0.01 {
3226                        continue;
3227                    }
3228                    // Check this layer has allowed_cpus on this node.
3229                    let node_span = &self.topo.nodes[&n].span;
3230                    if layer.allowed_cpus.and(node_span).is_empty() {
3231                        continue;
3232                    }
3233                    let cpus = (pu / util_high).ceil() as usize;
3234                    // Round up to alloc units.
3235                    let units = cpus.div_ceil(au);
3236                    raw_pinned[n] = units;
3237                }
3238
3239                // Unpinned = remainder of the target.
3240                let target_units = target.div_ceil(au);
3241                let pinned_units: usize = raw_pinned.iter().sum();
3242                let raw_unpinned = target_units.saturating_sub(pinned_units);
3243
3244                LayerDemand {
3245                    raw_pinned,
3246                    raw_unpinned,
3247                    weight,
3248                    spread,
3249                }
3250            })
3251            .collect()
3252    }
3253
3254    /// Calculate how many CPUs each layer would like to have if there were
3255    /// no competition. When util_compensation is enabled, compensated
3256    /// utilization (scaled for irq/softirq/stolen overhead) is used
3257    /// instead of raw utilization. The CPU range is determined by
3258    /// applying the inverse of util_range and capping by cpus_range.
3259    /// If the current allocation is within the acceptable range, no
3260    /// change is made. Returns (target, min) pair for each layer.
3261    fn calc_target_nr_cpus(&self) -> Vec<(usize, usize)> {
3262        let nr_cpus = self.cpu_pool.topo.all_cpus.len();
3263        let utils = if self.sched_stats.util_compensation {
3264            &self.sched_stats.layer_utils_compensated
3265        } else {
3266            &self.sched_stats.layer_utils
3267        };
3268        let membws = &self.sched_stats.layer_membws;
3269
3270        let mut records: Vec<(u64, u64, u64, usize, usize, usize)> = vec![];
3271        let mut targets: Vec<(usize, usize)> = vec![];
3272
3273        for (idx, layer) in self.layers.iter().enumerate() {
3274            targets.push(match &layer.kind {
3275                LayerKind::Confined {
3276                    util_range,
3277                    cpus_range,
3278                    cpus_range_frac,
3279                    membw_gb,
3280                    ..
3281                }
3282                | LayerKind::Grouped {
3283                    util_range,
3284                    cpus_range,
3285                    cpus_range_frac,
3286                    membw_gb,
3287                    ..
3288                } => {
3289                    let cpus_range =
3290                        resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();
3291
3292                    // A grouped layer can choose to include open cputime
3293                    // for sizing. Also, as an empty layer can only get CPU
3294                    // time through fallback (counted as owned) or open
3295                    // execution, add open cputime for empty layers.
3296                    let owned = utils[idx][LAYER_USAGE_OWNED];
3297                    let open = utils[idx][LAYER_USAGE_OPEN];
3298
3299                    let membw_owned = membws[idx][LAYER_USAGE_OWNED];
3300                    let membw_open = membws[idx][LAYER_USAGE_OPEN];
3301
3302                    let mut util = owned;
3303                    let mut membw = membw_owned;
3304                    if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
3305                        util += open;
3306                        membw += membw_open;
3307                    }
3308
3309                    let util = if util < 0.01 { 0.0 } else { util };
3310
3311                    let low = (util / util_range.1).ceil() as usize;
3312                    let high = ((util / util_range.0).floor() as usize).max(low);
3313
3314                    let membw_limit = match membw_gb {
3315                        Some(membw_limit) => *membw_limit,
3316                        None => 0.0,
3317                    };
3318
3319                    trace!(
3320                        "layer {0} (membw, membw_limit): ({membw} gi_b, {membw_limit} gi_b)",
3321                        layer.name
3322                    );
3323
3324                    let target = layer.cpus.weight().clamp(low, high);
3325
3326                    records.push((
3327                        (owned * 100.0) as u64,
3328                        (open * 100.0) as u64,
3329                        (util * 100.0) as u64,
3330                        low,
3331                        high,
3332                        target,
3333                    ));
3334
3335                    let target = target.clamp(cpus_range.0, cpus_range.1);
3336                    let membw_target =
3337                        self.clamp_target_by_membw(layer, membw_limit, membw, target as u64);
3338
3339                    trace!("CPU target pre- and post-membw adjustment: {target} -> {membw_target}");
3340
3341                    // If there's no way to drop our memory usage down enough,
3342                    // pin the target CPUs to low and drop a warning.
3343                    if membw_target < cpus_range.0 {
3344                        warn!("cannot satisfy memory bw limit for layer {}", layer.name);
3345                        warn!("membw_target {membw_target} low {}", cpus_range.0);
3346                    };
3347
3348                    // Memory bandwidth target cannot override imposed limits or bump
3349                    // the target above what CPU usage-based throttling requires.
3350                    let target = membw_target.clamp(cpus_range.0, target);
3351
3352                    (target, cpus_range.0)
3353                }
3354                LayerKind::Open { .. } => (0, 0),
3355            });
3356        }
3357
3358        trace!("(owned, open, util, low, high, target): {:?}", &records);
3359        targets
3360    }
3361
3362    // Figure out a tuple (LLCs, extra_cpus) in terms of the target CPUs
3363    // computed by weighted_target_nr_cpus. Returns the number of full LLCs
3364    // occupied by a layer, and any extra CPUs that don't occupy a full LLC.
3365    fn compute_target_llcs(target: usize, topo: &Topology) -> (usize, usize) {
3366        // TODO(kkd): We assume each LLC has equal number of cores.
3367        let cores_per_llc = topo.all_cores.len() / topo.all_llcs.len();
3368        // TODO(kkd): We assume each core has fixed number of threads.
3369        let cpus_per_core = topo.all_cores.first_key_value().unwrap().1.cpus.len();
3370        let cpus_per_llc = cores_per_llc * cpus_per_core;
3371
3372        let full = target / cpus_per_llc;
3373        let extra = target % cpus_per_llc;
3374
3375        (full, extra.div_ceil(cpus_per_core))
3376    }
3377
3378    // Recalculate the core order for layers using StickyDynamic growth
3379    // algorithm. Uses per-node targets from unified_alloc() to decide how
3380    // many LLCs each layer gets on each node, then builds core_order and
3381    // applies CPU changes.
3382    fn recompute_layer_core_order(
3383        &mut self,
3384        layer_targets: &[(usize, usize)],
3385        layer_allocs: &[LayerAlloc],
3386        au: usize,
3387    ) -> Result<bool> {
3388        let nr_nodes = self.topo.nodes.len();
3389
3390        // Phase 1 — Free per-node: return excess LLCs to cpu_pool.
3391        debug!(
3392            " free: before pass: free_llcs={:?}",
3393            self.cpu_pool.free_llcs
3394        );
3395        for &(idx, _) in layer_targets.iter().rev() {
3396            let layer = &mut self.layers[idx];
3397
3398            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3399                continue;
3400            }
3401
3402            let alloc = &layer_allocs[idx];
3403
3404            for n in 0..nr_nodes {
3405                let assigned_on_n = layer.assigned_llcs[n].len();
3406                let target_full_n =
3407                    Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
3408                let mut to_free = assigned_on_n.saturating_sub(target_full_n);
3409
3410                debug!(
3411                    " free: layer={} node={} assigned={} target_full={} to_free={}",
3412                    layer.name, n, assigned_on_n, target_full_n, to_free,
3413                );
3414
3415                while to_free > 0 {
3416                    if let Some(llc) = layer.assigned_llcs[n].pop() {
3417                        self.cpu_pool.return_llc(llc);
3418                        to_free -= 1;
3419                        debug!(" layer={} freed_llc={} from node={}", layer.name, llc, n);
3420                    } else {
3421                        break;
3422                    }
3423                }
3424            }
3425        }
3426        debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);
3427
3428        // Phase 2 — Acquire per-node: claim LLCs from cpu_pool.
3429        for &(idx, _) in layer_targets.iter().rev() {
3430            let layer = &mut self.layers[idx];
3431
3432            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3433                continue;
3434            }
3435
3436            let alloc = &layer_allocs[idx];
3437
3438            for n in 0..nr_nodes {
3439                let cur_on_n = layer.assigned_llcs[n].len();
3440                let target_full_n =
3441                    Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
3442                let mut to_alloc = target_full_n.saturating_sub(cur_on_n);
3443
3444                debug!(
3445                    " alloc: layer={} node={} cur={} target_full={} to_alloc={} free={}",
3446                    layer.name,
3447                    n,
3448                    cur_on_n,
3449                    target_full_n,
3450                    to_alloc,
3451                    self.cpu_pool.free_llcs.get(&n).map_or(0, |v| v.len()),
3452                );
3453
3454                while to_alloc > 0 {
3455                    if let Some(llc) = self.cpu_pool.take_llc_from_node(n) {
3456                        layer.assigned_llcs[n].push(llc);
3457                        to_alloc -= 1;
3458                        debug!(" layer={} alloc_llc={} on node={}", layer.name, llc, n);
3459                    } else {
3460                        break;
3461                    }
3462                }
3463            }
3464
3465            debug!(
3466                " alloc: layer={} assigned_llcs={:?}",
3467                layer.name, layer.assigned_llcs
3468            );
3469        }
3470
3471        // Phase 3 — Spillover per-node: consume extra cores from free LLCs.
3472        let cores_per_llc = self.topo.all_cores.len() / self.topo.all_llcs.len();
3473        let cpus_per_core = self.topo.all_cores.first_key_value().unwrap().1.cpus.len();
3474        let cpus_per_llc = cores_per_llc * cpus_per_core;
3475
3476        for &(idx, _) in layer_targets.iter() {
3477            let layer = &mut self.layers[idx];
3478
3479            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3480                continue;
3481            }
3482
3483            layer.core_order = vec![Vec::new(); nr_nodes];
3484            let alloc = &layer_allocs[idx];
3485
3486            for n in 0..nr_nodes {
3487                let mut extra = Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).1;
3488
3489                if let Some(node_llcs) = self.cpu_pool.free_llcs.get_mut(&n) {
3490                    for entry in node_llcs.iter_mut() {
3491                        if extra == 0 {
3492                            break;
3493                        }
3494                        let avail = cpus_per_llc - entry.1;
3495                        let mut used = extra.min(avail);
3496                        let cores_to_add = used;
3497
3498                        let shift = entry.1;
3499                        entry.1 += used;
3500
3501                        let llc_id = entry.0;
3502                        let llc = self.topo.all_llcs.get(&llc_id).unwrap();
3503
3504                        for core in llc.cores.iter().skip(shift) {
3505                            if used == 0 {
3506                                break;
3507                            }
3508                            layer.core_order[n].push(core.1.id);
3509                            used -= 1;
3510                        }
3511
3512                        extra -= cores_to_add;
3513                    }
3514                }
3515            }
3516
3517            for node_cores in &mut layer.core_order {
3518                node_cores.reverse();
3519            }
3520        }
3521
3522        // Reset consumed entries in free LLCs.
3523        for node_llcs in self.cpu_pool.free_llcs.values_mut() {
3524            for entry in node_llcs.iter_mut() {
3525                entry.1 = 0;
3526            }
3527        }
3528
3529        // Phase 4 — Build core_order: append cores from assigned LLCs.
3530        for &(idx, _) in layer_targets.iter() {
3531            let layer = &mut self.layers[idx];
3532
3533            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3534                continue;
3535            }
3536
3537            let all_assigned: HashSet<usize> =
3538                layer.assigned_llcs.iter().flatten().copied().collect();
3539
3540            for core in self.topo.all_cores.iter() {
3541                let llc_id = core.1.llc_id;
3542                if all_assigned.contains(&llc_id) {
3543                    let nid = core.1.node_id;
3544                    layer.core_order[nid].push(core.1.id);
3545                }
3546            }
3547            for node_cores in &mut layer.core_order {
3548                node_cores.reverse();
3549            }
3550
3551            debug!(
3552                " alloc: layer={} core_order={:?}",
3553                layer.name, layer.core_order
3554            );
3555        }
3556
3557        // Phase 5 — Apply CPU changes for StickyDynamic layers.
3558        // Two phases: first free per-node, then allocate per-node.
3559        let mut updated = false;
3560
3561        // Free excess CPUs per-node.
3562        for &(idx, _) in layer_targets.iter() {
3563            let layer = &mut self.layers[idx];
3564
3565            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3566                continue;
3567            }
3568
3569            for n in 0..nr_nodes {
3570                let mut node_target = Cpumask::new();
3571                for &core_id in &layer.core_order[n] {
3572                    if let Some(core) = self.topo.all_cores.get(&core_id) {
3573                        node_target |= &core.span;
3574                    }
3575                }
3576                node_target &= &layer.allowed_cpus;
3577
3578                let node_span = &self.topo.nodes[&n].span;
3579                let node_cur = layer.cpus.and(node_span);
3580                let cpus_to_free = node_cur.and(&node_target.not());
3581
3582                if cpus_to_free.weight() > 0 {
3583                    debug!(
3584                        " apply: layer={} freeing CPUs on node {}: {}",
3585                        layer.name, n, cpus_to_free
3586                    );
3587                    layer.cpus &= &cpus_to_free.not();
3588                    layer.nr_cpus -= cpus_to_free.weight();
3589                    for cpu in cpus_to_free.iter() {
3590                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
3591                        layer.nr_node_cpus[n] -= 1;
3592                    }
3593                    self.cpu_pool.free(&cpus_to_free)?;
3594                    updated = true;
3595                }
3596            }
3597        }
3598
3599        // Allocate needed CPUs per-node.
3600        for &(idx, _) in layer_targets.iter() {
3601            let layer = &mut self.layers[idx];
3602
3603            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3604                continue;
3605            }
3606
3607            for n in 0..nr_nodes {
3608                let mut node_target = Cpumask::new();
3609                for &core_id in &layer.core_order[n] {
3610                    if let Some(core) = self.topo.all_cores.get(&core_id) {
3611                        node_target |= &core.span;
3612                    }
3613                }
3614                node_target &= &layer.allowed_cpus;
3615
3616                let available_cpus = self.cpu_pool.available_cpus();
3617                let desired_to_alloc = node_target.and(&layer.cpus.clone().not());
3618                let cpus_to_alloc = desired_to_alloc.clone().and(&available_cpus);
3619
3620                if desired_to_alloc.weight() > cpus_to_alloc.weight() {
3621                    debug!(
3622                        " apply: layer={} node {} wanted to alloc {} CPUs but only {} available",
3623                        layer.name,
3624                        n,
3625                        desired_to_alloc.weight(),
3626                        cpus_to_alloc.weight()
3627                    );
3628                }
3629
3630                if cpus_to_alloc.weight() > 0 {
3631                    debug!(
3632                        " apply: layer={} allocating CPUs on node {}: {}",
3633                        layer.name, n, cpus_to_alloc
3634                    );
3635                    layer.cpus |= &cpus_to_alloc;
3636                    layer.nr_cpus += cpus_to_alloc.weight();
3637                    for cpu in cpus_to_alloc.iter() {
3638                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
3639                        layer.nr_node_cpus[n] += 1;
3640                    }
3641                    self.cpu_pool.mark_allocated(&cpus_to_alloc)?;
3642                    updated = true;
3643                }
3644            }
3645
3646            debug!(
3647                " apply: layer={} final cpus.weight()={} nr_cpus={}",
3648                layer.name,
3649                layer.cpus.weight(),
3650                layer.nr_cpus
3651            );
3652        }
3653
3654        Ok(updated)
3655    }
3656
3657    fn refresh_node_ctx(
3658        skel: &mut BpfSkel,
3659        topo: &Topology,
3660        node_empty_layers: &[Vec<u32>],
3661        init: bool,
3662    ) {
3663        for &nid in topo.nodes.keys() {
3664            let mut arg: bpf_intf::refresh_node_ctx_arg =
3665                unsafe { MaybeUninit::zeroed().assume_init() };
3666            arg.node_id = nid as u32;
3667            arg.init = init as u32;
3668
3669            let empty = &node_empty_layers[nid];
3670            arg.nr_empty_layer_ids = empty.len() as u32;
3671            for (i, &lid) in empty.iter().enumerate() {
3672                arg.empty_layer_ids[i] = lid;
3673            }
3674            for i in empty.len()..MAX_LAYERS {
3675                arg.empty_layer_ids[i] = MAX_LAYERS as u32;
3676            }
3677
3678            if init {
3679                // Per-node LLC list — static topology, only set during init.
3680                let node = &topo.nodes[&nid];
3681                let llcs: Vec<u32> = node.llcs.keys().map(|&id| id as u32).collect();
3682                arg.nr_llcs = llcs.len() as u32;
3683                for (i, &llc_id) in llcs.iter().enumerate() {
3684                    arg.llcs[i] = llc_id;
3685                }
3686            }
3687
3688            let input = ProgramInput {
3689                context_in: Some(unsafe { plain::as_mut_bytes(&mut arg) }),
3690                ..Default::default()
3691            };
3692            let _ = skel.progs.refresh_node_ctx.test_run(input);
3693        }
3694    }
3695
3696    fn refresh_cpumasks(&mut self) -> Result<()> {
3697        let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });
3698
3699        let mut updated = false;
3700        let raw_targets = self.calc_target_nr_cpus();
3701        let au = self.cpu_pool.alloc_unit();
3702        let total_cpus = self.cpu_pool.topo.all_cpus.len();
3703
3704        // Dampen shrink: only drop halfway per cycle to avoid unnecessary
3705        // changes. There's some dampening built into util metrics but slow
3706        // down freeing further. This is solely based on intuition. Drop or
3707        // update according to real-world behavior.
3708        let targets: Vec<(usize, usize)> = raw_targets
3709            .iter()
3710            .enumerate()
3711            .map(|(idx, &(target, min))| {
3712                let cur = self.layers[idx].nr_cpus;
3713                if target < cur {
3714                    let dampened = cur - (cur - target).div_ceil(2);
3715                    (dampened.max(min), min)
3716                } else {
3717                    (target, min)
3718                }
3719            })
3720            .collect();
3721
3722        // Build demands for unified_alloc and compute per-node allocations.
3723        let demands = self.calc_raw_demands(&targets);
3724        let nr_nodes = self.topo.nodes.len();
3725        let node_caps: Vec<usize> = self
3726            .topo
3727            .nodes
3728            .values()
3729            .map(|n| n.span.weight() / au)
3730            .collect();
3731        let all_layer_nodes: Vec<&[usize]> = self
3732            .layer_specs
3733            .iter()
3734            .map(|s| s.nodes().as_slice())
3735            .collect();
3736        let norders: Vec<Vec<usize>> = (0..self.layers.len())
3737            .map(|idx| {
3738                layer_core_growth::node_order(
3739                    self.layer_specs[idx].nodes(),
3740                    &self.topo,
3741                    idx,
3742                    &all_layer_nodes,
3743                )
3744            })
3745            .collect();
3746        let node_groups: Vec<Vec<Vec<usize>>> = (0..self.layers.len())
3747            .map(|idx| {
3748                layer_core_growth::node_groups(
3749                    self.layer_specs[idx].nodes(),
3750                    &self.topo,
3751                    idx,
3752                    &all_layer_nodes,
3753                    &self.layer_specs[idx].kind.common().growth_algo,
3754                )
3755            })
3756            .collect();
3757        let layer_allocs = unified_alloc(total_cpus / au, &node_caps, &demands, &node_groups);
3758
3759        // Convert allocations back to CPU counts. Shrink dampening is
3760        // already applied to the targets fed into unified_alloc above.
3761        let cpu_targets: Vec<usize> = layer_allocs.iter().map(|a| a.total() * au).collect();
3762
3763        // Snapshot per-layer CPU counts for ALLOC debug logging.
3764        let prev_nr_cpus: Vec<usize> = self.layers.iter().map(|l| l.nr_cpus).collect();
3765
3766        let mut ascending: Vec<(usize, usize)> = cpu_targets.iter().copied().enumerate().collect();
3767        ascending.sort_by(|a, b| a.1.cmp(&b.1));
3768
3769        // Snapshot per-layer per-node CPU counts before allocation changes.
3770        let prev_node_cpus: Vec<Vec<usize>> =
3771            self.layers.iter().map(|l| l.nr_node_cpus.clone()).collect();
3772
3773        // Per-node SD allocation requires multiple LLCs. On flat topologies
3774        // (single LLC, e.g. VMs with topology disabled), SD layers fall through
3775        // to the non-SD grow/shrink loops below.
3776        let use_sd_alloc = self.topo.all_llcs.len() > 1;
3777        let sticky_dynamic_updated = if use_sd_alloc {
3778            self.recompute_layer_core_order(&ascending, &layer_allocs, au)?
3779        } else {
3780            false
3781        };
3782        updated |= sticky_dynamic_updated;
3783
3784        // Update BPF cpumasks for StickyDynamic layers if they were updated
3785        if sticky_dynamic_updated {
3786            for (idx, layer) in self.layers.iter().enumerate() {
3787                if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
3788                    Self::update_bpf_layer_cpumask(
3789                        layer,
3790                        &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3791                    );
3792                }
3793            }
3794        }
3795
3796        // Shrink per-node: free excess CPUs from each node.
3797        for &(idx, _target) in ascending.iter().rev() {
3798            let layer = &mut self.layers[idx];
3799            if layer_is_open(layer) {
3800                continue;
3801            }
3802
3803            // Skip StickyDynamic layers when per-node SD allocation is active.
3804            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
3805                continue;
3806            }
3807
3808            let alloc = &layer_allocs[idx];
3809            let mut freed = false;
3810
3811            for n in 0..nr_nodes {
3812                let desired = alloc.node_target(n) * au;
3813                let mut to_free = layer.nr_node_cpus[n].saturating_sub(desired);
3814                let node_span = &self.topo.nodes[&n].span;
3815
3816                while to_free > 0 {
3817                    let node_cands = layer.cpus.and(node_span);
3818                    let cpus_to_free = match self
3819                        .cpu_pool
3820                        .next_to_free(&node_cands, layer.core_order[n].iter().rev())?
3821                    {
3822                        Some(ret) => ret,
3823                        None => break,
3824                    };
3825                    let nr = cpus_to_free.weight();
3826                    trace!(
3827                        "[{}] freeing CPUs on node {}: {}",
3828                        layer.name,
3829                        n,
3830                        &cpus_to_free
3831                    );
3832                    layer.cpus &= &cpus_to_free.not();
3833                    layer.nr_cpus -= nr;
3834                    for cpu in cpus_to_free.iter() {
3835                        let node_id = self.cpu_pool.topo.all_cpus[&cpu].node_id;
3836                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
3837                        layer.nr_node_cpus[node_id] -= 1;
3838                        layer.nr_pinned_cpus[node_id] =
3839                            layer.nr_pinned_cpus[node_id].min(layer.nr_node_cpus[node_id]);
3840                    }
3841                    self.cpu_pool.free(&cpus_to_free)?;
3842                    to_free = to_free.saturating_sub(nr);
3843                    freed = true;
3844                }
3845            }
3846
3847            if freed {
3848                Self::update_bpf_layer_cpumask(
3849                    layer,
3850                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3851                );
3852                updated = true;
3853            }
3854        }
3855
3856        // Grow layers per-node using allocations from unified_alloc.
3857        for &(idx, _target) in &ascending {
3858            let layer = &mut self.layers[idx];
3859
3860            if layer_is_open(layer) {
3861                continue;
3862            }
3863
3864            // Skip StickyDynamic layers when per-node SD allocation is active.
3865            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
3866                continue;
3867            }
3868
3869            let alloc = &layer_allocs[idx];
3870            let norder = &norders[idx];
3871            let mut alloced = false;
3872
3873            for &node_id in norder.iter() {
3874                let node_target = alloc.node_target(node_id) * au;
3875                let cur_node = layer.nr_node_cpus[node_id];
3876                if node_target <= cur_node {
3877                    continue;
3878                }
3879                let pinned_target = alloc.pinned[node_id] * au;
3880                let mut nr_to_alloc = node_target - cur_node;
3881                let node_span = &self.topo.nodes[&node_id].span;
3882                let node_allowed = layer.allowed_cpus.and(node_span);
3883
3884                while nr_to_alloc > 0 {
3885                    let nr_alloced = match self.cpu_pool.alloc_cpus(
3886                        &node_allowed,
3887                        &layer.core_order[node_id],
3888                        nr_to_alloc,
3889                    ) {
3890                        Some(new_cpus) => {
3891                            let nr = new_cpus.weight();
3892                            layer.cpus |= &new_cpus;
3893                            layer.nr_cpus += nr;
3894                            for cpu in new_cpus.iter() {
3895                                layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
3896                                let nid = self.cpu_pool.topo.all_cpus[&cpu].node_id;
3897                                layer.nr_node_cpus[nid] += 1;
3898                                if layer.nr_pinned_cpus[nid] < pinned_target {
3899                                    layer.nr_pinned_cpus[nid] += 1;
3900                                }
3901                            }
3902                            nr
3903                        }
3904                        None => 0,
3905                    };
3906                    if nr_alloced == 0 {
3907                        break;
3908                    }
3909                    alloced = true;
3910                    nr_to_alloc -= nr_alloced.min(nr_to_alloc);
3911                }
3912            }
3913
3914            if alloced {
3915                Self::update_bpf_layer_cpumask(
3916                    layer,
3917                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3918                );
3919                updated = true;
3920            }
3921        }
3922
3923        // Tell BPF whether all CPUs are allocated.  When the system is
3924        // fully allocated, idle_confined layers have nowhere to grow, so
3925        // they should be allowed to use open (unprotected) idle CPUs from
3926        // other layers.
3927        let total_allocated: usize = self.layers.iter().map(|l| l.nr_cpus).sum();
3928        let fully_allocated = total_allocated >= total_cpus;
3929        for (idx, layer) in self.layers.iter().enumerate() {
3930            if !layer_is_open(layer) {
3931                self.skel.maps.bss_data.as_mut().unwrap().layers[idx]
3932                    .fully_allocated
3933                    .write(fully_allocated);
3934            }
3935        }
3936
3937        // Recompute growth_denied: for each node, check whether the
3938        // unpinned portion of the layer warranted growth (unpinned util /
3939        // util_high > unpinned CPUs) but the node didn't gain CPUs.  Only
3940        // unpinned matters because pinned tasks can't migrate cross-node.
3941        let node_utils = &self.sched_stats.layer_node_utils;
3942        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
3943        for (idx, layer) in self.layers.iter().enumerate() {
3944            self.growth_denied[idx].fill(false);
3945            let util_high = match layer.kind.util_range() {
3946                Some((_, high)) => high,
3947                None => continue,
3948            };
3949            for n in 0..nr_nodes {
3950                let unpinned_util = (node_utils[idx][n] - pinned_utils[idx][n]).max(0.0);
3951                let unpinned_cpus_needed = unpinned_util / util_high;
3952                let unpinned_cpus_have =
3953                    layer.nr_node_cpus[n].saturating_sub(layer.nr_pinned_cpus[n]) as f64;
3954                let wanted = unpinned_cpus_needed > unpinned_cpus_have;
3955                let got = layer.nr_node_cpus[n] > prev_node_cpus[idx][n];
3956                if wanted && !got {
3957                    self.growth_denied[idx][n] = true;
3958                }
3959            }
3960        }
3961
3962        // Log per-layer allocation changes.
3963        if updated {
3964            for (idx, layer) in self.layers.iter().enumerate() {
3965                if layer_is_open(layer) {
3966                    continue;
3967                }
3968                let prev = prev_nr_cpus[idx];
3969                let cur = layer.nr_cpus;
3970                if prev != cur {
3971                    debug!(
3972                        "ALLOC {} algo={:?} cpus:{}→{} mask={:x}",
3973                        layer.name, layer.growth_algo, prev, cur, layer.cpus,
3974                    );
3975                }
3976            }
3977            debug!(
3978                "ALLOC pool_available={}",
3979                self.cpu_pool.available_cpus().weight()
3980            );
3981        }
3982
3983        // Log allocation changes.
3984        if updated {
3985            let nr_nodes = self.topo.nodes.len();
3986            for (idx, layer) in self.layers.iter().enumerate() {
3987                if layer_is_open(layer) {
3988                    continue;
3989                }
3990                let prev = &prev_node_cpus[idx];
3991                let cur = &layer.nr_node_cpus;
3992                if prev == cur {
3993                    continue;
3994                }
3995                let per_node: String = (0..nr_nodes)
3996                    .map(|n| format!("n{}:{}→{}", n, prev[n], cur[n]))
3997                    .collect::<Vec<_>>()
3998                    .join(" ");
3999                let prev_total: usize = prev.iter().sum();
4000                let cur_total: usize = cur[..nr_nodes].iter().sum();
4001                let target: String = (0..nr_nodes)
4002                    .map(|n| format!("n{}:{}", n, layer_allocs[idx].node_target(n) * au))
4003                    .collect::<Vec<_>>()
4004                    .join(" ");
4005                debug!(
4006                    "ALLOC {} algo={:?} {} total:{}→{} target:[{}] mask={:x}",
4007                    layer.name,
4008                    layer.growth_algo,
4009                    per_node,
4010                    prev_total,
4011                    cur_total,
4012                    target,
4013                    layer.cpus,
4014                );
4015            }
4016            debug!(
4017                "ALLOC pool_available={}",
4018                self.cpu_pool.available_cpus().weight()
4019            );
4020        }
4021
4022        // Give the rest to the open layers.
4023        if updated {
4024            for (idx, layer) in self.layers.iter_mut().enumerate() {
4025                if !layer_is_open(layer) {
4026                    continue;
4027                }
4028
4029                let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx];
4030                let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
4031                let nr_available_cpus = available_cpus.weight();
4032
4033                // Open layers need the intersection of allowed cpus and
4034                // available cpus. Recompute per-LLC and per-node counts
4035                // since open layers bypass alloc/free.
4036                layer.cpus = available_cpus;
4037                layer.nr_cpus = nr_available_cpus;
4038                for llc in self.cpu_pool.topo.all_llcs.values() {
4039                    layer.nr_llc_cpus[llc.id] = layer.cpus.and(&llc.span).weight();
4040                }
4041                for node in self.cpu_pool.topo.nodes.values() {
4042                    layer.nr_node_cpus[node.id] = layer.cpus.and(&node.span).weight();
4043                    layer.nr_pinned_cpus[node.id] = 0;
4044                }
4045                Self::update_bpf_layer_cpumask(layer, bpf_layer);
4046            }
4047
4048            for (&node_id, &cpu) in &self.cpu_pool.fallback_cpus {
4049                self.skel.maps.bss_data.as_mut().unwrap().fallback_cpus[node_id] = cpu as u32;
4050            }
4051
4052            for (lidx, layer) in self.layers.iter().enumerate() {
4053                self.nr_layer_cpus_ranges[lidx] = (
4054                    self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
4055                    self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
4056                );
4057            }
4058
4059            // Trigger updates on the BPF side.
4060            let input = ProgramInput {
4061                ..Default::default()
4062            };
4063            let prog = &mut self.skel.progs.refresh_layer_cpumasks;
4064            let _ = prog.test_run(input);
4065
4066            // Update per-node empty layer IDs via BPF prog.
4067            let nr_nodes = self.topo.nodes.len();
4068            let node_empty_layers: Vec<Vec<u32>> = (0..nr_nodes)
4069                .map(|nid| {
4070                    self.layers
4071                        .iter()
4072                        .enumerate()
4073                        .filter(|(_lidx, layer)| layer.nr_node_cpus[nid] == 0)
4074                        .map(|(lidx, _)| lidx as u32)
4075                        .collect()
4076                })
4077                .collect();
4078            Self::refresh_node_ctx(&mut self.skel, &self.topo, &node_empty_layers, false);
4079        }
4080
4081        if let Err(e) = self.update_netdev_cpumasks() {
4082            warn!("Failed to update netdev IRQ cpumasks: {:#}", e);
4083        }
4084        Ok(())
4085    }
4086
4087    fn refresh_idle_qos(&mut self) -> Result<()> {
4088        if !self.idle_qos_enabled {
4089            return Ok(());
4090        }
4091
4092        let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
4093        for layer in self.layers.iter() {
4094            let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
4095            for cpu in layer.cpus.iter() {
4096                cpu_idle_qos[cpu] = idle_resume_us;
4097            }
4098        }
4099
4100        for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
4101            update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
4102        }
4103
4104        Ok(())
4105    }
4106
4107    fn refresh_xnuma(&mut self) {
4108        let nr_nodes = self.topo.nodes.len();
4109        if nr_nodes <= 1 {
4110            return;
4111        }
4112
4113        let duty_sums = &self.sched_stats.layer_node_duty_sums;
4114
4115        for (layer_idx, spec) in self.layer_specs.iter().enumerate() {
4116            let common = spec.kind.common();
4117            let threshold = common.xnuma_threshold;
4118            let threshold_delta = common.xnuma_threshold_delta;
4119            let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[layer_idx];
4120
4121            if threshold.0 <= 0.0 && threshold.1 <= 0.0 {
4122                // Off — all open, infinite budget
4123                for src in 0..nr_nodes {
4124                    bpf_layer.node[src].xnuma_is_mig_src.write(true);
4125                    for dst in 0..nr_nodes {
4126                        bpf_layer.node[src].xnuma[dst].rate = u64::MAX;
4127                    }
4128                }
4129                self.xnuma_mig_src[layer_idx].fill(false);
4130                continue;
4131            }
4132
4133            let layer = &self.layers[layer_idx];
4134            let is_mig_src = xnuma_check_active(
4135                &duty_sums[layer_idx],
4136                &layer.nr_node_cpus,
4137                threshold,
4138                threshold_delta,
4139                &self.growth_denied[layer_idx],
4140                &self.xnuma_mig_src[layer_idx],
4141            );
4142
4143            self.xnuma_mig_src[layer_idx] = is_mig_src.clone();
4144
4145            let result = xnuma_compute_rates(&duty_sums[layer_idx], &layer.nr_node_cpus);
4146
4147            // Write rates before is_mig_src so BPF sees valid rates
4148            // when the gate activates.
4149            for src in 0..nr_nodes {
4150                for dst in 0..nr_nodes {
4151                    bpf_layer.node[src].xnuma[dst].rate = result.rates[src][dst];
4152                }
4153            }
4154            for (nid, is_src) in is_mig_src.iter().enumerate().take(nr_nodes) {
4155                bpf_layer.node[nid].xnuma_is_mig_src.write(*is_src);
4156            }
4157        }
4158    }
4159
4160    fn step(&mut self) -> Result<()> {
4161        let started_at = Instant::now();
4162        self.sched_stats.refresh(
4163            &mut self.skel,
4164            &self.proc_reader,
4165            started_at,
4166            self.processing_dur,
4167            &self.gpu_task_handler,
4168        )?;
4169
4170        // Update BPF with EWMA values
4171        self.skel
4172            .maps
4173            .bss_data
4174            .as_mut()
4175            .unwrap()
4176            .system_cpu_util_ewma = (self.sched_stats.system_cpu_util_ewma * 10000.0) as u64;
4177
4178        for layer_id in 0..self.sched_stats.nr_layers {
4179            self.skel
4180                .maps
4181                .bss_data
4182                .as_mut()
4183                .unwrap()
4184                .layer_dsq_insert_ewma[layer_id] =
4185                (self.sched_stats.layer_dsq_insert_ewma[layer_id] * 10000.0) as u64;
4186        }
4187
4188        self.refresh_cpumasks()?;
4189        self.refresh_xnuma();
4190        self.refresh_idle_qos()?;
4191        self.gpu_task_handler.maybe_affinitize();
4192        self.processing_dur += Instant::now().duration_since(started_at);
4193        Ok(())
4194    }
4195
4196    fn generate_sys_stats(
4197        &mut self,
4198        stats: &Stats,
4199        cpus_ranges: &mut [(usize, usize)],
4200    ) -> Result<SysStats> {
4201        let bstats = &stats.bpf_stats;
4202        let mut sys_stats = SysStats::new(stats, bstats, &self.cpu_pool.fallback_cpus)?;
4203
4204        for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
4205            let layer_stats = LayerStats::new(
4206                lidx,
4207                layer,
4208                stats,
4209                bstats,
4210                cpus_ranges[lidx],
4211                self.xnuma_mig_src[lidx].iter().any(|&a| a),
4212            );
4213            sys_stats.layers.insert(spec.name.to_string(), layer_stats);
4214            cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
4215        }
4216
4217        Ok(sys_stats)
4218    }
4219
4220    // Helper function to process a cgroup creation (common logic for walkdir and inotify)
4221    fn process_cgroup_creation(
4222        path: &Path,
4223        cgroup_regexes: &HashMap<u32, Regex>,
4224        cgroup_path_to_id: &mut HashMap<String, u64>,
4225        sender: &crossbeam::channel::Sender<CgroupEvent>,
4226    ) {
4227        let path_str = path.to_string_lossy().to_string();
4228
4229        // Get cgroup ID (inode number)
4230        let cgroup_id = std::fs::metadata(path)
4231            .map(|metadata| {
4232                use std::os::unix::fs::MetadataExt;
4233                metadata.ino()
4234            })
4235            .unwrap_or(0);
4236
4237        // Build match bitmap by testing against CgroupRegex rules
4238        let mut match_bitmap = 0u64;
4239        for (rule_id, regex) in cgroup_regexes {
4240            if regex.is_match(&path_str) {
4241                match_bitmap |= 1u64 << rule_id;
4242            }
4243        }
4244
4245        // Store in hash
4246        cgroup_path_to_id.insert(path_str.clone(), cgroup_id);
4247
4248        // Send event
4249        if let Err(e) = sender.send(CgroupEvent::Created {
4250            path: path_str,
4251            cgroup_id,
4252            match_bitmap,
4253        }) {
4254            error!("Failed to send cgroup creation event: {}", e);
4255        }
4256    }
4257
4258    fn start_cgroup_watcher(
4259        shutdown: Arc<AtomicBool>,
4260        cgroup_regexes: HashMap<u32, Regex>,
4261    ) -> Result<Receiver<CgroupEvent>> {
4262        let mut inotify = Inotify::init().context("Failed to initialize inotify")?;
4263        let mut wd_to_path = HashMap::new();
4264
4265        // Create crossbeam channel for cgroup events (bounded to prevent memory issues)
4266        let (sender, receiver) = crossbeam::channel::bounded::<CgroupEvent>(1024);
4267
4268        // Watch for directory creation and deletion events
4269        let root_wd = inotify
4270            .watches()
4271            .add("/sys/fs/cgroup", WatchMask::CREATE | WatchMask::DELETE)
4272            .context("Failed to add watch for /sys/fs/cgroup")?;
4273        wd_to_path.insert(root_wd, PathBuf::from("/sys/fs/cgroup"));
4274
4275        // Also recursively watch existing directories for new subdirectories
4276        Self::add_recursive_watches(&mut inotify, &mut wd_to_path, Path::new("/sys/fs/cgroup"))?;
4277
4278        // Spawn watcher thread
4279        std::thread::spawn(move || {
4280            let mut buffer = [0; 4096];
4281            let inotify_fd = inotify.as_raw_fd();
4282            // Maintain hash of cgroup path -> cgroup ID (inode number)
4283            let mut cgroup_path_to_id = HashMap::<String, u64>::new();
4284
4285            // Populate existing cgroups
4286            for entry in WalkDir::new("/sys/fs/cgroup")
4287                .into_iter()
4288                .filter_map(|e| e.ok())
4289                .filter(|e| e.file_type().is_dir())
4290            {
4291                let path = entry.path();
4292                Self::process_cgroup_creation(
4293                    path,
4294                    &cgroup_regexes,
4295                    &mut cgroup_path_to_id,
4296                    &sender,
4297                );
4298            }
4299
4300            while !shutdown.load(Ordering::Relaxed) {
4301                // Use select to wait for events with a 100ms timeout
4302                let ready = unsafe {
4303                    let mut read_fds: libc::fd_set = std::mem::zeroed();
4304                    libc::FD_ZERO(&mut read_fds);
4305                    libc::FD_SET(inotify_fd, &mut read_fds);
4306
4307                    let mut timeout = libc::timeval {
4308                        tv_sec: 0,
4309                        tv_usec: 100_000, // 100ms
4310                    };
4311
4312                    libc::select(
4313                        inotify_fd + 1,
4314                        &mut read_fds,
4315                        std::ptr::null_mut(),
4316                        std::ptr::null_mut(),
4317                        &mut timeout,
4318                    )
4319                };
4320
4321                if ready <= 0 {
4322                    // Timeout or error, continue loop to check shutdown
4323                    continue;
4324                }
4325
4326                // Read events non-blocking
4327                let events = match inotify.read_events(&mut buffer) {
4328                    Ok(events) => events,
4329                    Err(e) => {
4330                        error!("Error reading inotify events: {}", e);
4331                        break;
4332                    }
4333                };
4334
4335                for event in events {
4336                    if !event.mask.contains(inotify::EventMask::CREATE)
4337                        && !event.mask.contains(inotify::EventMask::DELETE)
4338                    {
4339                        continue;
4340                    }
4341
4342                    let name = match event.name {
4343                        Some(name) => name,
4344                        None => continue,
4345                    };
4346
4347                    let parent_path = match wd_to_path.get(&event.wd) {
4348                        Some(parent) => parent,
4349                        None => {
4350                            warn!("Unknown watch descriptor: {:?}", event.wd);
4351                            continue;
4352                        }
4353                    };
4354
4355                    let path = parent_path.join(name.to_string_lossy().as_ref());
4356
4357                    if event.mask.contains(inotify::EventMask::CREATE) {
4358                        if !path.is_dir() {
4359                            continue;
4360                        }
4361
4362                        Self::process_cgroup_creation(
4363                            &path,
4364                            &cgroup_regexes,
4365                            &mut cgroup_path_to_id,
4366                            &sender,
4367                        );
4368
4369                        // Add watch for this new directory
4370                        match inotify
4371                            .watches()
4372                            .add(&path, WatchMask::CREATE | WatchMask::DELETE)
4373                        {
4374                            Ok(wd) => {
4375                                wd_to_path.insert(wd, path.clone());
4376                            }
4377                            Err(e) => {
4378                                warn!(
4379                                    "Failed to add watch for new cgroup {}: {}",
4380                                    path.display(),
4381                                    e
4382                                );
4383                            }
4384                        }
4385                    } else if event.mask.contains(inotify::EventMask::DELETE) {
4386                        let path_str = path.to_string_lossy().to_string();
4387
4388                        // Get cgroup ID from our hash (since the directory is gone, we can't stat it)
4389                        let cgroup_id = cgroup_path_to_id.remove(&path_str).unwrap_or(0);
4390
4391                        // Send removal event to main thread
4392                        if let Err(e) = sender.send(CgroupEvent::Removed {
4393                            path: path_str,
4394                            cgroup_id,
4395                        }) {
4396                            error!("Failed to send cgroup removal event: {}", e);
4397                        }
4398
4399                        // Find and remove the watch descriptor for this path
4400                        let wd_to_remove = wd_to_path.iter().find_map(|(wd, watched_path)| {
4401                            if watched_path == &path {
4402                                Some(wd.clone())
4403                            } else {
4404                                None
4405                            }
4406                        });
4407                        if let Some(wd) = wd_to_remove {
4408                            wd_to_path.remove(&wd);
4409                        }
4410                    }
4411                }
4412            }
4413        });
4414
4415        Ok(receiver)
4416    }
4417
4418    fn add_recursive_watches(
4419        inotify: &mut Inotify,
4420        wd_to_path: &mut HashMap<inotify::WatchDescriptor, PathBuf>,
4421        path: &Path,
4422    ) -> Result<()> {
4423        for entry in WalkDir::new(path)
4424            .into_iter()
4425            .filter_map(|e| e.ok())
4426            .filter(|e| e.file_type().is_dir())
4427            .skip(1)
4428        {
4429            let entry_path = entry.path();
4430            // Add watch for this directory
4431            match inotify
4432                .watches()
4433                .add(entry_path, WatchMask::CREATE | WatchMask::DELETE)
4434            {
4435                Ok(wd) => {
4436                    wd_to_path.insert(wd, entry_path.to_path_buf());
4437                }
4438                Err(e) => {
4439                    debug!("Failed to add watch for {}: {}", entry_path.display(), e);
4440                }
4441            }
4442        }
4443        Ok(())
4444    }
4445
4446    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
4447        let (res_ch, req_ch) = self.stats_server.channels();
4448        let mut next_sched_at = Instant::now() + self.sched_intv;
4449        let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
4450        let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
4451        let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();
4452
4453        // Start the cgroup watcher only if there are CgroupRegex rules
4454        let cgroup_regexes = self.cgroup_regexes.take().unwrap();
4455        let cgroup_event_rx = if !cgroup_regexes.is_empty() {
4456            Some(Self::start_cgroup_watcher(
4457                shutdown.clone(),
4458                cgroup_regexes,
4459            )?)
4460        } else {
4461            None
4462        };
4463
4464        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
4465            let now = Instant::now();
4466
4467            if now >= next_sched_at {
4468                self.step()?;
4469                while next_sched_at < now {
4470                    next_sched_at += self.sched_intv;
4471                }
4472            }
4473
4474            if enable_layer_refresh && now >= next_layer_refresh_at {
4475                self.skel
4476                    .maps
4477                    .bss_data
4478                    .as_mut()
4479                    .unwrap()
4480                    .layer_refresh_seq_avgruntime += 1;
4481                while next_layer_refresh_at < now {
4482                    next_layer_refresh_at += self.layer_refresh_intv;
4483                }
4484            }
4485
4486            // Handle both stats requests and cgroup events with timeout
4487            let timeout_duration = next_sched_at.saturating_duration_since(Instant::now());
4488            let never_rx = crossbeam::channel::never();
4489            let cgroup_rx = cgroup_event_rx.as_ref().unwrap_or(&never_rx);
4490
4491            select! {
4492                recv(req_ch) -> msg => match msg {
4493                    Ok(StatsReq::Hello(tid)) => {
4494                        cpus_ranges.insert(
4495                            tid,
4496                            self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
4497                        );
4498                        let stats =
4499                            Stats::new(&mut self.skel, &self.proc_reader, self.topo.clone(), &self.gpu_task_handler, self.sched_stats.util_compensation)?;
4500                        res_ch.send(StatsRes::Hello(Box::new(stats)))?;
4501                    }
4502                    Ok(StatsReq::Refresh(tid, mut stats)) => {
4503                        // Propagate self's layer cpu ranges into each stat's.
4504                        for i in 0..self.nr_layer_cpus_ranges.len() {
4505                            for (_, ranges) in cpus_ranges.iter_mut() {
4506                                ranges[i] = (
4507                                    ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
4508                                    ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
4509                                );
4510                            }
4511                            self.nr_layer_cpus_ranges[i] =
4512                                (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
4513                        }
4514
4515                        stats.refresh(
4516                            &mut self.skel,
4517                            &self.proc_reader,
4518                            now,
4519                            self.processing_dur,
4520                            &self.gpu_task_handler,
4521                        )?;
4522                        let sys_stats =
4523                            self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
4524                        res_ch.send(StatsRes::Refreshed(Box::new((*stats, sys_stats))))?;
4525                    }
4526                    Ok(StatsReq::Bye(tid)) => {
4527                        cpus_ranges.remove(&tid);
4528                        res_ch.send(StatsRes::Bye)?;
4529                    }
4530                    Err(e) => Err(e)?,
4531                },
4532
4533                recv(cgroup_rx) -> event => match event {
4534                    Ok(CgroupEvent::Created { path, cgroup_id, match_bitmap }) => {
4535                        // Insert into BPF map
4536                        self.skel.maps.cgroup_match_bitmap.update(
4537                            &cgroup_id.to_ne_bytes(),
4538                            &match_bitmap.to_ne_bytes(),
4539                            libbpf_rs::MapFlags::ANY,
4540                        ).with_context(|| format!(
4541                            "Failed to insert cgroup {}({}) into BPF map. Cgroup map may be full \
4542                             (max 16384 entries). Aborting.",
4543                            cgroup_id, path
4544                        ))?;
4545
4546                        debug!("Added cgroup {} to BPF map with bitmap 0x{:x}", cgroup_id, match_bitmap);
4547                    }
4548                    Ok(CgroupEvent::Removed { path, cgroup_id }) => {
4549                        // Delete from BPF map
4550                        if let Err(e) = self.skel.maps.cgroup_match_bitmap.delete(&cgroup_id.to_ne_bytes()) {
4551                            warn!("Failed to delete cgroup {} from BPF map: {}", cgroup_id, e);
4552                        } else {
4553                            debug!("Removed cgroup {}({}) from BPF map", cgroup_id, path);
4554                        }
4555                    }
4556                    Err(e) => {
4557                        error!("Error receiving cgroup event: {}", e);
4558                    }
4559                },
4560
4561                recv(crossbeam::channel::after(timeout_duration)) -> _ => {
4562                    // Timeout - continue main loop
4563                }
4564            }
4565        }
4566
4567        let _ = self.struct_ops.take();
4568        uei_report!(&self.skel, uei)
4569    }
4570}
4571
4572impl Drop for Scheduler<'_> {
4573    fn drop(&mut self) {
4574        info!("Unregister {SCHEDULER_NAME} scheduler");
4575
4576        if !self.netdevs.is_empty() {
4577            for (iface, netdev) in &self.netdevs {
4578                if let Err(e) = netdev.restore_cpumasks() {
4579                    warn!("Failed to restore {iface} IRQ affinity: {e}");
4580                }
4581            }
4582            info!("Restored original netdev IRQ affinity");
4583        }
4584
4585        if let Some(struct_ops) = self.struct_ops.take() {
4586            drop(struct_ops);
4587        }
4588    }
4589}
4590
4591fn write_example_file(path: &str) -> Result<()> {
4592    let mut f = fs::OpenOptions::new()
4593        .create_new(true)
4594        .write(true)
4595        .open(path)?;
4596    Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
4597}
4598
/// Routing information attached to a single `HintEquals` hint value.
///
/// The derives make the value loggable with `{:?}`, cheaply copyable and
/// comparable; all fields are plain `Copy` data.
#[derive(Debug, Clone, Copy, PartialEq)]
struct HintLayerInfo {
    /// Index of the layer spec the hint value maps to.
    layer_id: usize,
    /// Threshold of the `SystemCpuUtilBelow` matcher that accompanied the
    /// hint, if there was one.
    system_cpu_util_below: Option<f64>,
    /// Threshold of the `DsqInsertBelow` matcher that accompanied the hint,
    /// if there was one.
    dsq_insert_below: Option<f64>,
}
4604
4605fn verify_layer_specs(specs: &[LayerSpec]) -> Result<HashMap<u64, HintLayerInfo>> {
4606    let mut hint_to_layer_map = HashMap::<u64, (usize, String, Option<f64>, Option<f64>)>::new();
4607
4608    let nr_specs = specs.len();
4609    if nr_specs == 0 {
4610        bail!("No layer spec");
4611    }
4612    if nr_specs > MAX_LAYERS {
4613        bail!("Too many layer specs");
4614    }
4615
4616    for (idx, spec) in specs.iter().enumerate() {
4617        if idx < nr_specs - 1 {
4618            if spec.matches.is_empty() {
4619                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
4620            }
4621        } else if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
4622            bail!("Terminal spec {:?} must have an empty match", spec.name);
4623        }
4624
4625        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
4626            bail!(
4627                "Spec {:?} has too many ({}) OR match blocks",
4628                spec.name,
4629                spec.matches.len()
4630            );
4631        }
4632
4633        for (ands_idx, ands) in spec.matches.iter().enumerate() {
4634            if ands.len() > NR_LAYER_MATCH_KINDS {
4635                bail!(
4636                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
4637                    spec.name,
4638                    ands_idx,
4639                    ands.len()
4640                );
4641            }
4642            let mut hint_equals_cnt = 0;
4643            let mut system_cpu_util_below_cnt = 0;
4644            let mut dsq_insert_below_cnt = 0;
4645            let mut hint_value: Option<u64> = None;
4646            let mut system_cpu_util_threshold: Option<f64> = None;
4647            let mut dsq_insert_threshold: Option<f64> = None;
4648            for one in ands.iter() {
4649                match one {
4650                    LayerMatch::CgroupPrefix(prefix) => {
4651                        if prefix.len() > MAX_PATH {
4652                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
4653                        }
4654                    }
4655                    LayerMatch::CgroupSuffix(suffix) => {
4656                        if suffix.len() > MAX_PATH {
4657                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
4658                        }
4659                    }
4660                    LayerMatch::CgroupContains(substr) => {
4661                        if substr.len() > MAX_PATH {
4662                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
4663                        }
4664                    }
4665                    LayerMatch::CommPrefix(prefix) => {
4666                        if prefix.len() > MAX_COMM {
4667                            bail!("Spec {:?} has too long a comm prefix", spec.name);
4668                        }
4669                    }
4670                    LayerMatch::PcommPrefix(prefix) => {
4671                        if prefix.len() > MAX_COMM {
4672                            bail!("Spec {:?} has too long a process name prefix", spec.name);
4673                        }
4674                    }
4675                    LayerMatch::SystemCpuUtilBelow(threshold) => {
4676                        if *threshold < 0.0 || *threshold > 1.0 {
4677                            bail!(
4678                                "Spec {:?} has SystemCpuUtilBelow threshold outside the range [0.0, 1.0]",
4679                                spec.name
4680                            );
4681                        }
4682                        system_cpu_util_threshold = Some(*threshold);
4683                        system_cpu_util_below_cnt += 1;
4684                    }
4685                    LayerMatch::DsqInsertBelow(threshold) => {
4686                        if *threshold < 0.0 || *threshold > 1.0 {
4687                            bail!(
4688                                "Spec {:?} has DsqInsertBelow threshold outside the range [0.0, 1.0]",
4689                                spec.name
4690                            );
4691                        }
4692                        dsq_insert_threshold = Some(*threshold);
4693                        dsq_insert_below_cnt += 1;
4694                    }
4695                    LayerMatch::HintEquals(hint) => {
4696                        if *hint > 1024 {
4697                            bail!(
4698                                "Spec {:?} has hint value outside the range [0, 1024]",
4699                                spec.name
4700                            );
4701                        }
4702                        hint_value = Some(*hint);
4703                        hint_equals_cnt += 1;
4704                    }
4705                    _ => {}
4706                }
4707            }
4708            if hint_equals_cnt > 1 {
4709                bail!("Only 1 HintEquals match permitted per AND block");
4710            }
4711            let high_freq_matcher_cnt = system_cpu_util_below_cnt + dsq_insert_below_cnt;
4712            if high_freq_matcher_cnt > 0 {
4713                if hint_equals_cnt != 1 {
4714                    bail!("High-frequency matchers (SystemCpuUtilBelow, DsqInsertBelow) must be used with one HintEquals");
4715                }
4716                if system_cpu_util_below_cnt > 1 {
4717                    bail!("Only 1 SystemCpuUtilBelow match permitted per AND block");
4718                }
4719                if dsq_insert_below_cnt > 1 {
4720                    bail!("Only 1 DsqInsertBelow match permitted per AND block");
4721                }
4722                if ands.len() != hint_equals_cnt + system_cpu_util_below_cnt + dsq_insert_below_cnt
4723                {
4724                    bail!("High-frequency matchers must be used only with HintEquals (no other matchers)");
4725                }
4726            } else if hint_equals_cnt == 1 && ands.len() != 1 {
4727                bail!("HintEquals match cannot be in conjunction with other matches");
4728            }
4729
4730            // Insert hint into map if present
4731            if let Some(hint) = hint_value {
4732                if let Some((layer_id, name, _, _)) = hint_to_layer_map.get(&hint) {
4733                    if *layer_id != idx {
4734                        bail!(
4735                            "Spec {:?} has hint value ({}) that is already mapped to Spec {:?}",
4736                            spec.name,
4737                            hint,
4738                            name
4739                        );
4740                    }
4741                } else {
4742                    hint_to_layer_map.insert(
4743                        hint,
4744                        (
4745                            idx,
4746                            spec.name.clone(),
4747                            system_cpu_util_threshold,
4748                            dsq_insert_threshold,
4749                        ),
4750                    );
4751                }
4752            }
4753        }
4754
4755        match spec.kind {
4756            LayerKind::Confined {
4757                cpus_range,
4758                util_range,
4759                ..
4760            }
4761            | LayerKind::Grouped {
4762                cpus_range,
4763                util_range,
4764                ..
4765            } => {
4766                if let Some((cpus_min, cpus_max)) = cpus_range {
4767                    if cpus_min > cpus_max {
4768                        bail!(
4769                            "Spec {:?} has invalid cpus_range({}, {})",
4770                            spec.name,
4771                            cpus_min,
4772                            cpus_max
4773                        );
4774                    }
4775                }
4776                if util_range.0 >= util_range.1 {
4777                    bail!(
4778                        "Spec {:?} has invalid util_range ({}, {})",
4779                        spec.name,
4780                        util_range.0,
4781                        util_range.1
4782                    );
4783                }
4784            }
4785            _ => {}
4786        }
4787    }
4788
4789    Ok(hint_to_layer_map
4790        .into_iter()
4791        .map(|(k, v)| {
4792            (
4793                k,
4794                HintLayerInfo {
4795                    layer_id: v.0,
4796                    system_cpu_util_below: v.2,
4797                    dsq_insert_below: v.3,
4798                },
4799            )
4800        })
4801        .collect())
4802}
4803
/// Returns the last `len` characters of `cgroup`.
///
/// The whole string is returned when it holds fewer than `len` characters,
/// and an empty string when `len` is zero. Counting is done in characters,
/// not bytes.
fn name_suffix(cgroup: &str, len: usize) -> String {
    if len == 0 {
        return String::new();
    }

    // Byte offset of the `len`-th character counted from the end; when the
    // string is shorter than that, the suffix starts at the very beginning.
    let start = cgroup
        .char_indices()
        .rev()
        .nth(len - 1)
        .map_or(0, |(offset, _)| offset);

    cgroup[start..].to_string()
}
4810
4811fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
4812    let mut paths = vec![];
4813
4814    if !dir.is_dir() {
4815        panic!("path {:?} does not correspond to directory", dir);
4816    }
4817
4818    let direntries = fs::read_dir(dir)?;
4819
4820    for entry in direntries {
4821        let path = entry?.path();
4822        if path.is_dir() {
4823            paths.append(&mut traverse_sysfs(&path)?);
4824            paths.push(path);
4825        }
4826    }
4827
4828    Ok(paths)
4829}
4830
4831fn find_cpumask(cgroup: &str) -> Cpumask {
4832    let mut path = String::from(cgroup);
4833    path.push_str("/cpuset.cpus.effective");
4834
4835    let description = fs::read_to_string(&mut path).unwrap();
4836
4837    Cpumask::from_cpulist(&description).unwrap()
4838}
4839
4840fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
4841    match rule {
4842        LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4843            .into_iter()
4844            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4845            .filter(|cgroup| cgroup.ends_with(suffix))
4846            .map(|cgroup| {
4847                (
4848                    {
4849                        let mut slashterminated = cgroup.clone();
4850                        slashterminated.push('/');
4851                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4852                    },
4853                    find_cpumask(&cgroup),
4854                )
4855            })
4856            .collect()),
4857        LayerMatch::CgroupRegex(expr) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4858            .into_iter()
4859            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4860            .filter(|cgroup| {
4861                let re = Regex::new(expr).unwrap();
4862                re.is_match(cgroup)
4863            })
4864            .map(|cgroup| {
4865                (
4866                    // Here we convert the regex match into a suffix match because we still need to
4867                    // do the matching on the bpf side and doing a regex match in bpf isn't
4868                    // easily done.
4869                    {
4870                        let mut slashterminated = cgroup.clone();
4871                        slashterminated.push('/');
4872                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4873                    },
4874                    find_cpumask(&cgroup),
4875                )
4876            })
4877            .collect()),
4878        _ => panic!("Unimplemented template enum {:?}", rule),
4879    }
4880}
4881
4882fn create_perf_fds(skel: &mut BpfSkel, event: u64) -> Result<()> {
4883    let mut attr = perf::bindings::perf_event_attr {
4884        size: std::mem::size_of::<perf::bindings::perf_event_attr>() as u32,
4885        type_: perf::bindings::PERF_TYPE_RAW,
4886        config: event,
4887        sample_type: 0u64,
4888        ..Default::default()
4889    };
4890    attr.__bindgen_anon_1.sample_period = 0u64;
4891    attr.set_disabled(0);
4892
4893    let perf_events_map = &skel.maps.scx_pmu_map;
4894    let map_fd = unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
4895
4896    let mut failures = 0u64;
4897
4898    for cpu in 0..*NR_CPUS_POSSIBLE {
4899        let fd = unsafe { perf::perf_event_open(&mut attr as *mut _, -1, cpu as i32, -1, 0) };
4900        if fd < 0 {
4901            failures += 1;
4902            trace!(
4903                "perf_event_open failed cpu={cpu} errno={}",
4904                std::io::Error::last_os_error()
4905            );
4906            continue;
4907        }
4908
4909        let key = cpu as u32;
4910        let val = fd as u32;
4911        let ret = unsafe {
4912            libbpf_sys::bpf_map_update_elem(
4913                map_fd,
4914                &key as *const _ as *const _,
4915                &val as *const _ as *const _,
4916                0,
4917            )
4918        };
4919        if ret != 0 {
4920            trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
4921        } else {
4922            trace!("mapped cpu={cpu} -> fd={fd}");
4923        }
4924    }
4925
4926    if failures > 0 {
4927        println!("membw tracking: failed to install {failures} counters");
4928        // Keep going, do not fail the scheduler for this
4929    }
4930
4931    Ok(())
4932}
4933
4934// Set up the counters
4935fn setup_membw_tracking(skel: &mut OpenBpfSkel) -> Result<u64> {
4936    let pmumanager = PMUManager::new()?;
4937    let codename = &pmumanager.codename as &str;
4938
4939    let pmuspec = match codename {
4940        "amdzen1" | "amdzen2" | "amdzen3" => {
4941            trace!("found AMD codename {codename}");
4942            pmumanager.pmus.get("ls_any_fills_from_sys.mem_io_local")
4943        }
4944        "amdzen4" | "amdzen5" => {
4945            trace!("found AMD codename {codename}");
4946            pmumanager.pmus.get("ls_any_fills_from_sys.dram_io_all")
4947        }
4948
4949        "haswell" | "broadwell" | "broadwellde" | "broadwellx" | "skylake" | "skylakex"
4950        | "cascadelakex" | "arrowlake" | "meteorlake" | "sapphirerapids" | "emeraldrapids"
4951        | "graniterapids" => {
4952            trace!("found Intel codename {codename}");
4953            pmumanager.pmus.get("LONGEST_LAT_CACHE.MISS")
4954        }
4955
4956        _ => {
4957            trace!("found unknown codename {codename}");
4958            None
4959        }
4960    };
4961
4962    let spec = pmuspec.ok_or("not_found").unwrap();
4963    let config = (spec.umask << 8) | spec.event[0];
4964
4965    // Install the counter in the BPF map
4966    skel.maps.rodata_data.as_mut().unwrap().membw_event = config;
4967
4968    Ok(config)
4969}
4970
4971#[clap_main::clap_main]
4972fn main(opts: Opts) -> Result<()> {
4973    if opts.version {
4974        println!(
4975            "scx_layered {}",
4976            build_id::full_version(env!("CARGO_PKG_VERSION"))
4977        );
4978        return Ok(());
4979    }
4980
4981    if opts.help_stats {
4982        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
4983        return Ok(());
4984    }
4985
4986    let env_filter = EnvFilter::try_from_default_env()
4987        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
4988            Ok(filter) => Ok(filter),
4989            Err(e) => {
4990                eprintln!(
4991                    "invalid log envvar: {}, using info, err is: {}",
4992                    opts.log_level, e
4993                );
4994                EnvFilter::try_new("info")
4995            }
4996        })
4997        .unwrap_or_else(|_| EnvFilter::new("info"));
4998
4999    match tracing_subscriber::fmt()
5000        .with_env_filter(env_filter)
5001        .with_target(true)
5002        .with_thread_ids(true)
5003        .with_file(true)
5004        .with_line_number(true)
5005        .try_init()
5006    {
5007        Ok(()) => {}
5008        Err(e) => eprintln!("failed to init logger: {}", e),
5009    }
5010
5011    if opts.verbose > 0 {
5012        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
5013    }
5014
5015    if opts.no_load_frac_limit {
5016        warn!("--no-load-frac-limit is deprecated and noop");
5017    }
5018    if opts.layer_preempt_weight_disable != 0.0 {
5019        warn!("--layer-preempt-weight-disable is deprecated and noop");
5020    }
5021    if opts.layer_growth_weight_disable != 0.0 {
5022        warn!("--layer-growth-weight-disable is deprecated and noop");
5023    }
5024    if opts.local_llc_iteration {
5025        warn!("--local_llc_iteration is deprecated and noop");
5026    }
5027
5028    debug!("opts={:?}", &opts);
5029
5030    if let Some(run_id) = opts.run_id {
5031        info!("scx_layered run_id: {}", run_id);
5032    }
5033
5034    let shutdown = Arc::new(AtomicBool::new(false));
5035    let shutdown_clone = shutdown.clone();
5036    ctrlc::set_handler(move || {
5037        shutdown_clone.store(true, Ordering::Relaxed);
5038    })
5039    .context("Error setting Ctrl-C handler")?;
5040
5041    if let Some(intv) = opts.monitor.or(opts.stats) {
5042        let shutdown_copy = shutdown.clone();
5043        let stats_columns = opts.stats_columns;
5044        let stats_no_llc = opts.stats_no_llc;
5045        let jh = std::thread::spawn(move || {
5046            match stats::monitor(
5047                Duration::from_secs_f64(intv),
5048                shutdown_copy,
5049                stats_columns,
5050                stats_no_llc,
5051            ) {
5052                Ok(_) => {
5053                    debug!("stats monitor thread finished successfully")
5054                }
5055                Err(error_object) => {
5056                    warn!(
5057                        "stats monitor thread finished because of an error {}",
5058                        error_object
5059                    )
5060                }
5061            }
5062        });
5063        if opts.monitor.is_some() {
5064            let _ = jh.join();
5065            return Ok(());
5066        }
5067    }
5068
5069    if let Some(path) = &opts.example {
5070        write_example_file(path)?;
5071        return Ok(());
5072    }
5073
5074    let mut layer_config = match opts.run_example {
5075        true => EXAMPLE_CONFIG.clone(),
5076        false => LayerConfig { specs: vec![] },
5077    };
5078
5079    for (idx, input) in opts.specs.iter().enumerate() {
5080        let specs = LayerSpec::parse(input)
5081            .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;
5082
5083        for spec in specs {
5084            match spec.template {
5085                Some(ref rule) => {
5086                    let matches = expand_template(rule)?;
5087                    // in the absence of matching cgroups, have template layers
5088                    // behave as non-template layers do.
5089                    if matches.is_empty() {
5090                        layer_config.specs.push(spec);
5091                    } else {
5092                        for (mt, mask) in matches {
5093                            let mut genspec = spec.clone();
5094
5095                            genspec.cpuset = Some(mask);
5096
5097                            // Push the new "and" rule into each "or" term.
5098                            for orterm in &mut genspec.matches {
5099                                orterm.push(mt.clone());
5100                            }
5101
5102                            match &mt {
5103                                LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
5104                                _ => bail!("Template match has unexpected type"),
5105                            }
5106
5107                            // Push the generated layer into the config
5108                            layer_config.specs.push(genspec);
5109                        }
5110                    }
5111                }
5112
5113                None => {
5114                    layer_config.specs.push(spec);
5115                }
5116            }
5117        }
5118    }
5119
5120    for spec in layer_config.specs.iter_mut() {
5121        let common = spec.kind.common_mut();
5122
5123        if common.slice_us == 0 {
5124            common.slice_us = opts.slice_us;
5125        }
5126
5127        if common.weight == 0 {
5128            common.weight = DEFAULT_LAYER_WEIGHT;
5129        }
5130        common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);
5131
5132        if common.preempt {
5133            if common.disallow_open_after_us.is_some() {
5134                warn!(
5135                    "Preempt layer {} has non-null disallow_open_after_us, ignored",
5136                    &spec.name
5137                );
5138            }
5139            if common.disallow_preempt_after_us.is_some() {
5140                warn!(
5141                    "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
5142                    &spec.name
5143                );
5144            }
5145            common.disallow_open_after_us = Some(u64::MAX);
5146            common.disallow_preempt_after_us = Some(u64::MAX);
5147        } else {
5148            if common.disallow_open_after_us.is_none() {
5149                common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
5150            }
5151
5152            if common.disallow_preempt_after_us.is_none() {
5153                common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
5154            }
5155        }
5156
5157        if common.idle_smt.is_some() {
5158            warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
5159        }
5160
5161        if common.allow_node_aligned.is_some() {
5162            warn!("Layer {} has deprecated flag \"allow_node_aligned\", node-aligned tasks are now always dispatched on layer DSQs", &spec.name);
5163        }
5164    }
5165
5166    let membw_required = layer_config.specs.iter().any(|spec| match spec.kind {
5167        LayerKind::Confined { membw_gb, .. } | LayerKind::Grouped { membw_gb, .. } => {
5168            membw_gb.is_some()
5169        }
5170        LayerKind::Open { .. } => false,
5171    });
5172
5173    if opts.print_and_exit {
5174        println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
5175        return Ok(());
5176    }
5177
5178    debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
5179    let hint_to_layer_map = verify_layer_specs(&layer_config.specs)?;
5180
5181    let mut open_object = MaybeUninit::uninit();
5182    loop {
5183        let mut sched = Scheduler::init(
5184            &opts,
5185            &layer_config.specs,
5186            &mut open_object,
5187            &hint_to_layer_map,
5188            membw_required,
5189        )?;
5190        if !sched.run(shutdown.clone())?.should_restart() {
5191            break;
5192        }
5193    }
5194
5195    Ok(())
5196}
5197
5198#[cfg(test)]
5199mod xnuma_tests {
5200    use super::*;
5201
5202    // Default thresholds for tests
5203    const THRESH: (f64, f64) = (0.6, 0.7);
5204    const DELTA: (f64, f64) = (0.2, 0.3);
5205
5206    // =====================================================================
5207    // xnuma_check_active tests — per-(layer, node) two-threshold
5208    // =====================================================================
5209
5210    #[test]
5211    fn test_activation_below_threshold() {
5212        // Both nodes below threshold — both closed
5213        let duty = vec![40.0, 40.0];
5214        let allocs = vec![96, 96];
5215        let gd = vec![true, true];
5216        let cur = vec![false, false];
5217        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5218        // ratio = 0.42 < 0.6 (low) → deactivate
5219        assert!(!result[0]);
5220        assert!(!result[1]);
5221    }
5222
5223    #[test]
5224    fn test_activation_above_all_thresholds() {
5225        // N0 overloaded + imbalanced + growth denied → open
5226        let duty = vec![90.0, 20.0];
5227        let allocs = vec![96, 96];
5228        let gd = vec![true, false];
5229        let cur = vec![false, false];
5230        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5231        // N0: load=0.94>0.7, eq=110/192=0.573, surplus=90-55=35, ratio=0.365>0.3, gd=true → open
5232        assert!(result[0]);
5233        // N1: load=0.21<0.6 → closed
5234        assert!(!result[1]);
5235    }
5236
5237    #[test]
5238    fn test_activation_requires_growth_denied() {
5239        // N0 overloaded + imbalanced but growth NOT denied → closed
5240        let duty = vec![90.0, 20.0];
5241        let allocs = vec![96, 96];
5242        let gd = vec![false, false]; // growth succeeded
5243        let cur = vec![false, false];
5244        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5245        assert!(!result[0]); // !growth_denied → deactivate
5246    }
5247
5248    #[test]
5249    fn test_symmetric_high_load_stays_closed() {
5250        // Both nodes above threshold but balanced (delta=0) → closed
5251        let duty = vec![80.0, 80.0];
5252        let allocs = vec![96, 96];
5253        let gd = vec![true, true];
5254        let cur = vec![false, false];
5255        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5256        // load=0.83>0.7 but surplus=0, surplus_ratio=0 < 0.3 → no activation
5257        assert!(!result[0]);
5258        assert!(!result[1]);
5259    }
5260
5261    #[test]
5262    fn test_hysteresis_stays_active() {
5263        // N0 was active, now between thresholds → stays active
5264        let duty = vec![75.0, 20.0];
5265        let allocs = vec![96, 96];
5266        let gd = vec![true, false];
5267        let cur = vec![true, false]; // N0 was active
5268        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5269        // N0: load=0.78 > 0.6(lo) and < 0.7(hi), surplus=(75-20)/2=27.5, ratio=0.286 > 0.2(lo)
5270        // gd=true. activate: 0.78<0.7 NO. deactivate: 0.78>0.6 NO, 0.286>0.2 NO, gd=true NO
5271        // → hysteresis preserves true
5272        assert!(result[0]);
5273    }
5274
5275    #[test]
5276    fn test_hysteresis_stays_inactive() {
5277        // N0 was inactive, in hysteresis band → stays inactive
5278        let duty = vec![75.0, 20.0];
5279        let allocs = vec![96, 96];
5280        let gd = vec![true, false];
5281        let cur = vec![false, false]; // N0 was inactive
5282        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5283        // Same conditions but currently inactive → stays inactive
5284        assert!(!result[0]);
5285    }
5286
5287    #[test]
5288    fn test_deactivation_load_drops() {
5289        // N0 was active, load drops below low → closes
5290        let duty = vec![50.0, 50.0];
5291        let allocs = vec![96, 96];
5292        let gd = vec![true, true];
5293        let cur = vec![true, false];
5294        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5295        // N0: load=0.52 < 0.6(lo) → deactivate
5296        assert!(!result[0]);
5297    }
5298
5299    #[test]
5300    fn test_deactivation_growth_succeeds() {
5301        // N0 was active, growth now succeeds → closes
5302        let duty = vec![90.0, 20.0];
5303        let allocs = vec![96, 96];
5304        let gd = vec![false, false]; // growth succeeded
5305        let cur = vec![true, false];
5306        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5307        // !growth_denied → deactivate regardless of load/delta
5308        assert!(!result[0]);
5309    }
5310
5311    #[test]
5312    fn test_zero_alloc_with_duty_and_growth_denied() {
5313        // Zero alloc but duty > 0 and growth denied → open
5314        let duty = vec![50.0, 0.0];
5315        let allocs = vec![0, 96];
5316        let gd = vec![true, false];
5317        let cur = vec![false, false];
5318        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5319        assert!(result[0]); // zero alloc + duty + gd → source
5320    }
5321
5322    #[test]
5323    fn test_all_zero_alloc() {
5324        let duty = vec![0.0, 0.0];
5325        let allocs = vec![0, 0];
5326        let gd = vec![true, true];
5327        let cur = vec![true, true];
5328        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5329        assert!(!result[0]);
5330        assert!(!result[1]);
5331    }
5332
5333    #[test]
5334    fn test_three_nodes_mixed() {
5335        // 3 nodes: N0 overloaded+imbalanced+gd, N1 moderate, N2 low
5336        let duty = vec![90.0, 40.0, 10.0];
5337        let allocs = vec![96, 96, 96];
5338        let gd = vec![true, true, false];
5339        let cur = vec![false, false, false];
5340        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5341        // eq_ratio = 140/288 ≈ 0.486
5342        // N0: load=0.94>0.7, surplus=90-46.7=43.3, ratio=0.451>0.3, gd=true → open
5343        assert!(result[0]);
5344        // N1: load=0.42<0.6 → deactivate
5345        assert!(!result[1]);
5346        // N2: load=0.10<0.6 → deactivate
5347        assert!(!result[2]);
5348    }
5349
5350    #[test]
5351    fn test_per_node_independence() {
5352        // N0 active, N1 deactivates independently
5353        let duty = vec![90.0, 50.0];
5354        let allocs = vec![96, 96];
5355        let gd = vec![true, true];
5356        let cur = vec![true, true];
5357        let result = xnuma_check_active(&duty, &allocs, THRESH, DELTA, &gd, &cur);
5358        // N0: load=0.94>0.7, eq=140/192=0.729, surplus=90-70=20, ratio=0.208>0.2(lo)
5359        //   activate: 0.94>0.7 YES, 0.208>0.3 NO → no activate
5360        //   deactivate: 0.94>0.6 NO, 0.208>0.2 NO, gd=true NO → no deactivate → preserve true
5361        assert!(result[0]);
5362        // N1: load=0.52<0.6 → deactivate
5363        assert!(!result[1]);
5364    }
5365
5366    // =====================================================================
5367    // xnuma_compute_rates tests — basic
5368    // =====================================================================
5369
5370    #[test]
5371    fn test_rates_balanced_load() {
5372        // Equal load per CPU on both nodes — no migration needed
5373        let duty = vec![48.0, 48.0];
5374        let allocs = vec![96, 96];
5375        let result = xnuma_compute_rates(&duty, &allocs);
5376
5377        // No surplus/deficit → all rates zero
5378        for src in 0..2 {
5379            for dst in 0..2 {
5380                assert_eq!(result.rates[src][dst], 0);
5381            }
5382        }
5383    }
5384
5385    #[test]
5386    fn test_rates_one_overloaded() {
5387        // N0 has 80 CPUs of duty, N1 has 40. Both have 96 CPUs.
5388        // eq_ratio = 120/192 = 0.625
5389        // N0 expected = 0.625 * 96 = 60, surplus = 80 - 60 = 20
5390        // N1 expected = 0.625 * 96 = 60, deficit = 60 - 40 = 20
5391        let duty = vec![80.0, 40.0];
5392        let allocs = vec![96, 96];
5393        let result = xnuma_compute_rates(&duty, &allocs);
5394
5395        // rate[0][1] should be positive (migrate from N0 to N1)
5396        assert!(result.rates[0][1] > 0);
5397        // rate[1][0] should be 0 (N1 has no surplus)
5398        assert_eq!(result.rates[1][0], 0);
5399        // Self-rates always 0
5400        assert_eq!(result.rates[0][0], 0);
5401        assert_eq!(result.rates[1][1], 0);
5402
5403        // Verify the rate magnitude: migration = 20.0 * DAMPEN, scaled
5404        let expected_rate = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5405        assert_eq!(result.rates[0][1], expected_rate);
5406    }
5407
5408    #[test]
5409    fn test_rates_asymmetric_allocation() {
5410        // N0 has 48 CPUs, N1 has 144 CPUs. Duty 48 each.
5411        // eq_ratio = 96/192 = 0.5
5412        // N0 expected = 0.5 * 48 = 24, surplus = 48 - 24 = 24
5413        // N1 expected = 0.5 * 144 = 72, deficit = 72 - 48 = 24
5414        let duty = vec![48.0, 48.0];
5415        let allocs = vec![48, 144];
5416        let result = xnuma_compute_rates(&duty, &allocs);
5417
5418        let expected_rate = (24.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5419        assert_eq!(result.rates[0][1], expected_rate);
5420        assert_eq!(result.rates[1][0], 0);
5421    }
5422
5423    // =====================================================================
5424    // xnuma_compute_rates tests — multi-node
5425    // =====================================================================
5426
5427    #[test]
5428    fn test_rates_three_nodes_one_source() {
5429        // N0 overloaded, N1 and N2 are deficit
5430        // allocs: 96 each, duty: N0=120, N1=30, N2=30
5431        // eq_ratio = 180/288 = 0.625
5432        // N0 expected = 60, surplus = 60
5433        // N1 expected = 60, deficit = 30
5434        // N2 expected = 60, deficit = 30
5435        let duty = vec![120.0, 30.0, 30.0];
5436        let allocs = vec![96, 96, 96];
5437        let result = xnuma_compute_rates(&duty, &allocs);
5438
5439        // Total deficit = 60. N1 gets 30/60 = 50%, N2 gets 30/60 = 50%
5440        // rate[0][1] = 60 * 30/60 * DAMPEN = 15
5441        // rate[0][2] = 60 * 30/60 * DAMPEN = 15
5442        let rate_01 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5443        let rate_02 = (30.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5444        assert_eq!(result.rates[0][1], rate_01);
5445        assert_eq!(result.rates[0][2], rate_02);
5446
5447        // No reverse flow
5448        assert_eq!(result.rates[1][0], 0);
5449        assert_eq!(result.rates[2][0], 0);
5450        assert_eq!(result.rates[1][2], 0);
5451        assert_eq!(result.rates[2][1], 0);
5452    }
5453
5454    #[test]
5455    fn test_rates_three_nodes_unequal_deficit() {
5456        // N0 overloaded. N1 slight deficit, N2 large deficit.
5457        // allocs: 96 each, duty: N0=120, N1=50, N2=10
5458        // eq_ratio = 180/288 = 0.625
5459        // N0: expected=60, surplus=60
5460        // N1: expected=60, deficit=10
5461        // N2: expected=60, deficit=50
5462        let duty = vec![120.0, 50.0, 10.0];
5463        let allocs = vec![96, 96, 96];
5464        let result = xnuma_compute_rates(&duty, &allocs);
5465
5466        // Total deficit = 60. N1 share = 10/60, N2 share = 50/60
5467        // rate[0][1] = 60 * 10/60 * DAMPEN = 5
5468        // rate[0][2] = 60 * 50/60 * DAMPEN = 25
5469        let rate_01 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5470        let rate_02 = (50.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5471        assert_eq!(result.rates[0][1], rate_01);
5472        assert_eq!(result.rates[0][2], rate_02);
5473    }
5474
5475    #[test]
5476    fn test_rates_two_sources_one_sink() {
5477        // N0 and N1 overloaded, N2 deficit
5478        // allocs: 96 each, duty: N0=80, N1=70, N2=30
5479        // total = 180, eq_ratio = 180/288 = 0.625
5480        // N0: expected=60, surplus=20
5481        // N1: expected=60, surplus=10
5482        // N2: expected=60, deficit=30
5483        let duty = vec![80.0, 70.0, 30.0];
5484        let allocs = vec![96, 96, 96];
5485        let result = xnuma_compute_rates(&duty, &allocs);
5486
5487        // Total deficit = 30. Only N2 is deficit, so deficit share = 1.0
5488        // rate[0][2] = 20 * 1.0 * DAMPEN = 10
5489        // rate[1][2] = 10 * 1.0 * DAMPEN = 5
5490        let rate_02 = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5491        let rate_12 = (10.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5492        assert_eq!(result.rates[0][2], rate_02);
5493        assert_eq!(result.rates[1][2], rate_12);
5494
5495        // No flow between sources
5496        assert_eq!(result.rates[0][1], 0);
5497        assert_eq!(result.rates[1][0], 0);
5498    }
5499
5500    // =====================================================================
5501    // Conservation invariants
5502    // =====================================================================
5503
5504    #[test]
5505    fn test_conservation_per_source_outbound() {
5506        // Each source's total outbound should equal its surplus * DAMPEN (scaled).
5507        // Total outbound from src = surplus[src] * DAMPEN * DUTY_CYCLE_SCALE
5508        let duty = vec![100.0, 30.0, 50.0, 20.0];
5509        let allocs = vec![96, 96, 96, 96];
5510        let nr = 4;
5511        let result = xnuma_compute_rates(&duty, &allocs);
5512
5513        let total_duty: f64 = duty.iter().sum();
5514        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
5515        let eq_ratio = total_duty / total_alloc;
5516
5517        for src in 0..nr {
5518            let expected = eq_ratio * allocs[src] as f64;
5519            let surplus = (duty[src] - expected).max(0.0);
5520            let total_outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
5521            let expected_rate = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5522            assert_eq!(
5523                total_outbound, expected_rate,
5524                "node {} outbound mismatch",
5525                src
5526            );
5527        }
5528    }
5529
5530    #[test]
5531    fn test_conservation_surplus_equals_deficit() {
5532        // Mathematical invariant: total surplus == total deficit in water-fill
5533        let duty = vec![100.0, 30.0, 50.0];
5534        let allocs = vec![96, 96, 96];
5535
5536        let total_duty: f64 = duty.iter().sum();
5537        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
5538        let eq_ratio = total_duty / total_alloc;
5539
5540        let mut total_surplus = 0.0f64;
5541        let mut total_deficit = 0.0f64;
5542        for i in 0..3 {
5543            let expected = eq_ratio * allocs[i] as f64;
5544            let delta = duty[i] - expected;
5545            if delta > 0.0 {
5546                total_surplus += delta;
5547            } else {
5548                total_deficit += -delta;
5549            }
5550        }
5551        assert!((total_surplus - total_deficit).abs() < 1e-10);
5552    }
5553
5554    #[test]
5555    fn test_self_rates_always_zero() {
5556        let duty = vec![100.0, 30.0, 50.0];
5557        let allocs = vec![96, 96, 96];
5558        let result = xnuma_compute_rates(&duty, &allocs);
5559
5560        for nid in 0..3 {
5561            assert_eq!(result.rates[nid][nid], 0);
5562        }
5563    }
5564
5565    #[test]
5566    fn test_deficit_nodes_have_zero_outbound() {
5567        // Deficit nodes should have zero outbound rates
5568        let duty = vec![100.0, 20.0, 30.0, 50.0];
5569        let allocs = vec![96, 96, 96, 96];
5570        let result = xnuma_compute_rates(&duty, &allocs);
5571
5572        // eq_ratio = 200/384 ≈ 0.521. N0: surplus=50, N3: surplus=0
5573        // N1: deficit=30, N2: deficit=20
5574        // Deficit nodes (outbound sum == 0) should have all zero rates
5575        for nid in 0..4 {
5576            let outbound: u64 = (0..4).map(|dst| result.rates[nid][dst]).sum();
5577            if outbound == 0 {
5578                for dst in 0..4 {
5579                    assert_eq!(
5580                        result.rates[nid][dst], 0,
5581                        "deficit node {} has non-zero rate to {}",
5582                        nid, dst
5583                    );
5584                }
5585            }
5586        }
5587    }
5588
5589    // =====================================================================
5590    // Edge cases
5591    // =====================================================================
5592
5593    #[test]
5594    fn test_rates_zero_duty_everywhere() {
5595        let duty = vec![0.0, 0.0];
5596        let allocs = vec![96, 96];
5597        let result = xnuma_compute_rates(&duty, &allocs);
5598
5599        for src in 0..2 {
5600            for dst in 0..2 {
5601                assert_eq!(result.rates[src][dst], 0);
5602            }
5603        }
5604    }
5605
5606    #[test]
5607    fn test_rates_zero_alloc() {
5608        let duty = vec![50.0, 50.0];
5609        let allocs = vec![0, 0];
5610        let result = xnuma_compute_rates(&duty, &allocs);
5611
5612        // total_alloc = 0 → early return with all zeros
5613        for src in 0..2 {
5614            for dst in 0..2 {
5615                assert_eq!(result.rates[src][dst], 0);
5616            }
5617        }
5618    }
5619
5620    #[test]
5621    fn test_rates_all_load_one_node() {
5622        // All duty on N0, nothing on N1
5623        // allocs: 96 each, duty: N0=96, N1=0
5624        // eq_ratio = 96/192 = 0.5
5625        // N0: expected=48, surplus=48
5626        // N1: expected=48, deficit=48
5627        let duty = vec![96.0, 0.0];
5628        let allocs = vec![96, 96];
5629        let result = xnuma_compute_rates(&duty, &allocs);
5630
5631        let expected_rate = (48.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5632        assert_eq!(result.rates[0][1], expected_rate);
5633    }
5634
5635    #[test]
5636    fn test_rates_single_node() {
5637        // Single node — balanced by definition
5638        let duty = vec![96.0];
5639        let allocs = vec![96];
5640        let result = xnuma_compute_rates(&duty, &allocs);
5641
5642        assert_eq!(result.rates[0][0], 0);
5643    }
5644
5645    #[test]
5646    fn test_rates_one_node_zero_alloc() {
5647        // N0 has allocation, N1 has zero
5648        // eq_ratio = 80/96
5649        // N0: expected=80, surplus=0 → balanced
5650        // N1: expected=0, deficit=0 → zero alloc skipped effectively
5651        let duty = vec![80.0, 0.0];
5652        let allocs = vec![96, 0];
5653        let result = xnuma_compute_rates(&duty, &allocs);
5654
5655        // N0: surplus = 80 - (80/96 * 96) = 0
5656        // N1: deficit = (80/96 * 0) - 0 = 0
5657        // All zeros — nothing to migrate
5658        assert_eq!(result.rates[0][1], 0);
5659        assert_eq!(result.rates[1][0], 0);
5660    }
5661
5662    // =====================================================================
5663    // Rate magnitude and scaling
5664    // =====================================================================
5665
5666    #[test]
5667    fn test_rate_scaling() {
5668        // Verify rates are in DUTY_CYCLE_SCALE units
5669        let duty = vec![80.0, 40.0];
5670        let allocs = vec![96, 96];
5671        let result = xnuma_compute_rates(&duty, &allocs);
5672
5673        // surplus = 20, deficit = 20 → migration = 20 * DAMPEN = 10
5674        // rate = 10 * (1 << 20)
5675        let expected = (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5676        assert_eq!(result.rates[0][1], expected);
5677        assert_eq!(expected, 10 * (1 << 20));
5678    }
5679
5680    #[test]
5681    fn test_rates_tiny_imbalance() {
5682        // Very small imbalance — should still produce non-zero rate
5683        let duty = vec![48.001, 47.999];
5684        let allocs = vec![96, 96];
5685        let result = xnuma_compute_rates(&duty, &allocs);
5686
5687        // surplus ≈ 0.001, rate ≈ 0.001 * 2^20 ≈ 1048
5688        assert!(result.rates[0][1] > 0);
5689        assert!(result.rates[0][1] < (1 << 20)); // Less than 1.0 CPU worth
5690    }
5691
5692    #[test]
5693    fn test_rates_large_values() {
5694        // Large system: 8 nodes, 96 CPUs each
5695        let duty = vec![300.0, 100.0, 50.0, 50.0, 50.0, 50.0, 50.0, 50.0];
5696        let allocs = vec![96, 96, 96, 96, 96, 96, 96, 96];
5697        let result = xnuma_compute_rates(&duty, &allocs);
5698
5699        // eq_ratio = 700/768 ≈ 0.911. N0 surplus=212.5, N1 surplus=12.5
5700        // N2-N7 deficit=37.5 each. N0 and N1 have surplus.
5701        assert!(result.rates[0][2] > 0); // N0 → N2 (deficit node)
5702        assert!(result.rates[1][2] > 0); // N1 → N2 (N1 also has surplus)
5703
5704        // Verify conservation: per-source outbound ≈ surplus * DAMPEN (within
5705        // truncation tolerance — each `as u64` can lose up to 1 per cell)
5706        let total_duty: f64 = duty.iter().sum();
5707        let total_alloc: f64 = allocs.iter().map(|&a| a as f64).sum();
5708        let eq_ratio = total_duty / total_alloc;
5709        let nr = 8;
5710        for src in 0..nr {
5711            let surplus = (duty[src] - eq_ratio * allocs[src] as f64).max(0.0);
5712            let outbound: u64 = (0..nr).map(|dst| result.rates[src][dst]).sum();
5713            let expected = (surplus * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64;
5714            let tolerance = nr as u64; // up to 1 per destination cell
5715            assert!(
5716                outbound.abs_diff(expected) <= tolerance,
5717                "node {} outbound {} vs expected {}, diff {}",
5718                src,
5719                outbound,
5720                expected,
5721                outbound.abs_diff(expected)
5722            );
5723        }
5724    }
5725
5726    // =====================================================================
5727    // Hysteresis integration
5728    // =====================================================================
5729
5730    #[test]
5731    fn test_hysteresis_cycle() {
5732        let allocs = vec![96, 96];
5733        let gd = vec![true, false]; // N0 growth denied
5734
5735        // Start: all closed, low load
5736        let active =
5737            xnuma_check_active(&[40.0, 40.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
5738        assert!(!active[0]); // load 0.42 < 0.6(lo) → deactivate
5739
5740        // N0 overloaded + imbalanced + gd → opens
5741        let active =
5742            xnuma_check_active(&[90.0, 20.0], &allocs, THRESH, DELTA, &gd, &[false, false]);
5743        assert!(active[0]); // load 0.94>0.7, surplus=35, ratio=0.365>0.3, gd=true
5744
5745        // Load decreases but in hysteresis band → stays open
5746        let active = xnuma_check_active(&[75.0, 20.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
5747        assert!(active[0]); // load 0.78 between 0.6 and 0.7, surplus ok, gd → preserve
5748
5749        // Load drops below low threshold → closes
5750        let active = xnuma_check_active(&[50.0, 50.0], &allocs, THRESH, DELTA, &gd, &[true, false]);
5751        assert!(!active[0]); // load 0.52 < 0.6(lo) → deactivate
5752    }
5753
5754    #[test]
5755    fn test_hysteresis_growth_toggle() {
5756        // N0 was open, then growth succeeds → closes
5757        let allocs = vec![96, 96];
5758        let gd_denied = vec![true, false];
5759        let gd_ok = vec![false, false];
5760
5761        // Active with growth denied
5762        let active = xnuma_check_active(
5763            &[90.0, 20.0],
5764            &allocs,
5765            THRESH,
5766            DELTA,
5767            &gd_denied,
5768            &[false, false],
5769        );
5770        assert!(active[0]);
5771
5772        // Growth succeeds → !gd → deactivate
5773        let active = xnuma_check_active(
5774            &[90.0, 20.0],
5775            &allocs,
5776            THRESH,
5777            DELTA,
5778            &gd_ok,
5779            &[true, false],
5780        );
5781        assert!(!active[0]); // !growth_denied → deactivate
5782    }
5783
5784    // =====================================================================
5785    // Proportional distribution to multiple sinks
5786    // =====================================================================
5787
5788    #[test]
5789    fn test_proportional_sink_distribution() {
5790        // 4 nodes: N0 source, N1-N3 sinks with different deficits
5791        // allocs: 96 each, duty: N0=180, N1=20, N2=40, N3=0
5792        // total = 240, eq_ratio = 240/384 = 0.625
5793        // N0: expected=60, surplus=120
5794        // N1: expected=60, deficit=40
5795        // N2: expected=60, deficit=20
5796        // N3: expected=60, deficit=60
5797        // total_deficit = 120
5798        let duty = vec![180.0, 20.0, 40.0, 0.0];
5799        let allocs = vec![96, 96, 96, 96];
5800        let result = xnuma_compute_rates(&duty, &allocs);
5801
5802        // rate[0][1] = 120 * 40/120 * DAMPEN = 20
5803        // rate[0][2] = 120 * 20/120 * DAMPEN = 10
5804        // rate[0][3] = 120 * 60/120 * DAMPEN = 30
5805        assert_eq!(
5806            result.rates[0][1],
5807            (40.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
5808        );
5809        assert_eq!(
5810            result.rates[0][2],
5811            (20.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
5812        );
5813        assert_eq!(
5814            result.rates[0][3],
5815            (60.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
5816        );
5817
5818        // Total outbound from N0 = 20 + 10 + 30 = 60 (half of surplus, dampened)
5819        let total_from_n0: u64 = (0..4).map(|dst| result.rates[0][dst]).sum();
5820        assert_eq!(
5821            total_from_n0,
5822            (120.0 * XNUMA_RATE_DAMPEN * DUTY_CYCLE_SCALE) as u64
5823        );
5824    }
5825}
5826
#[cfg(test)]
mod util_compensation_tests {
    /// Nanoseconds per second; summed per-CPU deltas are divided by this.
    const NSEC_PER_SEC: f64 = 1_000_000_000.0;

    /// Absolute tolerance used by the approximate comparisons below.
    const EPS: f64 = 0.01;

    /// True when `actual` is strictly within `EPS` of `expected`.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < EPS
    }

    /// Scale factor s = Δt / (Δt - overhead), where overhead stands for the
    /// combined irq + softirq + stolen time.
    ///
    /// The ratio is clamped to [1.0, 20.0]. When no time is left after
    /// subtracting the overhead (including the all-zero idle case), 1.0 is
    /// returned.
    fn compute_scale(delta_total: u64, overhead: u64) -> f64 {
        match delta_total.saturating_sub(overhead) {
            0 => 1.0,
            available => (delta_total as f64 / available as f64).clamp(1.0, 20.0),
        }
    }

    /// Simulates the per-CPU-scaled aggregation that refresh() does.
    ///
    /// `deltas[cpu][layer]` is weighted by `scales[cpu]`, summed over all
    /// CPUs, converted from nanoseconds to seconds and normalized by
    /// `elapsed`. Returns one value per layer.
    fn scaled_aggregate(
        deltas: &[Vec<u64>],
        scales: &[f64],
        nr_layers: usize,
        elapsed: f64,
    ) -> Vec<f64> {
        let mut per_layer = Vec::with_capacity(nr_layers);
        for layer in 0..nr_layers {
            let weighted = deltas
                .iter()
                .enumerate()
                .fold(0.0f64, |acc, (cpu, row)| acc + row[layer] as f64 * scales[cpu]);
            per_layer.push(weighted / NSEC_PER_SEC / elapsed);
        }
        per_layer
    }

    #[test]
    fn test_scale_no_overhead() {
        // Without irq/softirq/stolen time the scale is exactly neutral.
        assert!(close(compute_scale(1000, 0), 1.0));
    }

    #[test]
    fn test_scale_half_overhead() {
        // 50% overhead → s = 1000/500 = 2.0
        assert!(close(compute_scale(1000, 500), 2.0));
    }

    #[test]
    fn test_scale_high_overhead() {
        // 90% overhead → s = 1000/100 = 10.0
        assert!(close(compute_scale(1000, 900), 10.0));
    }

    #[test]
    fn test_scale_very_high_overhead() {
        // 95% overhead → s = 1000/50 = 20.0, right at the clamp boundary
        assert!(close(compute_scale(1000, 950), 20.0));
    }

    #[test]
    fn test_scale_clamped_at_max() {
        // 98% overhead → raw ratio 50.0, clamped down to 20.0
        assert!(close(compute_scale(1000, 980), 20.0));
    }

    #[test]
    fn test_scale_all_overhead() {
        // 100% overhead → nothing available → falls back to 1.0
        assert!(close(compute_scale(1000, 1000), 1.0));
    }

    #[test]
    fn test_scale_idle_cpu() {
        // Idle CPU: delta_total = 0 → falls back to 1.0
        assert!(close(compute_scale(0, 0), 1.0));
    }

    #[test]
    fn test_scale_small_overhead_mostly_idle() {
        // 1% softirq + 1% user + 98% idle: total 10000, overhead 100.
        // s = 10000 / 9900 ≈ 1.01 — close to neutral, not pathological.
        let s = compute_scale(10000, 100);
        assert!(close(s, 1.01), "expected ~1.01, got {}", s);
    }

    #[test]
    fn test_uniform_scale_matches_unscaled() {
        // Four CPUs, each spending 1s on both layers with a neutral scale:
        // every layer aggregates to 4.0.
        let deltas = [
            vec![1_000_000_000u64; 2],
            vec![1_000_000_000u64; 2],
            vec![1_000_000_000u64; 2],
            vec![1_000_000_000u64; 2],
        ];
        let usage = scaled_aggregate(&deltas, &[1.0; 4], 2, 1.0);
        assert!(close(usage[0], 4.0));
        assert!(close(usage[1], 4.0));
    }

    #[test]
    fn test_hot_cpu_weighted_more() {
        // CPU 0 has scale 2.0, CPU 1 has scale 1.0.
        // Layer 0 ran 900ms on CPU 0 and 100ms on CPU 1.
        // Compensated: 900*2.0 + 100*1.0 = 1900ms = 1.9
        let deltas = [vec![900_000_000, 0], vec![100_000_000, 0]];
        let usage = scaled_aggregate(&deltas, &[2.0, 1.0], 2, 1.0);
        assert!(close(usage[0], 1.9), "expected 1.9, got {}", usage[0]);
    }

    #[test]
    fn test_cold_cpu_weighted_less() {
        // Mirror image of the hot-CPU case: most of the time was spent on the
        // unscaled CPU, so 100*2.0 + 900*1.0 = 1100ms = 1.1
        let deltas = [vec![100_000_000, 0], vec![900_000_000, 0]];
        let usage = scaled_aggregate(&deltas, &[2.0, 1.0], 2, 1.0);
        assert!(close(usage[0], 1.1), "expected 1.1, got {}", usage[0]);
    }

    #[test]
    fn test_no_usage_no_compensation() {
        // Zero usage stays zero no matter how large the scale factors are.
        let deltas = [vec![0u64; 2], vec![0u64; 2], vec![0u64; 2], vec![0u64; 2]];
        let usage = scaled_aggregate(&deltas, &[5.0; 4], 2, 1.0);
        assert_eq!(usage[0], 0.0);
        assert_eq!(usage[1], 0.0);
    }

    #[test]
    fn test_multilayer_independent_scaling() {
        // Layer 0: 800*3.0 + 200*1.0 = 2600ms; layer 1: 200*3.0 + 800*1.0 = 1400ms
        let deltas = [
            vec![800_000_000, 200_000_000],
            vec![200_000_000, 800_000_000],
        ];
        let usage = scaled_aggregate(&deltas, &[3.0, 1.0], 2, 1.0);
        assert!(close(usage[0], 2.6));
        assert!(close(usage[1], 1.4));
    }

    #[test]
    fn test_elapsed_time_normalization() {
        // 500ms of usage observed over a 2s window → 0.25
        let deltas = [vec![500_000_000u64]];
        let usage = scaled_aggregate(&deltas, &[1.0], 1, 2.0);
        assert!(close(usage[0], 0.25));
    }

    #[test]
    fn test_many_cpus_mixed_scales() {
        // Eight CPUs at 1s each, alternating scales: 4*2.5 + 4*1.0 = 14.0
        let deltas = vec![vec![1_000_000_000u64]; 8];
        let scales = [2.5, 1.0, 2.5, 1.0, 2.5, 1.0, 2.5, 1.0];
        let usage = scaled_aggregate(&deltas, &scales, 1, 1.0);
        assert!(close(usage[0], 14.0));
    }

    #[test]
    fn test_compensated_ge_raw() {
        // Every scale is >= 1.0, so compensated usage can never fall below
        // the raw usage for the same deltas.
        let deltas = [
            vec![500_000_000u64; 3],
            vec![300_000_000; 3],
            vec![200_000_000; 3],
        ];
        let raw = scaled_aggregate(&deltas, &[1.0; 3], 3, 1.0);
        let comp = scaled_aggregate(&deltas, &[1.5, 2.0, 1.0], 3, 1.0);
        for layer in 0..3 {
            let (c, r) = (comp[layer], raw[layer]);
            assert!(c >= r - 0.001, "layer {}: comp {} < raw {}", layer, c, r);
        }
    }
}