// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
mod stats;

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::ffi::CString;
use std::fs;
use std::io::Write;
use std::mem::MaybeUninit;
use std::ops::Sub;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::ThreadId;
use std::time::Duration;
use std::time::Instant;

use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
pub use bpf_skel::*;
use clap::Parser;
use crossbeam::channel::Receiver;
use crossbeam::select;
use inotify::{Inotify, WatchMask};
use lazy_static::lazy_static;
use libbpf_rs::libbpf_sys;
use libbpf_rs::AsRawLibbpf;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use libc;
use nix::sched::CpuSet;
use nvml_wrapper::error::NvmlError;
use nvml_wrapper::Nvml;
use once_cell::sync::OnceCell;
use regex::Regex;
use scx_bpf_compat;
use scx_layered::alloc::{unified_alloc, LayerAlloc, LayerDemand};
use scx_layered::*;
use scx_raw_pmu::PMUManager;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::perf;
use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
use scx_utils::read_netdevs;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::CoreType;
use scx_utils::Cpumask;
use scx_utils::NetDev;
use scx_utils::Topology;
use scx_utils::TopologyArgs;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;
use scx_utils::NR_CPU_IDS;
use stats::LayerStats;
use stats::StatsReq;
use stats::StatsRes;
use stats::SysStats;
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
use tracing::{debug, error, info, trace, warn};
use tracing_subscriber::filter::EnvFilter;
use walkdir::WalkDir;
84
/// Scheduler name reported to the kernel and the stats infrastructure.
const SCHEDULER_NAME: &str = "scx_layered";

// Size and range limits mirrored from the BPF side (bpf_intf) so that user
// space and the BPF program agree on buffer sizes and bounds.
const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
// Half-life for exponential decay of usage averages. The `_F64` variant
// converts it to seconds (the BPF-side value appears to be in nanoseconds,
// given the 1e9 divisor — confirm against bpf_intf).
const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;

// Indices into the per-layer usage array maintained by the BPF side.
const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
const LAYER_USAGE_PROTECTED_PREEMPT: usize =
    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;

// Counts of the global, per-layer, and per-LLC-per-layer stat slots.
const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;

// Number of distinct layer match-rule kinds understood by the BPF side.
const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;
110
// Process-wide NVML (NVIDIA Management Library) handle, initialized lazily
// on first use.
static NVML: OnceCell<Nvml> = OnceCell::new();

/// Return the shared NVML handle, initializing it on first call.
///
/// A successful initialization is cached for the lifetime of the process;
/// an initialization failure (e.g. no NVIDIA driver present) is returned to
/// the caller and will be retried on the next call.
fn nvml() -> Result<&'static Nvml, NvmlError> {
    NVML.get_or_try_init(Nvml::init)
}
116
lazy_static! {
    // Per-second decay multiplier derived from the usage half-life:
    // 0.5^(1 / half_life_secs).
    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
    // Default saturation delays (in microseconds) after which Open-layer
    // dispatch and preemption are disallowed, expressed as multiples of the
    // kernel's default slice.
    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
    // Example four-layer configuration (see the --example/--run-example
    // options). Parsed at startup; a parse failure is a build-time bug,
    // hence the unwrap.
    static ref EXAMPLE_CONFIG: LayerConfig = serde_json::from_str(
        r#"[
          {
            "name": "batch",
            "comment": "tasks under system.slice or tasks with nice value > 0",
            "matches": [[{"CgroupPrefix": "system.slice/"}], [{"NiceAbove": 0}]],
            "kind": {"Confined": {
              "util_range": [0.8, 0.9], "cpus_range": [0, 16],
              "min_exec_us": 1000, "slice_us": 20000, "weight": 100,
              "xllc_mig_min_us": 1000.0, "perf": 1024
            }}
          },
          {
            "name": "immediate",
            "comment": "tasks under workload.slice with nice value < 0",
            "matches": [[{"CgroupPrefix": "workload.slice/"}, {"NiceBelow": 0}]],
            "kind": {"Open": {
              "min_exec_us": 100, "yield_ignore": 0.25, "slice_us": 20000,
              "preempt": true, "exclusive": true,
              "prev_over_idle_core": true,
              "weight": 100, "perf": 1024
            }}
          },
          {
            "name": "stress-ng",
            "comment": "stress-ng test layer",
            "matches": [[{"CommPrefix": "stress-ng"}], [{"PcommPrefix": "stress-ng"}]],
            "kind": {"Confined": {
              "util_range": [0.2, 0.8],
              "min_exec_us": 800, "preempt": true, "slice_us": 800,
              "weight": 100, "growth_algo": "Topo", "perf": 1024
            }}
          },
          {
            "name": "normal",
            "comment": "the rest",
            "matches": [[]],
            "kind": {"Grouped": {
              "util_range": [0.5, 0.6], "util_includes_open_cputime": true,
              "min_exec_us": 200, "slice_us": 20000, "weight": 100,
              "xllc_mig_min_us": 100.0, "growth_algo": "Linear", "perf": 1024
            }}
          }
        ]"#,
    )
    .unwrap();
}
168
169/// scx_layered: A highly configurable multi-layer sched_ext scheduler
170///
171/// scx_layered allows classifying tasks into multiple layers and applying
172/// different scheduling policies to them. The configuration is specified in
173/// json and composed of two parts - matches and policies.
174///
175/// Matches
176/// =======
177///
178/// Whenever a task is forked or its attributes are changed, the task goes
179/// through a series of matches to determine the layer it belongs to. A
180/// match set is composed of OR groups of AND blocks. An example:
181///
182///   "matches": [
183///     [
184///       {
185///         "CgroupPrefix": "system.slice/"
186///       }
187///     ],
188///     [
189///       {
190///         "CommPrefix": "fbagent"
191///       },
192///       {
193///         "NiceAbove": 0
194///       }
195///     ]
196///   ],
197///
198/// The outer array contains the OR groups and the inner AND blocks, so the
199/// above matches:
200///
201/// - Tasks which are in the cgroup sub-hierarchy under "system.slice".
202///
203/// - Or tasks whose comm starts with "fbagent" and have a nice value > 0.
204///
205/// Currently, the following matches are supported:
206///
207/// - CgroupPrefix: Matches the prefix of the cgroup that the task belongs
208///   to. As this is a string match, whether the pattern has the trailing
209///   '/' makes a difference. For example, "TOP/CHILD/" only matches tasks
210///   which are under that particular cgroup while "TOP/CHILD" also matches
211///   tasks under "TOP/CHILD0/" or "TOP/CHILD1/".
212///
213/// - CommPrefix: Matches the task's comm prefix.
214///
215/// - PcommPrefix: Matches the task's thread group leader's comm prefix.
216///
217/// - NiceAbove: Matches if the task's nice value is greater than the
218///   pattern.
219///
220/// - NiceBelow: Matches if the task's nice value is smaller than the
221///   pattern.
222///
223/// - NiceEquals: Matches if the task's nice value is exactly equal to
224///   the pattern.
225///
226/// - UIDEquals: Matches if the task's effective user id matches the value
227///
228/// - GIDEquals: Matches if the task's effective group id matches the value.
229///
230/// - PIDEquals: Matches if the task's pid matches the value.
231///
232/// - PPIDEquals: Matches if the task's ppid matches the value.
233///
234/// - TGIDEquals: Matches if the task's tgid matches the value.
235///
236/// - NSPIDEquals: Matches if the task's namespace id and pid matches the values.
237///
238/// - NSEquals: Matches if the task's namespace id matches the values.
239///
240/// - IsGroupLeader: Bool. When true, matches if the task is group leader
241///   (i.e. PID == TGID), aka the thread from which other threads are made.
242///   When false, matches if the task is *not* the group leader (i.e. the rest).
243///
244/// - CmdJoin: Matches when the task uses pthread_setname_np to send a join/leave
245/// command to the scheduler. See examples/cmdjoin.c for more details.
246///
/// - UsedGpuTid: Bool. When true, matches tasks which have used GPUs,
///   identified by tid.
///
/// - UsedGpuPid: Bool. When true, matches tasks which have used GPUs,
///   identified by tgid/pid.
252///
253/// - [EXPERIMENTAL] AvgRuntime: (u64, u64). Match tasks whose average runtime
254///   is within the provided values [min, max).
255///
256/// - HintEquals: u64. Match tasks whose hint value equals this value.
257///   The value must be in the range [0, 1024].
258///
259/// - SystemCpuUtilBelow: f64. Match when the system CPU utilization fraction
260///   is below the specified threshold (a value in the range [0.0, 1.0]). This
261///   option can only be used in conjunction with HintEquals.
262///
263/// - DsqInsertBelow: f64. Match when the layer DSQ insertion fraction is below
264///   the specified threshold (a value in the range [0.0, 1.0]). This option can
265///   only be used in conjunction with HintEquals.
266///
267/// While there are complexity limitations as the matches are performed in
268/// BPF, it is straightforward to add more types of matches.
269///
270/// Templates
271/// ---------
272///
273/// Templates let us create a variable number of layers dynamically at initialization
274/// time out of a cgroup name suffix/prefix. Sometimes we know there are multiple
275/// applications running on a machine, each with their own cgroup but do not know the
276/// exact names of the applications or cgroups, e.g., in cloud computing contexts where
277/// workloads are placed on machines dynamically and run under cgroups whose name is
278/// autogenerated. In that case, we cannot hardcode the cgroup match rules when writing
279/// the configuration. We thus cannot easily prevent tasks from different cgroups from
280/// falling into the same layer and affecting each other's performance.
281///
282///
283/// Templates offer a solution to this problem by generating one layer for each such cgroup,
284/// provided these cgroups share a suffix, and that the suffix is unique to them. Templates
285/// have a cgroup suffix rule that we use to find the relevant cgroups in the system. For each
286/// such cgroup, we copy the layer config and add a matching rule that matches just this cgroup.
287///
288///
289/// Policies
290/// ========
291///
292/// The following is an example policy configuration for a layer.
293///
294///   "kind": {
295///     "Confined": {
296///       "cpus_range": [1, 8],
297///       "util_range": [0.8, 0.9]
298///     }
299///   }
300///
301/// It's of "Confined" kind, which tries to concentrate the layer's tasks
302/// into a limited number of CPUs. In the above case, the number of CPUs
303/// assigned to the layer is scaled between 1 and 8 so that the per-cpu
304/// utilization is kept between 80% and 90%. If the CPUs are loaded higher
305/// than 90%, more CPUs are allocated to the layer. If the utilization drops
306/// below 80%, the layer loses CPUs.
307///
308/// Currently, the following policy kinds are supported:
309///
310/// - Confined: Tasks are restricted to the allocated CPUs. The number of
311///   CPUs allocated is modulated to keep the per-CPU utilization in
312///   "util_range". The range can optionally be restricted with the
313///   "cpus_range" property.
314///
315/// - Grouped: Similar to Confined but tasks may spill outside if there are
316///   idle CPUs outside the allocated ones. The range can optionally be
317///   restricted with the "cpus_range" property.
318///
319/// - Open: Prefer the CPUs which are not occupied by Confined or Grouped
320///   layers. Tasks in this group will spill into occupied CPUs if there are
321///   no unoccupied idle CPUs.
322///
323/// All layers take the following options:
324///
325/// - min_exec_us: Minimum execution time in microseconds. Whenever a task
326///   is scheduled in, this is the minimum CPU time that it's charged no
327///   matter how short the actual execution time may be.
328///
329/// - yield_ignore: Yield ignore ratio. If 0.0, yield(2) forfeits a whole
330///   execution slice. 0.25 yields three quarters of an execution slice and
331///   so on. If 1.0, yield is completely ignored.
332///
333/// - slice_us: Scheduling slice duration in microseconds.
334///
335/// - fifo: Use FIFO queues within the layer instead of the default vtime.
336///
337/// - preempt: If true, tasks in the layer will preempt tasks which belong
338///   to other non-preempting layers when no idle CPUs are available.
339///
340/// - preempt_first: If true, tasks in the layer will try to preempt tasks
341///   in their previous CPUs before trying to find idle CPUs.
342///
343/// - exclusive: If true, tasks in the layer will occupy the whole core. The
344///   other logical CPUs sharing the same core will be kept idle. This isn't
345///   a hard guarantee, so don't depend on it for security purposes.
346///
347/// - allow_node_aligned: DEPRECATED. Node-aligned tasks are now always
348///   dispatched on layer DSQs. This field is ignored if specified.
349///
350/// - prev_over_idle_core: On SMT enabled systems, prefer using the same CPU
351///   when picking a CPU for tasks on this layer, even if that CPUs SMT
352///   sibling is processing a task.
353///
354/// - weight: Weight of the layer, which is a range from 1 to 10000 with a
355///   default of 100. Layer weights are used during contention to prevent
356///   starvation across layers. Weights are used in combination with
357///   utilization to determine the infeasible adjusted weight with higher
358///   weights having a larger adjustment in adjusted utilization.
359///
360/// - disallow_open_after_us: Duration to wait after machine reaches saturation
361///   before confining tasks in Open layers.
362///
363/// - cpus_range_frac: Array of 2 floats between 0 and 1.0. Lower and upper
364///   bound fractions of all CPUs to give to a layer. Mutually exclusive
365///   with cpus_range.
366///
367/// - disallow_preempt_after_us: Duration to wait after machine reaches saturation
368///   before confining tasks to preempt.
369///
370/// - xllc_mig_min_us: Skip cross-LLC migrations if they are likely to run on
371///   their existing LLC sooner than this.
372///
/// - idle_smt: *** DEPRECATED ***
374///
375/// - growth_algo: Determines the order in which CPUs are allocated to the
376///   layer as it grows. All algorithms are NUMA-aware and produce per-node
377///   core orderings. Most are locality algorithms that prefer the layer's
378///   home node and spill to remote nodes only when local capacity is
379///   exhausted. NUMA-spread algorithms (RoundRobin, NodeSpread*) instead
380///   enforce equal CPU counts across all NUMA nodes. Default: Sticky.
381///
382/// - perf: CPU performance target. 0 means no configuration. A value
383///   between 1 and 1024 indicates the performance level CPUs running tasks
384///   in this layer are configured to using scx_bpf_cpuperf_set().
385///
386/// - idle_resume_us: Sets the idle resume QoS value. CPU idle time governors are expected to
387///   regard the minimum of the global (effective) CPU latency limit and the effective resume
388///   latency constraint for the given CPU as the upper limit for the exit latency of the idle
389///   states. See the latest kernel docs for more details:
390///   https://www.kernel.org/doc/html/latest/admin-guide/pm/cpuidle.html
391///
392/// - nodes: If set the layer will use the set of NUMA nodes for scheduling
393///   decisions. If unset then all available NUMA nodes will be used. If the
394///   llcs value is set the cpuset of NUMA nodes will be or'ed with the LLC
395///   config.
396///
397/// - llcs: If set the layer will use the set of LLCs (last level caches)
398///   for scheduling decisions. If unset then all LLCs will be used. If
399///   the nodes value is set the cpuset of LLCs will be or'ed with the nodes
400///   config.
401///
402///
403/// Similar to matches, adding new policies and extending existing ones
404/// should be relatively straightforward.
405///
406/// Configuration example and running scx_layered
407/// =============================================
408///
409/// An scx_layered config is composed of layer configs. A layer config is
410/// composed of a name, a set of matches, and a policy block. Running the
411/// following will write an example configuration into example.json.
412///
413///   $ scx_layered -e example.json
414///
415/// Note that the last layer in the configuration must have an empty match set
416/// as a catch-all for tasks which haven't been matched into previous layers.
417///
418/// The configuration can be specified in multiple json files and
419/// command line arguments, which are concatenated in the specified
420/// order. Each must contain valid layer configurations.
421///
422/// By default, an argument to scx_layered is interpreted as a JSON string. If
423/// the argument is a pointer to a JSON file, it should be prefixed with file:
424/// or f: as follows:
425///
426///   $ scx_layered file:example.json
427///   ...
428///   $ scx_layered f:example.json
429///
430/// Monitoring Statistics
431/// =====================
432///
433/// Run with `--stats INTERVAL` to enable stats monitoring. There is
434/// also an scx_stat server listening on /var/run/scx/root/stat that can
435/// be monitored by running `scx_layered --monitor INTERVAL` separately.
436///
437///   ```bash
438///   $ scx_layered --monitor 1
439///   tot= 117909 local=86.20 open_idle= 0.21 affn_viol= 1.37 proc=6ms
440///   busy= 34.2 util= 1733.6 load=  21744.1 fb_cpus=[n0:1]
441///     batch    : util/frac=   11.8/  0.7 load/frac=     29.7:  0.1 tasks=  2597
442///                tot=   3478 local=67.80 open_idle= 0.00 preempt= 0.00 affn_viol= 0.00
443///                cpus=  2 [  2,  2] 04000001 00000000
444///     immediate: util/frac= 1218.8/ 70.3 load/frac=  21399.9: 98.4 tasks=  1107
445///                tot=  68997 local=90.57 open_idle= 0.26 preempt= 9.36 affn_viol= 0.00
446///                cpus= 50 [ 50, 50] fbfffffe 000fffff
447///     normal   : util/frac=  502.9/ 29.0 load/frac=    314.5:  1.4 tasks=  3512
448///                tot=  45434 local=80.97 open_idle= 0.16 preempt= 0.00 affn_viol= 3.56
449///                cpus= 50 [ 50, 50] fbfffffe 000fffff
450///   ```
451///
452/// Global statistics: see [`SysStats`]
453///
454/// Per-layer statistics: see [`LayerStats`]
455///
#[derive(Debug, Parser)]
#[command(verbatim_doc_comment)]
struct Opts {
    /// Deprecated, noop, use RUST_LOG or --log-level instead.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    /// Maximum consecutive execution time in microseconds. A task may be
    /// allowed to keep executing on a CPU for this long. Note that this is
    /// the upper limit and a task may have to be moved off the CPU earlier. 0
    /// indicates default - 20 * slice_us.
    #[clap(short = 'M', long, default_value = "0")]
    max_exec_us: u64,

    /// Scheduling interval in seconds.
    #[clap(short = 'i', long, default_value = "0.1")]
    interval: f64,

    /// ***DEPRECATED*** Disable load-fraction based max layer CPU limit.
    /// No longer recommended.
    #[clap(short = 'n', long, default_value = "false")]
    no_load_frac_limit: bool,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Specify the logging level. Accepts rust's envfilter syntax for modular
    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
    #[clap(long, default_value = "info")]
    log_level: String,

    /// Disable topology awareness. When enabled, the "nodes" and "llcs" settings on
    /// a layer are ignored. Defaults to false on topologies with multiple NUMA nodes
    /// or LLCs, and true otherwise.
    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
    disable_topology: Option<bool>,

    /// Disable monitor
    #[clap(long)]
    monitor_disable: bool,

    /// Write example layer specifications into the file and exit.
    #[clap(short = 'e', long)]
    example: Option<String>,

    /// ***DEPRECATED*** Disables preemption if the weighted load fraction
    /// of a layer (load_frac_adj) exceeds the threshold. The default is
    /// disabled (0.0).
    #[clap(long, default_value = "0.0")]
    layer_preempt_weight_disable: f64,

    /// ***DEPRECATED*** Disables layer growth if the weighted load fraction
    /// of a layer (load_frac_adj) exceeds the threshold. The default is
    /// disabled (0.0).
    #[clap(long, default_value = "0.0")]
    layer_growth_weight_disable: f64,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Column limit for stats monitor output.
    #[clap(long, default_value = "95")]
    stats_columns: usize,

    /// Disable per-LLC stats in monitor output.
    #[clap(long)]
    stats_no_llc: bool,

    /// Run with example layer specifications (useful for e.g. CI pipelines)
    #[clap(long)]
    run_example: bool,

    /// Allocate CPUs at an SMT granularity (not core)
    #[clap(long)]
    allow_partial_core: bool,

    /// ***DEPRECATED *** Enables iteration over local LLCs first for
    /// dispatch.
    #[clap(long, default_value = "false")]
    local_llc_iteration: bool,

    /// Low priority fallback DSQs are used to execute tasks with custom CPU
    /// affinities. These DSQs are immediately executed iff a CPU is
    /// otherwise idle. However, after the specified wait, they are
    /// guaranteed upto --lo-fb-share fraction of each CPU.
    #[clap(long, default_value = "10000")]
    lo_fb_wait_us: u64,

    /// The fraction of CPU time guaranteed to low priority fallback DSQs.
    /// See --lo-fb-wait-us.
    #[clap(long, default_value = ".05")]
    lo_fb_share: f64,

    /// Disable antistall
    #[clap(long, default_value = "false")]
    disable_antistall: bool,

    /// Enable numa topology based gpu task affinitization.
    #[clap(long, default_value = "false")]
    enable_gpu_affinitize: bool,

    /// Interval at which to reaffinitize gpu tasks to numa nodes.
    /// Defaults to 900s
    #[clap(long, default_value = "900")]
    gpu_affinitize_secs: u64,

    /// Enable match debug
    /// This stores a mapping of task tid
    /// to layer id such that bpftool map dump
    /// can be used to debug layer matches.
    #[clap(long, default_value = "false")]
    enable_match_debug: bool,

    /// Maximum task runnable_at delay (in seconds) before antistall turns on
    #[clap(long, default_value = "3")]
    antistall_sec: u64,

    /// Enable gpu support
    #[clap(long, default_value = "false")]
    enable_gpu_support: bool,

    /// Gpu Kprobe Level
    /// The value set here determines how aggressive
    /// the kprobes enabled on gpu driver functions are.
    /// Higher values are more aggressive, incurring more system overhead
    /// and more accurately identifying PIDs using GPUs in a more timely manner.
    /// Lower values incur less system overhead, at the cost of less accurately
    /// identifying GPU pids and taking longer to do so.
    #[clap(long, default_value = "3")]
    gpu_kprobe_level: u64,

    /// Enable netdev IRQ balancing. This is experimental and should be used with caution.
    #[clap(long, default_value = "false")]
    netdev_irq_balance: bool,

    /// Disable queued wakeup optimization.
    #[clap(long, default_value = "false")]
    disable_queued_wakeup: bool,

    /// Per-cpu kthreads are preempting by default. Make it not so.
    #[clap(long, default_value = "false")]
    disable_percpu_kthread_preempt: bool,

    /// Only highpri (nice < 0) per-cpu kthreads are preempting by default.
    /// Make every per-cpu kthread preempting. Meaningful only if
    /// --disable-percpu-kthread-preempt is not set.
    #[clap(long, default_value = "false")]
    percpu_kthread_preempt_all: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Optional run ID for tracking scheduler instances.
    #[clap(long)]
    run_id: Option<u64>,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    /// Layer specification. See --help.
    specs: Vec<String>,

    /// Periodically force tasks in layers using the AvgRuntime match rule to reevaluate which layer they belong to. Default period of 2s. 0
    /// turns this off.
    #[clap(long, default_value = "2000")]
    layer_refresh_ms_avgruntime: u64,

    /// Set the path for pinning the task hint map.
    #[clap(long, default_value = "")]
    task_hint_map: String,

    /// Print the config (after template expansion) and exit.
    #[clap(long, default_value = "false")]
    print_and_exit: bool,

    /// Enable affinitized task to use hi fallback queue to get more CPU time.
    #[clap(long, default_value = "")]
    hi_fb_thread_name: String,

    // Flattened option groups contributed by scx_utils.
    #[clap(flatten, next_help_heading = "Topology Options")]
    topology: TopologyArgs,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
654
/// Cgroup event types for inter-thread communication (sent from the cgroup
/// watcher to the scheduler loop).
#[derive(Debug, Clone)]
enum CgroupEvent {
    /// A cgroup matching at least one watched rule was created.
    Created {
        path: String,
        cgroup_id: u64,    // inode number
        match_bitmap: u64, // bitmap of matched regex rules
    },
    /// A previously reported cgroup was removed.
    Removed {
        path: String,
        cgroup_id: u64, // inode number
    },
}
668
669fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
670    reader
671        .read_stat()
672        .context("Failed to read procfs")?
673        .total_cpu
674        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
675}
676
677fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
678    match (curr, prev) {
679        (
680            fb_procfs::CpuStat {
681                user_usec: Some(curr_user),
682                nice_usec: Some(curr_nice),
683                system_usec: Some(curr_system),
684                idle_usec: Some(curr_idle),
685                iowait_usec: Some(curr_iowait),
686                irq_usec: Some(curr_irq),
687                softirq_usec: Some(curr_softirq),
688                stolen_usec: Some(curr_stolen),
689                ..
690            },
691            fb_procfs::CpuStat {
692                user_usec: Some(prev_user),
693                nice_usec: Some(prev_nice),
694                system_usec: Some(prev_system),
695                idle_usec: Some(prev_idle),
696                iowait_usec: Some(prev_iowait),
697                irq_usec: Some(prev_irq),
698                softirq_usec: Some(prev_softirq),
699                stolen_usec: Some(prev_stolen),
700                ..
701            },
702        ) => {
703            let idle_usec = curr_idle.saturating_sub(*prev_idle);
704            let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
705            let user_usec = curr_user.saturating_sub(*prev_user);
706            let system_usec = curr_system.saturating_sub(*prev_system);
707            let nice_usec = curr_nice.saturating_sub(*prev_nice);
708            let irq_usec = curr_irq.saturating_sub(*prev_irq);
709            let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
710            let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);
711
712            let busy_usec =
713                user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
714            let total_usec = idle_usec + busy_usec + iowait_usec;
715            if total_usec > 0 {
716                Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
717            } else {
718                Ok(1.0)
719            }
720        }
721        _ => bail!("Missing stats in cpustat"),
722    }
723}
724
/// Copy `src` into `dst` as a NUL-terminated C string.
///
/// Panics if `src` contains an interior NUL byte or if `src` plus the NUL
/// terminator does not fit in `dst`; both indicate a caller bug since the
/// destinations are fixed-size BPF-side buffers.
fn copy_into_cstr(dst: &mut [i8], src: &str) {
    let cstr = CString::new(src).unwrap();
    let bytes = cstr.as_bytes_with_nul();
    assert!(
        bytes.len() <= dst.len(),
        "C string of {} bytes does not fit destination of {} bytes",
        bytes.len(),
        dst.len()
    );
    // Byte-wise cast instead of transmuting &[u8] to &[i8]: compiles to the
    // same memcpy but does not rely on slice-reference layout, which
    // transmute of fat pointers is not guaranteed to preserve.
    for (d, &b) in dst.iter_mut().zip(bytes) {
        *d = b as i8;
    }
}
730
731fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
732    let mut cpu_ctxs = vec![];
733    let cpu_ctxs_vec = skel
734        .maps
735        .cpu_ctxs
736        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
737        .context("Failed to lookup cpu_ctx")?
738        .unwrap();
739    for cpu in 0..*NR_CPUS_POSSIBLE {
740        cpu_ctxs.push(*unsafe {
741            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
742        });
743    }
744    Ok(cpu_ctxs)
745}
746
/// Aggregated counters scraped from the BPF side on each stats refresh.
#[derive(Clone, Debug)]
struct BpfStats {
    gstats: Vec<u64>,      // global stats, indexed by stat id
    lstats: Vec<Vec<u64>>, // per-layer stats: [layer][stat]
    lstats_sums: Vec<u64>, // lstats summed across layers, indexed by stat
    llc_lstats: Vec<Vec<Vec<u64>>>, // [layer][llc][stat]
}
754
impl BpfStats {
    /// Aggregate a statistics snapshot from BPF.
    ///
    /// Global and per-layer counters are summed across the per-CPU contexts
    /// in `cpu_ctxs`; per-LLC counters are read from the `llc_data` map one
    /// LLC at a time.
    fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
        let nr_llcs = skel.maps.rodata_data.as_ref().unwrap().nr_llcs as usize;
        let mut gstats = vec![0u64; NR_GSTATS];
        let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
        let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];

        // Sum the global and per-layer counters across all possible CPUs.
        for cpu in 0..*NR_CPUS_POSSIBLE {
            for stat in 0..NR_GSTATS {
                gstats[stat] += cpu_ctxs[cpu].gstats[stat];
            }
            for layer in 0..nr_layers {
                for stat in 0..NR_LSTATS {
                    lstats[layer][stat] += cpu_ctxs[cpu].lstats[layer][stat];
                }
            }
        }

        // Cross-layer sums so callers don't have to re-aggregate.
        let mut lstats_sums = vec![0u64; NR_LSTATS];
        for layer in 0..nr_layers {
            for stat in 0..NR_LSTATS {
                lstats_sums[stat] += lstats[layer][stat];
            }
        }

        for llc_id in 0..nr_llcs {
            // XXX - This would be a lot easier if llc_ctx were in
            // the bss. Unfortunately, kernel < v6.12 crashes and
            // kernel >= v6.12 fails verification after such
            // conversion due to seemingly verifier bugs. Convert to
            // bss maps later.
            let key = llc_id as u32;
            // View the key as its raw little/native-endian bytes for the map lookup.
            let llc_id_slice =
                unsafe { std::slice::from_raw_parts((&key as *const u32) as *const u8, 4) };
            let v = skel
                .maps
                .llc_data
                .lookup(llc_id_slice, libbpf_rs::MapFlags::ANY)
                .unwrap()
                .unwrap();
            // Reinterpret the raw map value bytes as an llc_ctx and copy it out.
            let llcc = unsafe { *(v.as_slice().as_ptr() as *const bpf_intf::llc_ctx) };

            for layer_id in 0..nr_layers {
                for stat_id in 0..NR_LLC_LSTATS {
                    llc_lstats[layer_id][llc_id][stat_id] = llcc.lstats[layer_id][stat_id];
                }
            }
        }

        Self {
            gstats,
            lstats,
            lstats_sums,
            llc_lstats,
        }
    }
}
813
814impl<'a, 'b> Sub<&'b BpfStats> for &'a BpfStats {
815    type Output = BpfStats;
816
817    fn sub(self, rhs: &'b BpfStats) -> BpfStats {
818        let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
819        BpfStats {
820            gstats: vec_sub(&self.gstats, &rhs.gstats),
821            lstats: self
822                .lstats
823                .iter()
824                .zip(rhs.lstats.iter())
825                .map(|(l, r)| vec_sub(l, r))
826                .collect(),
827            lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
828            llc_lstats: self
829                .llc_lstats
830                .iter()
831                .zip(rhs.llc_lstats.iter())
832                .map(|(l_layer, r_layer)| {
833                    l_layer
834                        .iter()
835                        .zip(r_layer.iter())
836                        .map(|(l_llc, r_llc)| {
837                            let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
838                            // Lat is not subtractable, take L side.
839                            r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
840                            vec_sub(&l_llc, &r_llc)
841                        })
842                        .collect()
843                })
844                .collect(),
845        }
846    }
847}
848
/// Userspace-side scheduler statistics, refreshed periodically from the BPF
/// maps and /proc. Holds both the current derived values and the previous
/// raw snapshots needed to compute the next deltas.
#[derive(Clone, Debug)]
struct Stats {
    at: Instant,       // when this snapshot was taken
    elapsed: Duration, // time since the previous snapshot
    topo: Arc<Topology>,
    nr_layers: usize,
    nr_layer_tasks: Vec<usize>,
    layer_nr_node_pinned_tasks: Vec<Vec<u64>>,

    total_util: f64, // Running AVG of sum of layer_utils
    layer_utils: Vec<Vec<f64>>,
    prev_layer_usages: Vec<Vec<u64>>,
    layer_node_pinned_utils: Vec<Vec<f64>>,
    prev_layer_node_pinned_usages: Vec<Vec<u64>>,

    layer_membws: Vec<Vec<f64>>, // Estimated memory bandwidth consumption
    prev_layer_membw_agg: Vec<Vec<u64>>, // Estimated aggregate membw consumption
    cpu_busy: f64, // Read from /proc, maybe higher than total_util
    prev_total_cpu: fb_procfs::CpuStat,
    prev_pmu_resctrl_membw: (u64, u64), // (PMU-reported membw, resctrl-reported membw)

    system_cpu_util_ewma: f64,       // 10s EWMA of system CPU utilization
    layer_dsq_insert_ewma: Vec<f64>, // 10s EWMA of per-layer DSQ insertion ratio

    bpf_stats: BpfStats,      // per-interval deltas
    prev_bpf_stats: BpfStats, // raw snapshot used to compute the next delta

    processing_dur: Duration,      // stats-processing time in the last interval
    prev_processing_dur: Duration, // cumulative processing time at last refresh

    layer_slice_us: Vec<u64>,

    gpu_tasks_affinitized: u64,
    gpu_task_affinitization_ms: u64,
}
885
886impl Stats {
887    fn read_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
888        let mut layer_usages = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
889
890        for cpu in 0..*NR_CPUS_POSSIBLE {
891            for layer in 0..nr_layers {
892                for usage in 0..NR_LAYER_USAGES {
893                    layer_usages[layer][usage] += cpu_ctxs[cpu].layer_usages[layer][usage];
894                }
895            }
896        }
897
898        layer_usages
899    }
900
901    // Same as above, but for aggregate memory bandwidth consumption.
902    fn read_layer_membw_agg(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
903        let mut layer_membw_agg = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
904
905        for cpu in 0..*NR_CPUS_POSSIBLE {
906            for layer in 0..nr_layers {
907                for usage in 0..NR_LAYER_USAGES {
908                    layer_membw_agg[layer][usage] += cpu_ctxs[cpu].layer_membw_agg[layer][usage];
909                }
910            }
911        }
912
913        layer_membw_agg
914    }
915
916    fn read_layer_node_pinned_usages(
917        cpu_ctxs: &[bpf_intf::cpu_ctx],
918        topo: &Topology,
919        nr_layers: usize,
920        nr_nodes: usize,
921    ) -> Vec<Vec<u64>> {
922        let mut usages = vec![vec![0u64; nr_nodes]; nr_layers];
923
924        for cpu in 0..*NR_CPUS_POSSIBLE {
925            let node = topo.all_cpus.get(&cpu).map_or(0, |c| c.node_id);
926            for layer in 0..nr_layers {
927                usages[layer][node] += cpu_ctxs[cpu].node_pinned_usage[layer];
928            }
929        }
930
931        usages
932    }
933
934    /// Use the membw reported by resctrl to normalize the values reported by hw counters.
935    /// We have the following problem:
936    /// 1) We want per-task memory bandwidth reporting. We cannot do this with resctrl, much
937    /// less transparently, since we would require different RMID for each task.
938    /// 2) We want to directly use perf counters for tracking per-task memory bandwidth, but
939    /// we can't: Non-resctrl counters do not measure the right thing (e.g., they only measure
940    /// proxies like load operations),
941    /// 3) Resctrl counters are not accessible directly so we cannot read them from the BPF side.
942    ///
943    /// Approximate per-task memory bandwidth using perf counters to measure _relative_ memory
944    /// bandwidth usage.
945    fn resctrl_read_total_membw() -> Result<u64> {
946        let mut total_membw = 0u64;
947        for entry in WalkDir::new("/sys/fs/resctrl/mon_data")
948            .min_depth(1)
949            .into_iter()
950            .filter_map(Result::ok)
951            .filter(|x| x.path().is_dir())
952        {
953            let mut path = entry.path().to_path_buf();
954            path.push("mbm_total_bytes");
955            total_membw += fs::read_to_string(path)?.trim().parse::<u64>()?;
956        }
957
958        Ok(total_membw)
959    }
960
961    fn new(
962        skel: &mut BpfSkel,
963        proc_reader: &fb_procfs::ProcReader,
964        topo: Arc<Topology>,
965        gpu_task_affinitizer: &GpuTaskAffinitizer,
966    ) -> Result<Self> {
967        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
968        let nr_nodes = topo.nodes.len();
969        let cpu_ctxs = read_cpu_ctxs(skel)?;
970        let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
971        let pmu_membw = Self::read_layer_membw_agg(&cpu_ctxs, nr_layers);
972
973        Ok(Self {
974            at: Instant::now(),
975            elapsed: Default::default(),
976
977            topo: topo.clone(),
978            nr_layers,
979            nr_layer_tasks: vec![0; nr_layers],
980            layer_nr_node_pinned_tasks: vec![vec![0; nr_nodes]; nr_layers],
981
982            total_util: 0.0,
983            layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
984            prev_layer_usages: Self::read_layer_usages(&cpu_ctxs, nr_layers),
985            layer_node_pinned_utils: vec![vec![0.0; nr_nodes]; nr_layers],
986            prev_layer_node_pinned_usages: Self::read_layer_node_pinned_usages(
987                &cpu_ctxs, &topo, nr_layers, nr_nodes,
988            ),
989
990            layer_membws: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
991            // This is not normalized because we don't have enough history to do so.
992            // It should not matter too much, since the value is dropped on the first
993            // iteration.
994            prev_layer_membw_agg: pmu_membw,
995            prev_pmu_resctrl_membw: (0, 0),
996
997            cpu_busy: 0.0,
998            prev_total_cpu: read_total_cpu(proc_reader)?,
999            system_cpu_util_ewma: 0.0,
1000            layer_dsq_insert_ewma: vec![0.0; nr_layers],
1001
1002            bpf_stats: bpf_stats.clone(),
1003            prev_bpf_stats: bpf_stats,
1004
1005            processing_dur: Default::default(),
1006            prev_processing_dur: Default::default(),
1007
1008            layer_slice_us: vec![0; nr_layers],
1009            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1010            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1011        })
1012    }
1013
1014    fn refresh(
1015        &mut self,
1016        skel: &mut BpfSkel,
1017        proc_reader: &fb_procfs::ProcReader,
1018        now: Instant,
1019        cur_processing_dur: Duration,
1020        gpu_task_affinitizer: &GpuTaskAffinitizer,
1021    ) -> Result<()> {
1022        let elapsed = now.duration_since(self.at);
1023        let elapsed_f64 = elapsed.as_secs_f64();
1024        let cpu_ctxs = read_cpu_ctxs(skel)?;
1025
1026        let layers = &skel.maps.bss_data.as_ref().unwrap().layers;
1027        let nr_layer_tasks: Vec<usize> = layers
1028            .iter()
1029            .take(self.nr_layers)
1030            .map(|layer| layer.nr_tasks as usize)
1031            .collect();
1032        let layer_nr_node_pinned_tasks: Vec<Vec<u64>> = layers
1033            .iter()
1034            .take(self.nr_layers)
1035            .map(|layer| {
1036                layer.node[..self.topo.nodes.len()]
1037                    .iter()
1038                    .map(|n| n.nr_pinned_tasks)
1039                    .collect()
1040            })
1041            .collect();
1042        let layer_slice_us: Vec<u64> = layers
1043            .iter()
1044            .take(self.nr_layers)
1045            .map(|layer| layer.slice_ns / 1000_u64)
1046            .collect();
1047
1048        let cur_layer_usages = Self::read_layer_usages(&cpu_ctxs, self.nr_layers);
1049        let cur_layer_node_pinned_usages = Self::read_layer_node_pinned_usages(
1050            &cpu_ctxs,
1051            &self.topo,
1052            self.nr_layers,
1053            self.topo.nodes.len(),
1054        );
1055        let cur_layer_membw_agg = Self::read_layer_membw_agg(&cpu_ctxs, self.nr_layers);
1056
1057        // Memory BW normalization. It requires finding the delta according to perf, the delta
1058        // according to resctl, and finding the factor between them. This also helps in
1059        // determining whether this delta is stable.
1060        //
1061        // We scale only the raw PMC delta - the part of the counter incremented between two
1062        // periods. This is because the scaling factor is only valid for that time period.
1063        // Non-scaled samples are not comparable, while scaled ones are.
1064        let (pmu_prev, resctrl_prev) = self.prev_pmu_resctrl_membw;
1065        let pmu_cur: u64 = cur_layer_membw_agg
1066            .iter()
1067            .map(|membw_agg| membw_agg[LAYER_USAGE_OPEN] + membw_agg[LAYER_USAGE_OWNED])
1068            .sum();
1069        let resctrl_cur = Self::resctrl_read_total_membw()?;
1070        let factor = (resctrl_cur - resctrl_prev) as f64 / (pmu_cur - pmu_prev) as f64;
1071
1072        // Computes the runtime deltas and converts them from ns to s.
1073        let compute_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1074            cur_agg
1075                .iter()
1076                .zip(prev_agg.iter())
1077                .map(|(cur, prev)| {
1078                    cur.iter()
1079                        .zip(prev.iter())
1080                        .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
1081                        .collect()
1082                })
1083                .collect()
1084        };
1085
1086        // Computes the total memory traffic done since last computation and normalizes to GBs.
1087        // We derive the rate of consumption elsewhere.
1088        let compute_mem_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1089            cur_agg
1090                .iter()
1091                .zip(prev_agg.iter())
1092                .map(|(cur, prev)| {
1093                    cur.iter()
1094                        .zip(prev.iter())
1095                        .map(|(c, p)| (*c as i64 - *p as i64) as f64 / (1024 as f64).powf(3.0))
1096                        .collect()
1097                })
1098                .collect()
1099        };
1100
1101        let cur_layer_utils: Vec<Vec<f64>> =
1102            compute_diff(&cur_layer_usages, &self.prev_layer_usages);
1103
1104        // Scale the raw value delta by the resctrl/pmc computed factor.
1105        let cur_layer_membw: Vec<Vec<f64>> =
1106            compute_mem_diff(&cur_layer_membw_agg, &self.prev_layer_membw_agg);
1107
1108        let cur_layer_membw: Vec<Vec<f64>> = cur_layer_membw
1109            .iter()
1110            .map(|x| x.iter().map(|x| *x * factor).collect())
1111            .collect();
1112
1113        let metric_decay =
1114            |cur_metric: Vec<Vec<f64>>, prev_metric: &Vec<Vec<f64>>, decay_rate: f64| {
1115                cur_metric
1116                    .iter()
1117                    .zip(prev_metric.iter())
1118                    .map(|(cur, prev)| {
1119                        cur.iter()
1120                            .zip(prev.iter())
1121                            .map(|(c, p)| {
1122                                let decay = decay_rate.powf(elapsed_f64);
1123                                p * decay + c * (1.0 - decay)
1124                            })
1125                            .collect()
1126                    })
1127                    .collect()
1128            };
1129
1130        let layer_utils: Vec<Vec<f64>> =
1131            metric_decay(cur_layer_utils, &self.layer_utils, *USAGE_DECAY);
1132        let cur_node_pinned_utils: Vec<Vec<f64>> = compute_diff(
1133            &cur_layer_node_pinned_usages,
1134            &self.prev_layer_node_pinned_usages,
1135        );
1136        let layer_node_pinned_utils: Vec<Vec<f64>> = metric_decay(
1137            cur_node_pinned_utils,
1138            &self.layer_node_pinned_utils,
1139            *USAGE_DECAY,
1140        );
1141
1142        let layer_membws: Vec<Vec<f64>> = metric_decay(cur_layer_membw, &self.layer_membws, 0.0);
1143
1144        let cur_total_cpu = read_total_cpu(proc_reader)?;
1145        let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;
1146
1147        // Calculate system CPU utilization EWMA (10 second window)
1148        const SYS_CPU_UTIL_EWMA_SECS: f64 = 10.0;
1149        let elapsed_f64 = elapsed.as_secs_f64();
1150        let alpha = elapsed_f64 / SYS_CPU_UTIL_EWMA_SECS.max(elapsed_f64);
1151        let system_cpu_util_ewma = alpha * cpu_busy + (1.0 - alpha) * self.system_cpu_util_ewma;
1152
1153        let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1154        let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;
1155
1156        // Calculate per-layer DSQ insertion EWMA (10 second window)
1157        const DSQ_INSERT_EWMA_SECS: f64 = 10.0;
1158        let dsq_alpha = elapsed_f64 / DSQ_INSERT_EWMA_SECS.max(elapsed_f64);
1159        let layer_dsq_insert_ewma: Vec<f64> = (0..self.nr_layers)
1160            .map(|layer_id| {
1161                let sel_local = bpf_stats.lstats[layer_id]
1162                    [bpf_intf::layer_stat_id_LSTAT_SEL_LOCAL as usize]
1163                    as f64;
1164                let enq_local = bpf_stats.lstats[layer_id]
1165                    [bpf_intf::layer_stat_id_LSTAT_ENQ_LOCAL as usize]
1166                    as f64;
1167                let enq_dsq = bpf_stats.lstats[layer_id]
1168                    [bpf_intf::layer_stat_id_LSTAT_ENQ_DSQ as usize]
1169                    as f64;
1170                let total_dispatches = sel_local + enq_local + enq_dsq;
1171
1172                let cur_ratio = if total_dispatches > 0.0 {
1173                    enq_dsq / total_dispatches
1174                } else {
1175                    0.0
1176                };
1177
1178                dsq_alpha * cur_ratio + (1.0 - dsq_alpha) * self.layer_dsq_insert_ewma[layer_id]
1179            })
1180            .collect();
1181
1182        let processing_dur = cur_processing_dur
1183            .checked_sub(self.prev_processing_dur)
1184            .unwrap();
1185
1186        *self = Self {
1187            at: now,
1188            elapsed,
1189            topo: self.topo.clone(),
1190            nr_layers: self.nr_layers,
1191            nr_layer_tasks,
1192            layer_nr_node_pinned_tasks,
1193
1194            total_util: layer_utils
1195                .iter()
1196                .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
1197                .sum(),
1198            layer_utils,
1199            prev_layer_usages: cur_layer_usages,
1200            layer_node_pinned_utils,
1201            prev_layer_node_pinned_usages: cur_layer_node_pinned_usages,
1202
1203            layer_membws,
1204            prev_layer_membw_agg: cur_layer_membw_agg,
1205            // Was updated during normalization.
1206            prev_pmu_resctrl_membw: (pmu_cur, resctrl_cur),
1207
1208            cpu_busy,
1209            prev_total_cpu: cur_total_cpu,
1210            system_cpu_util_ewma,
1211            layer_dsq_insert_ewma,
1212
1213            bpf_stats,
1214            prev_bpf_stats: cur_bpf_stats,
1215
1216            processing_dur,
1217            prev_processing_dur: cur_processing_dur,
1218
1219            layer_slice_us,
1220            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1221            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1222        };
1223        Ok(())
1224    }
1225}
1226
/// Userspace-side runtime state for one scheduling layer: its spec-derived
/// configuration plus the CPUs currently assigned to it.
#[derive(Debug)]
struct Layer {
    name: String,
    kind: LayerKind,
    growth_algo: LayerGrowthAlgo,
    core_order: Vec<Vec<usize>>, // per-node preferred core growth order

    assigned_llcs: Vec<Vec<usize>>, // LLCs assigned to this layer, per node

    nr_cpus: usize,            // total CPUs currently owned by the layer
    nr_llc_cpus: Vec<usize>,   // owned CPU count per LLC
    nr_node_cpus: Vec<usize>,  // owned CPU count per NUMA node
    cpus: Cpumask,             // CPUs currently owned by the layer
    allowed_cpus: Cpumask,     // CPUs the layer may ever own (node/LLC constraints)

    /// Per-node count of CPUs allocated for pinned demand.
    nr_pinned_cpus: Vec<usize>,
}
1245
1246fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
1247    fs::read_to_string("/proc/kallsyms")?
1248        .lines()
1249        .find(|line| line.contains(sym_name))
1250        .and_then(|line| line.split_whitespace().next())
1251        .and_then(|addr| u64::from_str_radix(addr, 16).ok())
1252        .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
1253}
1254
1255fn resolve_cpus_pct_range(
1256    cpus_range: &Option<(usize, usize)>,
1257    cpus_range_frac: &Option<(f64, f64)>,
1258    max_cpus: usize,
1259) -> Result<(usize, usize)> {
1260    match (cpus_range, cpus_range_frac) {
1261        (Some(_x), Some(_y)) => {
1262            bail!("cpus_range cannot be used with cpus_pct.");
1263        }
1264        (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
1265        (None, Some((cpus_frac_min, cpus_frac_max))) => {
1266            if *cpus_frac_min < 0_f64
1267                || *cpus_frac_min > 1_f64
1268                || *cpus_frac_max < 0_f64
1269                || *cpus_frac_max > 1_f64
1270            {
1271                bail!("cpus_range_frac values must be between 0.0 and 1.0");
1272            }
1273            let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
1274            let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
1275            Ok((
1276                std::cmp::max(cpus_min_count, 1),
1277                std::cmp::min(cpus_max_count, max_cpus),
1278            ))
1279        }
1280        (None, None) => Ok((0, max_cpus)),
1281    }
1282}
1283
1284impl Layer {
1285    fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<Vec<usize>>) -> Result<Self> {
1286        let name = &spec.name;
1287        let kind = spec.kind.clone();
1288        let mut allowed_cpus = Cpumask::new();
1289        match &kind {
1290            LayerKind::Confined {
1291                cpus_range,
1292                cpus_range_frac,
1293                common: LayerCommon { nodes, llcs, .. },
1294                ..
1295            } => {
1296                let cpus_range =
1297                    resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
1298                if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
1299                    bail!("invalid cpus_range {:?}", cpus_range);
1300                }
1301                if nodes.is_empty() && llcs.is_empty() {
1302                    allowed_cpus.set_all();
1303                } else {
1304                    // build up the cpus bitset
1305                    for (node_id, node) in &topo.nodes {
1306                        // first do the matching for nodes
1307                        if nodes.contains(node_id) {
1308                            for &id in node.all_cpus.keys() {
1309                                allowed_cpus.set_cpu(id)?;
1310                            }
1311                        }
1312                        // next match on any LLCs
1313                        for (llc_id, llc) in &node.llcs {
1314                            if llcs.contains(llc_id) {
1315                                for &id in llc.all_cpus.keys() {
1316                                    allowed_cpus.set_cpu(id)?;
1317                                }
1318                            }
1319                        }
1320                    }
1321                }
1322            }
1323            LayerKind::Grouped {
1324                common: LayerCommon { nodes, llcs, .. },
1325                ..
1326            }
1327            | LayerKind::Open {
1328                common: LayerCommon { nodes, llcs, .. },
1329                ..
1330            } => {
1331                if nodes.is_empty() && llcs.is_empty() {
1332                    allowed_cpus.set_all();
1333                } else {
1334                    // build up the cpus bitset
1335                    for (node_id, node) in &topo.nodes {
1336                        // first do the matching for nodes
1337                        if nodes.contains(node_id) {
1338                            for &id in node.all_cpus.keys() {
1339                                allowed_cpus.set_cpu(id)?;
1340                            }
1341                        }
1342                        // next match on any LLCs
1343                        for (llc_id, llc) in &node.llcs {
1344                            if llcs.contains(llc_id) {
1345                                for &id in llc.all_cpus.keys() {
1346                                    allowed_cpus.set_cpu(id)?;
1347                                }
1348                            }
1349                        }
1350                    }
1351                }
1352            }
1353        }
1354
1355        // Util can be above 1.0 for grouped layers if
1356        // util_includes_open_cputime is set.
1357        if let Some(util_range) = kind.util_range() {
1358            if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
1359                bail!("invalid util_range {:?}", util_range);
1360            }
1361        }
1362
1363        let layer_growth_algo = kind.common().growth_algo.clone();
1364
1365        debug!(
1366            "layer: {} algo: {:?} core order: {:?}",
1367            name, &layer_growth_algo, core_order
1368        );
1369
1370        Ok(Self {
1371            name: name.into(),
1372            kind,
1373            growth_algo: layer_growth_algo,
1374            core_order: core_order.clone(),
1375
1376            assigned_llcs: vec![vec![]; topo.nodes.len()],
1377
1378            nr_cpus: 0,
1379            nr_llc_cpus: vec![0; topo.all_llcs.len()],
1380            nr_node_cpus: vec![0; topo.nodes.len()],
1381            cpus: Cpumask::new(),
1382            allowed_cpus,
1383
1384            nr_pinned_cpus: vec![0; topo.nodes.len()],
1385        })
1386    }
1387}
/// Per-GPU NUMA placement info: the node's CPU set used for affinitization.
#[derive(Debug, Clone)]
struct NodeInfo {
    node_mask: nix::sched::CpuSet, // CPUs of the node the GPU is attached to
    _node_id: usize,               // kept for debugging output only
}
1393
#[derive(Debug)]
struct GpuTaskAffinitizer {
    // This struct tracks information necessary to numa affinitize
    // gpu tasks periodically when needed.
    gpu_devs_to_node_info: HashMap<u32, NodeInfo>, // NVML device index -> node info
    gpu_pids_to_devs: HashMap<Pid, u32>,           // GPU-using PID -> NVML device index
    last_process_time: Option<Instant>,            // last poll; None until init()
    sys: System,                                   // sysinfo process-tree snapshot
    pid_map: HashMap<Pid, Vec<Pid>>,               // parent PID -> child PIDs
    poll_interval: Duration,
    enable: bool, // when false, all operations are no-ops
    tasks_affinitized: u64, // running count of successful sched_setaffinity calls
    last_task_affinitization_ms: u64, // duration of the last affinitization pass
}
1408
1409impl GpuTaskAffinitizer {
1410    pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
1411        GpuTaskAffinitizer {
1412            gpu_devs_to_node_info: HashMap::new(),
1413            gpu_pids_to_devs: HashMap::new(),
1414            last_process_time: None,
1415            sys: System::default(),
1416            pid_map: HashMap::new(),
1417            poll_interval: Duration::from_secs(poll_interval),
1418            enable,
1419            tasks_affinitized: 0,
1420            last_task_affinitization_ms: 0,
1421        }
1422    }
1423
1424    fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
1425        for (chunk, &mask) in affinity.iter().enumerate() {
1426            let mut inner_offset: u64 = 1;
1427            for _ in 0..64 {
1428                if (mask & inner_offset) != 0 {
1429                    return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
1430                }
1431                inner_offset = inner_offset << 1;
1432            }
1433        }
1434        anyhow::bail!("unable to get CPU from NVML bitmask");
1435    }
1436
1437    fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
1438        let mut cpuset = CpuSet::new();
1439        for (cpu_id, _cpu) in &node.all_cpus {
1440            cpuset.set(*cpu_id)?;
1441        }
1442        Ok(cpuset)
1443    }
1444
1445    fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
1446        let nvml = nvml()?;
1447        let device_count = nvml.device_count()?;
1448
1449        for idx in 0..device_count {
1450            let dev = nvml.device_by_index(idx)?;
1451            // For machines w/ up to 1024 CPUs.
1452            let cpu = dev.cpu_affinity(16)?;
1453            let ideal_cpu = self.find_one_cpu(cpu)?;
1454            if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
1455                self.gpu_devs_to_node_info.insert(
1456                    idx,
1457                    NodeInfo {
1458                        node_mask: self.node_to_cpuset(
1459                            topo.nodes.get(&cpu.node_id).expect("topo missing node"),
1460                        )?,
1461                        _node_id: cpu.node_id,
1462                    },
1463                );
1464            }
1465        }
1466        Ok(())
1467    }
1468
1469    fn update_gpu_pids(&mut self) -> Result<()> {
1470        let nvml = nvml()?;
1471        for i in 0..nvml.device_count()? {
1472            let device = nvml.device_by_index(i)?;
1473            for proc in device
1474                .running_compute_processes()?
1475                .into_iter()
1476                .chain(device.running_graphics_processes()?.into_iter())
1477            {
1478                self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
1479            }
1480        }
1481        Ok(())
1482    }
1483
1484    fn update_process_info(&mut self) -> Result<()> {
1485        self.sys.refresh_processes_specifics(
1486            ProcessesToUpdate::All,
1487            true,
1488            ProcessRefreshKind::nothing(),
1489        );
1490        self.pid_map.clear();
1491        for (pid, proc_) in self.sys.processes() {
1492            if let Some(ppid) = proc_.parent() {
1493                self.pid_map.entry(ppid).or_default().push(*pid);
1494            }
1495        }
1496        Ok(())
1497    }
1498
1499    fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
1500        let mut work = VecDeque::from([root_pid]);
1501        let mut pids_and_tids: HashSet<Pid> = HashSet::new();
1502
1503        while let Some(pid) = work.pop_front() {
1504            if pids_and_tids.insert(pid) {
1505                if let Some(kids) = self.pid_map.get(&pid) {
1506                    work.extend(kids);
1507                }
1508                if let Some(proc_) = self.sys.process(pid) {
1509                    if let Some(tasks) = proc_.tasks() {
1510                        pids_and_tids.extend(tasks.iter().copied());
1511                    }
1512                }
1513            }
1514        }
1515        pids_and_tids
1516    }
1517
1518    fn affinitize_gpu_pids(&mut self) -> Result<()> {
1519        if !self.enable {
1520            return Ok(());
1521        }
1522        for (pid, dev) in &self.gpu_pids_to_devs {
1523            let node_info = self
1524                .gpu_devs_to_node_info
1525                .get(&dev)
1526                .expect("Unable to get gpu pid node mask");
1527            for child in self.get_child_pids_and_tids(*pid) {
1528                match nix::sched::sched_setaffinity(
1529                    nix::unistd::Pid::from_raw(child.as_u32() as i32),
1530                    &node_info.node_mask,
1531                ) {
1532                    Ok(_) => {
1533                        // Increment the global counter for successful affinitization
1534                        self.tasks_affinitized += 1;
1535                    }
1536                    Err(_) => {
1537                        debug!(
1538                            "Error affinitizing gpu pid {} to node {:#?}",
1539                            child.as_u32(),
1540                            node_info
1541                        );
1542                    }
1543                };
1544            }
1545        }
1546        Ok(())
1547    }
1548
1549    pub fn maybe_affinitize(&mut self) {
1550        if !self.enable {
1551            return;
1552        }
1553        let now = Instant::now();
1554
1555        if let Some(last_process_time) = self.last_process_time {
1556            if (now - last_process_time) < self.poll_interval {
1557                return;
1558            }
1559        }
1560
1561        match self.update_gpu_pids() {
1562            Ok(_) => {}
1563            Err(e) => {
1564                error!("Error updating GPU PIDs: {}", e);
1565            }
1566        };
1567        match self.update_process_info() {
1568            Ok(_) => {}
1569            Err(e) => {
1570                error!("Error updating process info to affinitize GPU PIDs: {}", e);
1571            }
1572        };
1573        match self.affinitize_gpu_pids() {
1574            Ok(_) => {}
1575            Err(e) => {
1576                error!("Error updating GPU PIDs: {}", e);
1577            }
1578        };
1579        self.last_process_time = Some(now);
1580        self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
1581
1582        return;
1583    }
1584
1585    pub fn init(&mut self, topo: Arc<Topology>) {
1586        if !self.enable || self.last_process_time.is_some() {
1587            return;
1588        }
1589
1590        match self.init_dev_node_map(topo) {
1591            Ok(_) => {}
1592            Err(e) => {
1593                error!("Error initializing gpu node dev map: {}", e);
1594            }
1595        };
1596        self.sys = System::new_all();
1597        return;
1598    }
1599}
1600
/// Userspace half of the scx_layered scheduler: owns the loaded BPF
/// skeleton plus the state needed to periodically refresh layer CPU
/// assignments and serve stats requests.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    // Attached struct_ops link; dropping it detaches the BPF scheduler.
    struct_ops: Option<libbpf_rs::Link>,
    layer_specs: Vec<LayerSpec>,

    // Main scheduling refresh interval and the separate (per-layer
    // membership) refresh interval.
    sched_intv: Duration,
    layer_refresh_intv: Duration,

    cpu_pool: CpuPool,
    layers: Vec<Layer>,
    idle_qos_enabled: bool,

    proc_reader: fb_procfs::ProcReader,
    sched_stats: Stats,

    // Compiled CgroupRegex match rules keyed by the regex id written into
    // the BPF-side match structs (built by init_layers).
    cgroup_regexes: Option<HashMap<u32, Regex>>,

    // NOTE(review): appears to track per-layer (min, max) CPU counts for
    // stats reporting — confirm against the refresh path.
    nr_layer_cpus_ranges: Vec<(usize, usize)>,
    processing_dur: Duration,

    topo: Arc<Topology>,
    netdevs: BTreeMap<String, NetDev>,
    stats_server: StatsServer<StatsReq, StatsRes>,
    gpu_task_handler: GpuTaskAffinitizer,
}
1626
1627impl<'a> Scheduler<'a> {
    /// Translate the user-supplied layer specs into the BPF-side `layer`
    /// structs: match rules, per-layer scheduling parameters, cpusets, and
    /// the weight-sorted layer iteration order.
    ///
    /// Returns the userspace table of compiled CgroupRegex rules keyed by
    /// the regex id written into the corresponding BPF match entry (regex
    /// evaluation itself happens in userspace via the cgroup watcher).
    fn init_layers(
        skel: &mut OpenBpfSkel,
        specs: &[LayerSpec],
        topo: &Topology,
    ) -> Result<HashMap<u32, Regex>> {
        skel.maps.rodata_data.as_mut().unwrap().nr_layers = specs.len() as u32;
        let mut perf_set = false;

        let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
        let mut layer_weights: Vec<usize> = vec![];
        let mut cgroup_regex_id = 0;
        let mut cgroup_regexes = HashMap::new();

        for (spec_i, spec) in specs.iter().enumerate() {
            let layer = &mut skel.maps.bss_data.as_mut().unwrap().layers[spec_i];

            // Each AND term within each OR clause becomes one BPF match
            // entry at layer.matches[or_i].matches[and_i].
            for (or_i, or) in spec.matches.iter().enumerate() {
                for (and_i, and) in or.iter().enumerate() {
                    let mt = &mut layer.matches[or_i].matches[and_i];

                    // Rules are allowlist-based by default
                    mt.exclude.write(false);

                    match and {
                        LayerMatch::CgroupPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
                            copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
                        }
                        LayerMatch::CgroupSuffix(suffix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
                            copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
                        }
                        LayerMatch::CgroupRegex(regex_str) => {
                            if cgroup_regex_id >= bpf_intf::consts_MAX_CGROUP_REGEXES {
                                bail!(
                                    "Too many cgroup regex rules. Maximum allowed: {}",
                                    bpf_intf::consts_MAX_CGROUP_REGEXES
                                );
                            }

                            // CgroupRegex matching handled in userspace via cgroup watcher
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_REGEX as i32;
                            mt.cgroup_regex_id = cgroup_regex_id;

                            let regex = Regex::new(regex_str).with_context(|| {
                                format!("Invalid regex '{}' in layer '{}'", regex_str, spec.name)
                            })?;
                            cgroup_regexes.insert(cgroup_regex_id, regex);
                            cgroup_regex_id += 1;
                        }
                        LayerMatch::CgroupContains(substr) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
                            copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
                        }
                        LayerMatch::CommPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                        }
                        LayerMatch::CommPrefixExclude(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
                            mt.exclude.write(true);
                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
                        }
                        LayerMatch::PcommPrefix(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                        }
                        LayerMatch::PcommPrefixExclude(prefix) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
                            mt.exclude.write(true);
                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
                        }
                        LayerMatch::NiceAbove(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::NiceBelow(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::NiceEquals(nice) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
                            mt.nice = *nice;
                        }
                        LayerMatch::UIDEquals(user_id) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
                            mt.user_id = *user_id;
                        }
                        LayerMatch::GIDEquals(group_id) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
                            mt.group_id = *group_id;
                        }
                        LayerMatch::PIDEquals(pid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
                            mt.pid = *pid;
                        }
                        LayerMatch::PPIDEquals(ppid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
                            mt.ppid = *ppid;
                        }
                        LayerMatch::TGIDEquals(tgid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
                            mt.tgid = *tgid;
                        }
                        LayerMatch::NSPIDEquals(nsid, pid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
                            mt.nsid = *nsid;
                            mt.pid = *pid;
                        }
                        LayerMatch::NSEquals(nsid) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
                            mt.nsid = *nsid as u64;
                        }
                        LayerMatch::CmdJoin(joincmd) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
                            copy_into_cstr(&mut mt.comm_prefix, joincmd);
                        }
                        LayerMatch::IsGroupLeader(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
                            mt.is_group_leader.write(*polarity);
                        }
                        LayerMatch::IsKthread(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
                            mt.is_kthread.write(*polarity);
                        }
                        LayerMatch::UsedGpuTid(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
                            mt.used_gpu_tid.write(*polarity);
                        }
                        LayerMatch::UsedGpuPid(polarity) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
                            mt.used_gpu_pid.write(*polarity);
                        }
                        LayerMatch::AvgRuntime(min, max) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
                            mt.min_avg_runtime_us = *min;
                            mt.max_avg_runtime_us = *max;
                        }
                        LayerMatch::HintEquals(hint) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_HINT_EQUALS as i32;
                            mt.hint = *hint;
                        }
                        // Thresholds are fractions scaled to basis-point-like
                        // fixed point (x10000) for the BPF side.
                        LayerMatch::SystemCpuUtilBelow(threshold) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_SYSTEM_CPU_UTIL_BELOW as i32;
                            mt.system_cpu_util_below = (*threshold * 10000.0) as u64;
                        }
                        LayerMatch::DsqInsertBelow(threshold) => {
                            mt.kind = bpf_intf::layer_match_kind_MATCH_DSQ_INSERT_BELOW as i32;
                            mt.dsq_insert_below = (*threshold * 10000.0) as u64;
                        }
                        LayerMatch::NumaNode(node_id) => {
                            if *node_id as usize >= topo.nodes.len() {
                                bail!(
                                    "Spec {:?} has invalid NUMA node ID {} (available nodes: 0-{})",
                                    spec.name,
                                    node_id,
                                    topo.nodes.len() - 1
                                );
                            }
                            mt.kind = bpf_intf::layer_match_kind_MATCH_NUMA_NODE as i32;
                            mt.numa_node_id = *node_id;
                        }
                    }
                }
                layer.matches[or_i].nr_match_ands = or.len() as i32;
            }

            layer.nr_match_ors = spec.matches.len() as u32;
            layer.kind = spec.kind.as_bpf_enum();

            // Copy the common per-layer scheduling parameters into the BPF
            // layer struct, converting us -> ns where applicable.
            {
                let LayerCommon {
                    min_exec_us,
                    yield_ignore,
                    perf,
                    preempt,
                    preempt_first,
                    exclusive,
                    skip_remote_node,
                    prev_over_idle_core,
                    growth_algo,
                    slice_us,
                    fifo,
                    weight,
                    disallow_open_after_us,
                    disallow_preempt_after_us,
                    xllc_mig_min_us,
                    placement,
                    member_expire_ms,
                    ..
                } = spec.kind.common();

                layer.slice_ns = *slice_us * 1000;
                layer.fifo.write(*fifo);
                layer.min_exec_ns = min_exec_us * 1000;
                // yield_ignore ~1.0 disables yield stepping entirely;
                // ~0.0 makes a yield consume the full slice.
                layer.yield_step_ns = if *yield_ignore > 0.999 {
                    0
                } else if *yield_ignore < 0.001 {
                    layer.slice_ns
                } else {
                    (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
                };
                let mut layer_name: String = spec.name.clone();
                layer_name.truncate(MAX_LAYER_NAME);
                copy_into_cstr(&mut layer.name, layer_name.as_str());
                layer.preempt.write(*preempt);
                layer.preempt_first.write(*preempt_first);
                layer.excl.write(*exclusive);
                layer.skip_remote_node.write(*skip_remote_node);
                layer.prev_over_idle_core.write(*prev_over_idle_core);
                layer.growth_algo = growth_algo.as_bpf_enum();
                layer.weight = *weight;
                layer.member_expire_ms = *member_expire_ms;
                // u64::MAX means "never"; don't scale it into ns.
                layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
                    v if v == u64::MAX => v,
                    v => v * 1000,
                };
                layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
                    v if v == u64::MAX => v,
                    v => v * 1000,
                };
                layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
                layer_weights.push(layer.weight.try_into().unwrap());
                layer.perf = u32::try_from(*perf)?;

                let task_place = |place: u32| crate::types::layer_task_place(place);
                layer.task_place = match placement {
                    LayerPlacement::Standard => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_STD as u32)
                    }
                    LayerPlacement::Sticky => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_STICK as u32)
                    }
                    LayerPlacement::Floating => {
                        task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT as u32)
                    }
                };
            }

            layer.is_protected.write(match spec.kind {
                LayerKind::Open { .. } => false,
                LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
                    protected
                }
            });

            // No explicit cpuset means "all CPUs allowed" (all-ones mask).
            match &spec.cpuset {
                Some(mask) => {
                    Self::update_cpumask(&mask, &mut layer.cpuset);
                    layer.has_cpuset.write(true);
                }
                None => {
                    for i in 0..layer.cpuset.len() {
                        layer.cpuset[i] = u8::MAX;
                    }
                    layer.has_cpuset.write(false);
                }
            };

            perf_set |= layer.perf > 0;
        }

        // Publish the layer iteration order sorted by ascending weight.
        layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
        for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
            skel.maps
                .rodata_data
                .as_mut()
                .unwrap()
                .layer_iteration_order[idx] = *layer_idx as u32;
        }

        if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
            warn!("cpufreq support not available, ignoring perf configurations");
        }

        Ok(cgroup_regexes)
    }
1905
1906    fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
1907        skel.maps.rodata_data.as_mut().unwrap().nr_nodes = topo.nodes.len() as u32;
1908        skel.maps.rodata_data.as_mut().unwrap().nr_llcs = 0;
1909
1910        for (&node_id, node) in &topo.nodes {
1911            debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
1912            skel.maps.rodata_data.as_mut().unwrap().nr_llcs += node.llcs.len() as u32;
1913            let raw_numa_slice = node.span.as_raw_slice();
1914            let node_cpumask_slice =
1915                &mut skel.maps.rodata_data.as_mut().unwrap().numa_cpumasks[node_id];
1916            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
1917            left.clone_from_slice(raw_numa_slice);
1918            debug!(
1919                "node {} mask: {:?}",
1920                node_id,
1921                skel.maps.rodata_data.as_ref().unwrap().numa_cpumasks[node_id]
1922            );
1923
1924            for llc in node.llcs.values() {
1925                debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
1926                skel.maps.rodata_data.as_mut().unwrap().llc_numa_id_map[llc.id] = node_id as u32;
1927            }
1928        }
1929
1930        for cpu in topo.all_cpus.values() {
1931            skel.maps.rodata_data.as_mut().unwrap().cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
1932        }
1933    }
1934
1935    fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
1936        let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
1937            vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
1938            vec
1939        };
1940        let radiate_cpu =
1941            |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
1942                vec.sort_by_key(|&id| {
1943                    (
1944                        (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
1945                        (center_cpu as i32 - id as i32).abs(),
1946                    )
1947                });
1948                vec
1949            };
1950
1951        for (&cpu_id, cpu) in &topo.all_cpus {
1952            // Collect the spans.
1953            let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
1954            let llc_span = &topo.all_llcs[&cpu.llc_id].span;
1955            let node_span = &topo.nodes[&cpu.node_id].span;
1956            let sys_span = &topo.span;
1957
1958            // Make the spans exclusive.
1959            let sys_span = sys_span.and(&node_span.not());
1960            let node_span = node_span.and(&llc_span.not());
1961            let llc_span = llc_span.and(&core_span.not());
1962            core_span.clear_cpu(cpu_id).unwrap();
1963
1964            // Convert them into arrays.
1965            let mut sys_order: Vec<usize> = sys_span.iter().collect();
1966            let mut node_order: Vec<usize> = node_span.iter().collect();
1967            let mut llc_order: Vec<usize> = llc_span.iter().collect();
1968            let mut core_order: Vec<usize> = core_span.iter().collect();
1969
1970            // Shuffle them so that different CPUs follow different orders.
1971            // Each CPU radiates in both directions based on the cpu id and
1972            // radiates out to the closest cores based on core ids.
1973
1974            sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
1975            node_order = radiate(node_order, cpu.node_id);
1976            llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
1977            core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);
1978
1979            // Concatenate and record the topology boundaries.
1980            let mut order: Vec<usize> = vec![];
1981            let mut idx: usize = 0;
1982
1983            idx += 1;
1984            order.push(cpu_id);
1985
1986            idx += core_order.len();
1987            order.append(&mut core_order);
1988            let core_end = idx;
1989
1990            idx += llc_order.len();
1991            order.append(&mut llc_order);
1992            let llc_end = idx;
1993
1994            idx += node_order.len();
1995            order.append(&mut node_order);
1996            let node_end = idx;
1997
1998            idx += sys_order.len();
1999            order.append(&mut sys_order);
2000            let sys_end = idx;
2001
2002            debug!(
2003                "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
2004                cpu_id, core_end, llc_end, node_end, sys_end, &order
2005            );
2006
2007            // Record in cpu_ctx.
2008            let pmap = &mut cpu_ctxs[cpu_id].prox_map;
2009            for (i, &cpu) in order.iter().enumerate() {
2010                pmap.cpus[i] = cpu as u16;
2011            }
2012            pmap.core_end = core_end as u32;
2013            pmap.llc_end = llc_end as u32;
2014            pmap.node_end = node_end as u32;
2015            pmap.sys_end = sys_end as u32;
2016        }
2017    }
2018
2019    fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
2020        cpu_ctxs
2021            .into_iter()
2022            .map(|cpu_ctx| {
2023                let bytes = unsafe {
2024                    std::slice::from_raw_parts(
2025                        &cpu_ctx as *const bpf_intf::cpu_ctx as *const u8,
2026                        std::mem::size_of::<bpf_intf::cpu_ctx>(),
2027                    )
2028                };
2029                bytes.to_vec()
2030            })
2031            .collect()
2032    }
2033
2034    fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
2035        let key = (0_u32).to_ne_bytes();
2036        let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
2037        let cpu_ctxs_vec = skel
2038            .maps
2039            .cpu_ctxs
2040            .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
2041            .context("Failed to lookup cpu_ctx")?
2042            .unwrap();
2043
2044        let op_layers: Vec<u32> = layer_specs
2045            .iter()
2046            .enumerate()
2047            .filter(|(_idx, spec)| match &spec.kind {
2048                LayerKind::Open { .. } => spec.kind.common().preempt,
2049                _ => false,
2050            })
2051            .map(|(idx, _)| idx as u32)
2052            .collect();
2053        let on_layers: Vec<u32> = layer_specs
2054            .iter()
2055            .enumerate()
2056            .filter(|(_idx, spec)| match &spec.kind {
2057                LayerKind::Open { .. } => !spec.kind.common().preempt,
2058                _ => false,
2059            })
2060            .map(|(idx, _)| idx as u32)
2061            .collect();
2062        let gp_layers: Vec<u32> = layer_specs
2063            .iter()
2064            .enumerate()
2065            .filter(|(_idx, spec)| match &spec.kind {
2066                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2067                _ => false,
2068            })
2069            .map(|(idx, _)| idx as u32)
2070            .collect();
2071        let gn_layers: Vec<u32> = layer_specs
2072            .iter()
2073            .enumerate()
2074            .filter(|(_idx, spec)| match &spec.kind {
2075                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2076                _ => false,
2077            })
2078            .map(|(idx, _)| idx as u32)
2079            .collect();
2080
2081        // FIXME - this incorrectly assumes all possible CPUs are consecutive.
2082        for cpu in 0..*NR_CPUS_POSSIBLE {
2083            cpu_ctxs.push(*unsafe {
2084                &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
2085            });
2086
2087            let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
2088            let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
2089            cpu_ctxs[cpu].cpu = cpu as i32;
2090            cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
2091            cpu_ctxs[cpu].is_big = is_big;
2092
2093            fastrand::seed(cpu as u64);
2094
2095            let mut ogp_order = op_layers.clone();
2096            ogp_order.append(&mut gp_layers.clone());
2097            fastrand::shuffle(&mut ogp_order);
2098
2099            let mut ogn_order = on_layers.clone();
2100            ogn_order.append(&mut gn_layers.clone());
2101            fastrand::shuffle(&mut ogn_order);
2102
2103            let mut op_order = op_layers.clone();
2104            fastrand::shuffle(&mut op_order);
2105
2106            let mut on_order = on_layers.clone();
2107            fastrand::shuffle(&mut on_order);
2108
2109            let mut gp_order = gp_layers.clone();
2110            fastrand::shuffle(&mut gp_order);
2111
2112            let mut gn_order = gn_layers.clone();
2113            fastrand::shuffle(&mut gn_order);
2114
2115            for i in 0..MAX_LAYERS {
2116                cpu_ctxs[cpu].ogp_layer_order[i] =
2117                    ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2118                cpu_ctxs[cpu].ogn_layer_order[i] =
2119                    ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2120
2121                cpu_ctxs[cpu].op_layer_order[i] =
2122                    op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2123                cpu_ctxs[cpu].on_layer_order[i] =
2124                    on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2125                cpu_ctxs[cpu].gp_layer_order[i] =
2126                    gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2127                cpu_ctxs[cpu].gn_layer_order[i] =
2128                    gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2129            }
2130        }
2131
2132        Self::init_cpu_prox_map(topo, &mut cpu_ctxs);
2133
2134        skel.maps
2135            .cpu_ctxs
2136            .update_percpu(
2137                &key,
2138                &Self::convert_cpu_ctxs(cpu_ctxs),
2139                libbpf_rs::MapFlags::ANY,
2140            )
2141            .context("Failed to update cpu_ctx")?;
2142
2143        Ok(())
2144    }
2145
    /// Build each LLC's proximity map (own id first, then same-node LLCs,
    /// then the rest of the system, with boundary indices) and write it
    /// into the corresponding llc_ctx entry of the llc_data map.
    fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
        for (&llc_id, llc) in &topo.all_llcs {
            // Collect the orders.
            let mut node_order: Vec<usize> =
                topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
            let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();

            // Make the orders exclusive.
            sys_order.retain(|id| !node_order.contains(id));
            node_order.retain(|&id| id != llc_id);

            // Shuffle so that different LLCs follow different orders. See
            // init_cpu_prox_map().
            fastrand::seed(llc_id as u64);
            fastrand::shuffle(&mut sys_order);
            fastrand::shuffle(&mut node_order);

            // Concatenate and record the node boundary.
            let mut order: Vec<usize> = vec![];
            let mut idx: usize = 0;

            idx += 1;
            order.push(llc_id);

            idx += node_order.len();
            order.append(&mut node_order);
            let node_end = idx;

            idx += sys_order.len();
            order.append(&mut sys_order);
            let sys_end = idx;

            debug!(
                "LLC[{}] proximity map[{}/{}]: {:?}",
                llc_id, node_end, sys_end, &order
            );

            // Record in llc_ctx.
            //
            // XXX - This would be a lot easier if llc_ctx were in the bss.
            // See BpfStats::read().
            //
            // Read-modify-write: look up the existing llc_ctx bytes, patch
            // prox_map in a local copy, then write the bytes back.
            let key = llc_id as u32;
            let llc_id_slice =
                unsafe { std::slice::from_raw_parts((&key as *const u32) as *const u8, 4) };
            let v = skel
                .maps
                .llc_data
                .lookup(llc_id_slice, libbpf_rs::MapFlags::ANY)
                .unwrap()
                .unwrap();
            let mut llcc = unsafe { *(v.as_slice().as_ptr() as *const bpf_intf::llc_ctx) };

            let pmap = &mut llcc.prox_map;
            for (i, &llc_id) in order.iter().enumerate() {
                pmap.llcs[i] = llc_id as u16;
            }
            pmap.node_end = node_end as u32;
            pmap.sys_end = sys_end as u32;

            let v = unsafe {
                std::slice::from_raw_parts(
                    &llcc as *const bpf_intf::llc_ctx as *const u8,
                    std::mem::size_of::<bpf_intf::llc_ctx>(),
                )
            };

            skel.maps
                .llc_data
                .update(llc_id_slice, v, libbpf_rs::MapFlags::ANY)?
        }

        Ok(())
    }
2219
2220    fn init_node_ctx(skel: &mut BpfSkel, topo: &Topology, nr_layers: usize) -> Result<()> {
2221        let all_layers: Vec<u32> = (0..nr_layers as u32).collect();
2222        let node_empty_layers: Vec<Vec<u32>> =
2223            (0..topo.nodes.len()).map(|_| all_layers.clone()).collect();
2224        Self::refresh_node_ctx(skel, topo, &node_empty_layers, true);
2225        Ok(())
2226    }
2227
    /// Open, configure, load, and attach the layered BPF scheduler and
    /// construct the userspace `Scheduler` around it.
    ///
    /// The ordering below is significant for libbpf:
    ///   - program autoload flags and map pin paths must be set after
    ///     `scx_ops_open!` but before `scx_ops_load!`;
    ///   - rodata can only be written between open and load;
    ///   - struct_ops attach happens last, after maps are populated.
    ///
    /// # Errors
    /// Fails on non-contiguous node/LLC/CPU IDs, invalid layer spec
    /// node/LLC references, incompatible matcher/growth-algo combinations,
    /// or any BPF open/load/attach failure.
    fn init(
        opts: &'a Opts,
        layer_specs: &[LayerSpec],
        open_object: &'a mut MaybeUninit<OpenObject>,
        hint_to_layer_map: &HashMap<u64, HintLayerInfo>,
        membw_tracking: bool,
    ) -> Result<Self> {
        let nr_layers = layer_specs.len();
        let mut disable_topology = opts.disable_topology.unwrap_or(false);

        // Topology source: flattened single-node/LLC view when topology
        // awareness is off, user-supplied virtual LLC layout, or probed
        // hardware topology.
        let topo = Arc::new(if disable_topology {
            Topology::with_flattened_llc_node()?
        } else if opts.topology.virt_llc.is_some() {
            Topology::with_args(&opts.topology)?
        } else {
            Topology::new()?
        });

        /*
         * FIXME: scx_layered incorrectly assumes that node, LLC and CPU IDs
         * are consecutive. Verify that they are on this system and bail if
         * not. It's lucky that core ID is not used anywhere as core IDs are
         * not consecutive on some Ryzen CPUs.
         */
        if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
        }
        if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
        }
        if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
            bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
        }

        // Optionally take over netdev IRQ affinity management; empty map
        // means the feature is off.
        let netdevs = if opts.netdev_irq_balance {
            let devs = read_netdevs()?;
            let total_irqs: usize = devs.values().map(|d| d.irqs.len()).sum();
            let breakdown = devs
                .iter()
                .map(|(iface, d)| format!("{iface}={}", d.irqs.len()))
                .collect::<Vec<_>>()
                .join(", ");
            info!(
                "Netdev IRQ balancing enabled: overriding {total_irqs} IRQ{} \
                 across {} interface{} [{breakdown}]",
                if total_irqs == 1 { "" } else { "s" },
                devs.len(),
                if devs.len() == 1 { "" } else { "s" },
            );
            devs
        } else {
            BTreeMap::new()
        };

        // A single-node, single-LLC machine gains nothing from topology
        // awareness; auto-disable it when the user didn't choose explicitly.
        if !disable_topology {
            if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
                disable_topology = true;
            };
            info!(
                "Topology awareness not specified, selecting {} based on hardware",
                if disable_topology {
                    "disabled"
                } else {
                    "enabled"
                }
            );
        };

        let cpu_pool = CpuPool::new(topo.clone(), opts.allow_partial_core)?;

        // If disabling topology awareness clear out any set NUMA/LLC configs and
        // it will fallback to using all cores.
        let layer_specs: Vec<_> = if disable_topology {
            info!("Disabling topology awareness");
            layer_specs
                .iter()
                .cloned()
                .map(|mut s| {
                    s.kind.common_mut().nodes.clear();
                    s.kind.common_mut().llcs.clear();
                    s
                })
                .collect()
        } else {
            layer_specs.to_vec()
        };

        // Validate that spec node/LLC references exist in the topology.
        for spec in layer_specs.iter() {
            let mut seen = BTreeSet::new();
            for &node_id in spec.nodes().iter() {
                if !topo.nodes.contains_key(&node_id) {
                    bail!(
                        "layer {:?}: nodes references node {} which does not \
                         exist in the topology (available: {:?})",
                        spec.name,
                        node_id,
                        topo.nodes.keys().collect::<Vec<_>>()
                    );
                }
                if !seen.insert(node_id) {
                    bail!(
                        "layer {:?}: nodes contains duplicate node {}",
                        spec.name,
                        node_id
                    );
                }
            }

            seen.clear();
            for &llc_id in spec.llcs().iter() {
                if !topo.all_llcs.contains_key(&llc_id) {
                    bail!(
                        "layer {:?}: llcs references LLC {} which does not \
                         exist in the topology (available: {:?})",
                        spec.name,
                        llc_id,
                        topo.all_llcs.keys().collect::<Vec<_>>()
                    );
                }
                if !seen.insert(llc_id) {
                    bail!(
                        "layer {:?}: llcs contains duplicate LLC {}",
                        spec.name,
                        llc_id
                    );
                }
            }
        }

        // Reject NumaNode matcher + NodeSpread* growth: the combination
        // wastes the CPUs allocated on every other node.
        for spec in layer_specs.iter() {
            let has_numa_node_match = spec
                .matches
                .iter()
                .flatten()
                .any(|m| matches!(m, LayerMatch::NumaNode(_)));
            let has_node_spread_algo = matches!(
                spec.kind.common().growth_algo,
                LayerGrowthAlgo::NodeSpread
                    | LayerGrowthAlgo::NodeSpreadReverse
                    | LayerGrowthAlgo::NodeSpreadRandom
            );
            if has_numa_node_match && has_node_spread_algo {
                bail!(
                    "layer {:?}: NumaNode matcher cannot be combined with {:?} \
                     growth algorithm. NodeSpread* allocates CPUs equally across \
                     ALL NUMA nodes, but NumaNode restricts tasks to one node's \
                     CPUs — CPUs on other nodes are wasted and utilization \
                     will never exceed 1/numa_nodes. Use a non-spread algorithm \
                     (e.g. Linear, Topo) instead.",
                    spec.name,
                    spec.kind.common().growth_algo
                );
            }
        }

        // Check kernel features
        init_libbpf_logging(None);
        let kfuncs_in_syscall = scx_bpf_compat::kfuncs_supported_in_syscall()?;
        if !kfuncs_in_syscall {
            warn!("Using slow path: kfuncs not supported in syscall programs (a8e03b6bbb2c ∉ ker)");
        }

        // Open the BPF prog first for verification.
        let debug_level = if opts.log_level.contains("trace") {
            2
        } else if opts.log_level.contains("debug") {
            1
        } else {
            0
        };
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug_level > 1);

        info!(
            "Running scx_layered (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, layered, open_opts)?;

        // No memory BW tracking by default
        skel.progs.scx_pmu_switch_tc.set_autoload(membw_tracking);
        skel.progs.scx_pmu_tick_tc.set_autoload(membw_tracking);

        // Tracks which GPU kprobes were autoloaded so only those get
        // attached after load.
        let mut loaded_kprobes = HashSet::new();

        // enable autoloads for conditionally loaded things
        // immediately after creating skel (because this is always before loading)
        if opts.enable_gpu_support {
            // by default, enable open if gpu support is enabled.
            // open has been observed to be relatively cheap to kprobe.
            if opts.gpu_kprobe_level >= 1 {
                compat::cond_kprobe_load("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
                loaded_kprobes.insert("nvidia_open");
            }
            // enable the rest progressively based upon how often they are called
            // for observed workloads
            if opts.gpu_kprobe_level >= 2 {
                compat::cond_kprobe_load("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
                loaded_kprobes.insert("nvidia_mmap");
            }
            if opts.gpu_kprobe_level >= 3 {
                compat::cond_kprobe_load("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
                loaded_kprobes.insert("nvidia_poll");
            }
        }

        let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
        let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");

        let event = if membw_tracking {
            setup_membw_tracking(&mut skel)?
        } else {
            0
        };

        let rodata = skel.maps.rodata_data.as_mut().unwrap();

        // sched_class addresses enable the skip_preempt optimization; the
        // scheduler still works without them.
        if ext_sched_class_addr.is_ok() && idle_sched_class_addr.is_ok() {
            rodata.ext_sched_class_addr = ext_sched_class_addr.unwrap();
            rodata.idle_sched_class_addr = idle_sched_class_addr.unwrap();
        } else {
            warn!(
                "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
            );
        }

        // Defaults; overwritten from opts a few lines below.
        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;

        // Initialize skel according to @opts.
        skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;

        if !opts.disable_queued_wakeup {
            match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
                0 => info!("Kernel does not support queued wakeup optimization"),
                v => skel.struct_ops.layered_mut().flags |= v,
            }
        }

        rodata.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
        rodata.percpu_kthread_preempt_all =
            !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
        rodata.debug = debug_level as u32;
        rodata.slice_ns = opts.slice_us * 1000;
        // max_exec defaults to 20 slices when not set explicitly.
        rodata.max_exec_ns = if opts.max_exec_us > 0 {
            opts.max_exec_us * 1000
        } else {
            opts.slice_us * 1000 * 20
        };
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;
        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        rodata.smt_enabled = topo.smt_enabled;
        rodata.has_little_cores = topo.has_little_cores();
        rodata.antistall_sec = opts.antistall_sec;
        rodata.monitor_disable = opts.monitor_disable;
        rodata.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
        // lo_fb_share is a fraction; store as parts-per-1024, min 1.
        rodata.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
        rodata.enable_antistall = !opts.disable_antistall;
        rodata.enable_match_debug = opts.enable_match_debug;
        rodata.enable_gpu_support = opts.enable_gpu_support;
        rodata.kfuncs_supported_in_syscall = kfuncs_in_syscall;

        for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
            rodata.__sibling_cpu[cpu] = *sib;
        }
        // all_cpus is a byte-per-8-CPUs bitmap of every online CPU.
        for cpu in topo.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        // Per-kind layer counts: open/grouped, preempting/non-preempting,
        // and exclusive.
        rodata.nr_op_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Open { .. } => spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_on_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Open { .. } => !spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_gp_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Grouped { .. } => spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_gn_layers = layer_specs
            .iter()
            .filter(|spec| match &spec.kind {
                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
                _ => false,
            })
            .count() as u32;
        rodata.nr_excl_layers = layer_specs
            .iter()
            .filter(|spec| spec.kind.common().exclusive)
            .count() as u32;

        let mut min_open = u64::MAX;
        let mut min_preempt = u64::MAX;

        // NOTE(review): unwrap() assumes spec resolution has populated
        // disallow_{open,preempt}_after_us for every Open layer — confirm
        // against the spec-parsing code.
        for spec in layer_specs.iter() {
            if let LayerKind::Open { common, .. } = &spec.kind {
                min_open = min_open.min(common.disallow_open_after_us.unwrap());
                min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
            }
        }

        // NOTE(review): *_us values are stored into *_ns fields with no
        // visible *1000 conversion here — presumably units are resolved
        // upstream or BPF-side; verify.
        rodata.min_open_layer_disallow_open_after_ns = match min_open {
            u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
            v => v,
        };
        rodata.min_open_layer_disallow_preempt_after_ns = match min_preempt {
            u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
            v => v,
        };

        // We set the pin path before loading the skeleton. This will ensure
        // libbpf creates and pins the map, or reuses the pinned map fd for us,
        // so that we can keep reusing the older map already pinned on scheduler
        // restarts.
        let layered_task_hint_map_path = &opts.task_hint_map;
        let hint_map = &mut skel.maps.scx_layered_task_hint_map;
        // Only set pin path if a path is provided.
        if layered_task_hint_map_path.is_empty() == false {
            hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
            rodata.task_hint_map_enabled = true;
        }

        // Optional exact-name match for high-priority fallback threads.
        if !opts.hi_fb_thread_name.is_empty() {
            let bpf_hi_fb_thread_name = &mut rodata.hi_fb_thread_name;
            copy_into_cstr(bpf_hi_fb_thread_name, opts.hi_fb_thread_name.as_str());
            rodata.enable_hi_fb_thread_name_match = true;
        }

        let cgroup_regexes = Self::init_layers(&mut skel, &layer_specs, &topo)?;
        skel.maps.rodata_data.as_mut().unwrap().nr_cgroup_regexes = cgroup_regexes.len() as u32;
        Self::init_nodes(&mut skel, opts, &topo);

        // Load: rodata is frozen from here on.
        let mut skel = scx_ops_load!(skel, layered, uei)?;

        // Populate the mapping of hints to layer IDs for faster lookups
        if hint_to_layer_map.len() != 0 {
            for (k, v) in hint_to_layer_map.iter() {
                let key: u32 = *k as u32;

                // Create hint_layer_info struct
                let mut info_bytes = vec![0u8; std::mem::size_of::<bpf_intf::hint_layer_info>()];
                let info_ptr = info_bytes.as_mut_ptr() as *mut bpf_intf::hint_layer_info;
                // SAFETY: info_bytes is exactly size_of::<hint_layer_info>()
                // bytes, zero-initialized, and info_ptr points at its start;
                // the writes stay within the struct.
                unsafe {
                    (*info_ptr).layer_id = v.layer_id as u32;
                    // Thresholds are fractions scaled to basis-points*100
                    // (x10000) for integer comparison on the BPF side.
                    (*info_ptr).system_cpu_util_below = match v.system_cpu_util_below {
                        Some(threshold) => (threshold * 10000.0) as u64,
                        None => u64::MAX, // disabled sentinel
                    };
                    (*info_ptr).dsq_insert_below = match v.dsq_insert_below {
                        Some(threshold) => (threshold * 10000.0) as u64,
                        None => u64::MAX, // disabled sentinel
                    };
                }

                skel.maps.hint_to_layer_id_map.update(
                    &key.to_ne_bytes(),
                    &info_bytes,
                    libbpf_rs::MapFlags::ANY,
                )?;
            }
        }

        if membw_tracking {
            create_perf_fds(&mut skel, event)?;
        }

        // Build userspace Layer objects with their per-layer core growth
        // orders.
        let mut layers = vec![];
        let layer_growth_orders =
            LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
        for (idx, spec) in layer_specs.iter().enumerate() {
            let growth_order = layer_growth_orders
                .get(&idx)
                .with_context(|| "layer has no growth order".to_string())?;
            layers.push(Layer::new(spec, &topo, growth_order)?);
        }

        // Idle QoS is only honored when the kernel exposes the per-CPU idle
        // resume latency knob.
        let mut idle_qos_enabled = layers
            .iter()
            .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
        if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
            warn!("idle_resume_us not supported, ignoring");
            idle_qos_enabled = false;
        }

        Self::init_cpus(&skel, &layer_specs, &topo)?;
        Self::init_llc_prox_map(&mut skel, &topo)?;
        Self::init_node_ctx(&mut skel, &topo, nr_layers)?;

        // Other stuff.
        let proc_reader = fb_procfs::ProcReader::new();

        // Handle setup if layered is running in a pid namespace.
        let input = ProgramInput {
            ..Default::default()
        };
        let prog = &mut skel.progs.initialize_pid_namespace;

        // Best-effort: result deliberately ignored.
        let _ = prog.test_run(input);

        // XXX If we try to refresh the cpumasks here before attaching, we
        // sometimes (non-deterministically) don't see the updated values in
        // BPF. It would be better to update the cpumasks here before we
        // attach, but the value will quickly converge anyways so it's not a
        // huge problem in the interim until we figure it out.

        // Allow all tasks to open and write to BPF task hint map, now that
        // we should have it pinned at the desired location.
        if layered_task_hint_map_path.is_empty() == false {
            let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
            let mode: libc::mode_t = 0o666;
            // SAFETY: path is a valid NUL-terminated CString; chmod does not
            // retain the pointer past the call.
            unsafe {
                if libc::chmod(path.as_ptr(), mode) != 0 {
                    trace!("'chmod' to 666 of task hint map failed, continuing...");
                }
            }
        }

        // Attach.
        let struct_ops = scx_ops_attach!(skel, layered)?;

        // Turn on installed kprobes
        if opts.enable_gpu_support {
            if loaded_kprobes.contains("nvidia_open") {
                compat::cond_kprobe_attach("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
            }
            if loaded_kprobes.contains("nvidia_mmap") {
                compat::cond_kprobe_attach("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
            }
            if loaded_kprobes.contains("nvidia_poll") {
                compat::cond_kprobe_attach("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
            }
        }

        let stats_server = StatsServer::new(stats::server_data()).launch()?;
        let mut gpu_task_handler =
            GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
        gpu_task_handler.init(topo.clone());

        let sched = Self {
            struct_ops: Some(struct_ops),
            layer_specs,

            sched_intv: Duration::from_secs_f64(opts.interval),
            layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),

            cpu_pool,
            layers,
            idle_qos_enabled,

            sched_stats: Stats::new(&mut skel, &proc_reader, topo.clone(), &gpu_task_handler)?,

            cgroup_regexes: Some(cgroup_regexes),
            nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
            processing_dur: Default::default(),

            proc_reader,
            skel,

            topo,
            netdevs,
            stats_server,
            gpu_task_handler,
        };

        info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");

        Ok(sched)
    }
2709
2710    fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
2711        for cpu in 0..mask.len() {
2712            if mask.test_cpu(cpu) {
2713                bpfmask[cpu / 8] |= 1 << (cpu % 8);
2714            } else {
2715                bpfmask[cpu / 8] &= !(1 << (cpu % 8));
2716            }
2717        }
2718    }
2719
2720    fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
2721        trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
2722        Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);
2723
2724        bpf_layer.nr_cpus = layer.nr_cpus as u32;
2725        for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
2726            bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
2727        }
2728        for (node_id, &nr_node_cpus) in layer.nr_node_cpus.iter().enumerate() {
2729            bpf_layer.node[node_id].nr_cpus = nr_node_cpus as u32;
2730        }
2731
2732        bpf_layer.refresh_cpus = 1;
2733    }
2734
    /// Re-point each managed netdev's IRQ affinity masks at the CPUs the
    /// scheduler currently has available, restricted to the device's local
    /// NUMA node, then apply the masks.
    ///
    /// If no available CPU lies on the device's node, the IRQs are spread
    /// across the entire node rather than left with an empty mask. No-op
    /// when no CPUs are available at all.
    fn update_netdev_cpumasks(&mut self) -> Result<()> {
        let available_cpus = self.cpu_pool.available_cpus();
        // Nothing to steer onto; keep existing affinities.
        if available_cpus.is_empty() {
            return Ok(());
        }

        for (iface, netdev) in self.netdevs.iter_mut() {
            // Locate the topology node the device reports as local.
            let node = self
                .topo
                .nodes
                .values()
                .find(|n| n.id == netdev.node())
                .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
            let node_cpus = node.span.clone();
            for (irq, irqmask) in netdev.irqs.iter_mut() {
                // Rebuild the mask from scratch: available CPUs on this node.
                irqmask.clear_all();
                for cpu in available_cpus.iter() {
                    if !node_cpus.test_cpu(cpu) {
                        continue;
                    }
                    let _ = irqmask.set_cpu(cpu);
                }
                // If no CPUs are available in the node then spread the load across the node
                if irqmask.weight() == 0 {
                    for cpu in node_cpus.iter() {
                        let _ = irqmask.set_cpu(cpu);
                    }
                }
                trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
            }
            // Write the computed masks out to the device.
            netdev.apply_cpumasks()?;
            debug!(
                "{iface}: applied affinity override to {} IRQ{}",
                netdev.irqs.len(),
                if netdev.irqs.len() == 1 { "" } else { "s" },
            );
        }

        Ok(())
    }
2775
2776    fn clamp_target_by_membw(
2777        &self,
2778        layer: &Layer,
2779        membw_limit: f64,
2780        membw: f64,
2781        curtarget: u64,
2782    ) -> usize {
2783        let ncpu: u64 = layer.cpus.weight() as u64;
2784        let membw = (membw * (1024 as f64).powf(3.0)).round() as u64;
2785        let membw_limit = (membw_limit * (1024 as f64).powf(3.0)).round() as u64;
2786        let last_membw_percpu = if ncpu > 0 { membw / ncpu } else { 0 };
2787
2788        // Either there is no memory bandwidth limit set, or the counters
2789        // are not fully initialized yet. Just return the current target.
2790        if membw_limit == 0 || last_membw_percpu == 0 {
2791            return curtarget as usize;
2792        }
2793
2794        return (membw_limit / last_membw_percpu) as usize;
2795    }
2796
2797    /// Decompose per-layer CPU targets into per-node pinned demand and
2798    /// unpinned demand for unified_alloc(). Uses layer_node_pinned_utils
2799    /// to split each layer's target. All outputs are in alloc units.
2800    fn calc_raw_demands(&self, targets: &[(usize, usize)]) -> Vec<LayerDemand> {
2801        let au = self.cpu_pool.alloc_unit();
2802        let pinned_utils = &self.sched_stats.layer_node_pinned_utils;
2803        let nr_nodes = self.topo.nodes.len();
2804
2805        targets
2806            .iter()
2807            .enumerate()
2808            .map(|(idx, &(target, _min))| {
2809                let layer = &self.layers[idx];
2810                let weight = layer.kind.common().weight as usize;
2811
2812                // Open layers don't participate in allocation.
2813                if matches!(layer.kind, LayerKind::Open { .. }) {
2814                    return LayerDemand {
2815                        raw_pinned: vec![0; nr_nodes],
2816                        raw_unpinned: 0,
2817                        weight,
2818                        spread: false,
2819                    };
2820                }
2821
2822                let spread = matches!(
2823                    layer.growth_algo,
2824                    LayerGrowthAlgo::NodeSpread
2825                        | LayerGrowthAlgo::NodeSpreadReverse
2826                        | LayerGrowthAlgo::NodeSpreadRandom
2827                        | LayerGrowthAlgo::RoundRobin
2828                );
2829
2830                let util_high = match &layer.kind {
2831                    LayerKind::Confined { util_range, .. }
2832                    | LayerKind::Grouped { util_range, .. } => util_range.1,
2833                    _ => 1.0,
2834                };
2835
2836                // Convert per-node pinned utilization to CPU demand.
2837                let mut raw_pinned = vec![0usize; nr_nodes];
2838                for n in 0..nr_nodes {
2839                    let pu = pinned_utils[idx][n];
2840                    if pu < 0.01 {
2841                        continue;
2842                    }
2843                    // Check this layer has allowed_cpus on this node.
2844                    let node_span = &self.topo.nodes[&n].span;
2845                    if layer.allowed_cpus.and(node_span).is_empty() {
2846                        continue;
2847                    }
2848                    let cpus = (pu / util_high).ceil() as usize;
2849                    // Round up to alloc units.
2850                    let units = (cpus + au - 1) / au;
2851                    raw_pinned[n] = units;
2852                }
2853
2854                // Unpinned = remainder of the target.
2855                let target_units = target.div_ceil(au);
2856                let pinned_units: usize = raw_pinned.iter().sum();
2857                let raw_unpinned = target_units.saturating_sub(pinned_units);
2858
2859                LayerDemand {
2860                    raw_pinned,
2861                    raw_unpinned,
2862                    weight,
2863                    spread,
2864                }
2865            })
2866            .collect()
2867    }
2868
    /// Calculate how many CPUs each layer would like to have if there were
    /// no competition. The CPU range is determined by applying the inverse
    /// of util_range and then capping by cpus_range. If the current
    /// allocation is within the acceptable range, no change is made.
    /// Returns (target, min) pair for each layer.
    fn calc_target_nr_cpus(&self) -> Vec<(usize, usize)> {
        let nr_cpus = self.cpu_pool.topo.all_cpus.len();
        let utils = &self.sched_stats.layer_utils;
        let membws = &self.sched_stats.layer_membws;

        // Debug records per sized layer: (owned%, open%, util%, low, high,
        // target), emitted in one trace line at the end.
        let mut records: Vec<(u64, u64, u64, usize, usize, usize)> = vec![];
        let mut targets: Vec<(usize, usize)> = vec![];

        for (idx, layer) in self.layers.iter().enumerate() {
            targets.push(match &layer.kind {
                LayerKind::Confined {
                    util_range,
                    cpus_range,
                    cpus_range_frac,
                    membw_gb,
                    ..
                }
                | LayerKind::Grouped {
                    util_range,
                    cpus_range,
                    cpus_range_frac,
                    membw_gb,
                    ..
                } => {
                    let cpus_range =
                        resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();

                    // A grouped layer can choose to include open cputime
                    // for sizing. Also, as an empty layer can only get CPU
                    // time through fallback (counted as owned) or open
                    // execution, add open cputime for empty layers.
                    let owned = utils[idx][LAYER_USAGE_OWNED];
                    let open = utils[idx][LAYER_USAGE_OPEN];

                    let membw_owned = membws[idx][LAYER_USAGE_OWNED];
                    let membw_open = membws[idx][LAYER_USAGE_OPEN];

                    let mut util = owned;
                    let mut membw = membw_owned;
                    if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
                        util += open;
                        membw += membw_open;
                    }

                    // Treat noise-level utilization as zero.
                    let util = if util < 0.01 { 0.0 } else { util };
                    // Invert util_range: low = fewest CPUs keeping us under
                    // the high watermark, high = most CPUs keeping us above
                    // the low one (never below low).
                    let low = (util / util_range.1).ceil() as usize;
                    let high = ((util / util_range.0).floor() as usize).max(low);

                    let membw_limit = match membw_gb {
                        Some(membw_limit) => *membw_limit,
                        None => 0.0,
                    };

                    trace!(
                        "layer {0} (membw, membw_limit): ({membw} gi_b, {membw_limit} gi_b)",
                        layer.name
                    );

                    // Hysteresis: keep the current size when it's already
                    // within [low, high].
                    let target = layer.cpus.weight().clamp(low, high);

                    records.push((
                        (owned * 100.0) as u64,
                        (open * 100.0) as u64,
                        (util * 100.0) as u64,
                        low,
                        high,
                        target,
                    ));

                    let target = target.clamp(cpus_range.0, cpus_range.1);
                    let membw_target = self.clamp_target_by_membw(
                        &layer,
                        membw_limit as f64,
                        membw as f64,
                        target as u64,
                    );

                    trace!("CPU target pre- and post-membw adjustment: {target} -> {membw_target}");

                    // If there's no way to drop our memory usage down enough,
                    // pin the target CPUs to low and drop a warning.
                    if membw_target < cpus_range.0 {
                        warn!("cannot satisfy memory bw limit for layer {}", layer.name);
                        warn!("membw_target {membw_target} low {}", cpus_range.0);
                    };

                    // Memory bandwidth target cannot override imposed limits or bump
                    // the target above what CPU usage-based throttling requires.
                    let target = membw_target.clamp(cpus_range.0, target);

                    (target, cpus_range.0)
                }
                // Open layers are sized by the allocator, not here.
                LayerKind::Open { .. } => (0, 0),
            });
        }

        trace!("(owned, open, util, low, high, target): {:?}", &records);
        targets
    }
2973
2974    // Figure out a tuple (LLCs, extra_cpus) in terms of the target CPUs
2975    // computed by weighted_target_nr_cpus. Returns the number of full LLCs
2976    // occupied by a layer, and any extra CPUs that don't occupy a full LLC.
2977    fn compute_target_llcs(target: usize, topo: &Topology) -> (usize, usize) {
2978        // TODO(kkd): We assume each LLC has equal number of cores.
2979        let cores_per_llc = topo.all_cores.len() / topo.all_llcs.len();
2980        // TODO(kkd): We assume each core has fixed number of threads.
2981        let cpus_per_core = topo.all_cores.first_key_value().unwrap().1.cpus.len();
2982        let cpus_per_llc = cores_per_llc * cpus_per_core;
2983
2984        let full = target / cpus_per_llc;
2985        let extra = target % cpus_per_llc;
2986
2987        (full, extra.div_ceil(cpus_per_core))
2988    }
2989
    // Recalculate the core order for layers using StickyDynamic growth
    // algorithm. Uses per-node targets from unified_alloc() to decide how
    // many LLCs each layer gets on each node, then builds core_order and
    // applies CPU changes.
    //
    // `layer_targets` pairs (layer index, target) in caller-supplied order;
    // phases below walk it forwards or in reverse as noted. `layer_allocs`
    // carries per-node allocations in alloc units, and `au` is the CPU count
    // of one alloc unit, so `node_target(n) * au` yields CPUs. Returns
    // Ok(true) when any layer's CPU assignment changed.
    fn recompute_layer_core_order(
        &mut self,
        layer_targets: &[(usize, usize)],
        layer_allocs: &[LayerAlloc],
        au: usize,
    ) -> Result<bool> {
        let nr_nodes = self.topo.nodes.len();

        // Phase 1 — Free per-node: return excess LLCs to cpu_pool.
        // Freeing happens before acquiring (Phase 2) so that LLCs released
        // by one layer are available to the others in the same pass.
        debug!(
            " free: before pass: free_llcs={:?}",
            self.cpu_pool.free_llcs
        );
        for &(idx, _) in layer_targets.iter().rev() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                let assigned_on_n = layer.assigned_llcs[n].len();
                // Only the number of *full* LLCs matters here; leftover
                // cores are handled by the spillover phase below.
                let target_full_n =
                    Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
                let mut to_free = assigned_on_n.saturating_sub(target_full_n);

                debug!(
                    " free: layer={} node={} assigned={} target_full={} to_free={}",
                    layer.name, n, assigned_on_n, target_full_n, to_free,
                );

                while to_free > 0 {
                    if let Some(llc) = layer.assigned_llcs[n].pop() {
                        self.cpu_pool.return_llc(llc);
                        to_free -= 1;
                        debug!(" layer={} freed_llc={} from node={}", layer.name, llc, n);
                    } else {
                        break;
                    }
                }
            }
        }
        debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);

        // Phase 2 — Acquire per-node: claim LLCs from cpu_pool.
        for &(idx, _) in layer_targets.iter().rev() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                let cur_on_n = layer.assigned_llcs[n].len();
                let target_full_n =
                    Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).0;
                let mut to_alloc = target_full_n.saturating_sub(cur_on_n);

                debug!(
                    " alloc: layer={} node={} cur={} target_full={} to_alloc={} free={}",
                    layer.name,
                    n,
                    cur_on_n,
                    target_full_n,
                    to_alloc,
                    self.cpu_pool.free_llcs.get(&n).map_or(0, |v| v.len()),
                );

                while to_alloc > 0 {
                    if let Some(llc) = self.cpu_pool.take_llc_from_node(n) {
                        layer.assigned_llcs[n].push(llc);
                        to_alloc -= 1;
                        debug!(" layer={} alloc_llc={} on node={}", layer.name, llc, n);
                    } else {
                        // Pool exhausted on this node; the layer runs short.
                        break;
                    }
                }
            }

            debug!(
                " alloc: layer={} assigned_llcs={:?}",
                layer.name, layer.assigned_llcs
            );
        }

        // Phase 3 — Spillover per-node: consume extra cores from free LLCs.
        let cores_per_llc = self.topo.all_cores.len() / self.topo.all_llcs.len();
        let cpus_per_core = self.topo.all_cores.first_key_value().unwrap().1.cpus.len();
        let cpus_per_llc = cores_per_llc * cpus_per_core;

        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            // core_order is rebuilt from scratch each pass, one list per node.
            layer.core_order = vec![Vec::new(); nr_nodes];
            let alloc = &layer_allocs[idx];

            for n in 0..nr_nodes {
                // `.1` = extra cores beyond the full LLCs for this node.
                let mut extra = Self::compute_target_llcs(alloc.node_target(n) * au, &self.topo).1;

                if let Some(node_llcs) = self.cpu_pool.free_llcs.get_mut(&n) {
                    for entry in node_llcs.iter_mut() {
                        if extra == 0 {
                            break;
                        }
                        // entry is (llc_id, consumed) where `consumed` tracks
                        // how many CPUs of this free LLC earlier layers took
                        // within this pass (reset to 0 after the loop below).
                        let avail = cpus_per_llc - entry.1;
                        let mut used = extra.min(avail);
                        let cores_to_add = used;

                        let shift = entry.1;
                        entry.1 += used;

                        let llc_id = entry.0;
                        let llc = self.topo.all_llcs.get(&llc_id).unwrap();

                        // Skip cores already consumed by previous layers so
                        // layers sharing a free LLC take disjoint cores.
                        for core in llc.cores.iter().skip(shift) {
                            if used == 0 {
                                break;
                            }
                            layer.core_order[n].push(core.1.id);
                            used -= 1;
                        }

                        extra -= cores_to_add;
                    }
                }
            }

            for node_cores in &mut layer.core_order {
                node_cores.reverse();
            }
        }

        // Reset consumed entries in free LLCs.
        for node_llcs in self.cpu_pool.free_llcs.values_mut() {
            for entry in node_llcs.iter_mut() {
                entry.1 = 0;
            }
        }

        // Phase 4 — Build core_order: append cores from assigned LLCs.
        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            let all_assigned: HashSet<usize> =
                layer.assigned_llcs.iter().flatten().copied().collect();

            for core in self.topo.all_cores.iter() {
                let llc_id = core.1.llc_id;
                if all_assigned.contains(&llc_id) {
                    let nid = core.1.node_id;
                    layer.core_order[nid].push(core.1.id);
                }
            }
            // Reverse again so assigned-LLC cores come before the spillover
            // cores pushed in Phase 3 (which were reversed once already).
            for node_cores in &mut layer.core_order {
                node_cores.reverse();
            }

            debug!(
                " alloc: layer={} core_order={:?}",
                layer.name, layer.core_order
            );
        }

        // Phase 5 — Apply CPU changes for StickyDynamic layers.
        // Two phases: first free per-node, then allocate per-node.
        let mut updated = false;

        // Free excess CPUs per-node.
        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            for n in 0..nr_nodes {
                // node_target = CPUs covered by this node's core_order,
                // clipped to the layer's allowed cpumask.
                let mut node_target = Cpumask::new();
                for &core_id in &layer.core_order[n] {
                    if let Some(core) = self.topo.all_cores.get(&core_id) {
                        node_target |= &core.span;
                    }
                }
                node_target &= &layer.allowed_cpus;

                let node_span = &self.topo.nodes[&n].span;
                let node_cur = layer.cpus.and(node_span);
                // CPUs the layer holds on this node but no longer targets.
                let cpus_to_free = node_cur.and(&node_target.not());

                if cpus_to_free.weight() > 0 {
                    debug!(
                        " apply: layer={} freeing CPUs on node {}: {}",
                        layer.name, n, cpus_to_free
                    );
                    layer.cpus &= &cpus_to_free.not();
                    layer.nr_cpus -= cpus_to_free.weight();
                    for cpu in cpus_to_free.iter() {
                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                        layer.nr_node_cpus[n] -= 1;
                    }
                    self.cpu_pool.free(&cpus_to_free)?;
                    updated = true;
                }
            }
        }

        // Allocate needed CPUs per-node.
        for &(idx, _) in layer_targets.iter() {
            let layer = &mut self.layers[idx];

            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
                continue;
            }

            for n in 0..nr_nodes {
                let mut node_target = Cpumask::new();
                for &core_id in &layer.core_order[n] {
                    if let Some(core) = self.topo.all_cores.get(&core_id) {
                        node_target |= &core.span;
                    }
                }
                node_target &= &layer.allowed_cpus;

                let available_cpus = self.cpu_pool.available_cpus();
                let desired_to_alloc = node_target.and(&layer.cpus.clone().not());
                // Only take what the pool can actually provide.
                let cpus_to_alloc = desired_to_alloc.clone().and(&available_cpus);

                if desired_to_alloc.weight() > cpus_to_alloc.weight() {
                    debug!(
                        " apply: layer={} node {} wanted to alloc {} CPUs but only {} available",
                        layer.name,
                        n,
                        desired_to_alloc.weight(),
                        cpus_to_alloc.weight()
                    );
                }

                if cpus_to_alloc.weight() > 0 {
                    debug!(
                        " apply: layer={} allocating CPUs on node {}: {}",
                        layer.name, n, cpus_to_alloc
                    );
                    layer.cpus |= &cpus_to_alloc;
                    layer.nr_cpus += cpus_to_alloc.weight();
                    for cpu in cpus_to_alloc.iter() {
                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                        layer.nr_node_cpus[n] += 1;
                    }
                    self.cpu_pool.mark_allocated(&cpus_to_alloc)?;
                    updated = true;
                }
            }

            debug!(
                " apply: layer={} final cpus.weight()={} nr_cpus={}",
                layer.name,
                layer.cpus.weight(),
                layer.nr_cpus
            );
        }

        Ok(updated)
    }
3268
    /// Push per-node context to BPF via the `refresh_node_ctx` syscall prog.
    ///
    /// For each NUMA node this sends the list of layers that currently own
    /// no CPUs on that node ("empty" layers). When `init` is true it also
    /// sends the node's LLC ID list, which is static topology and therefore
    /// only transferred once.
    fn refresh_node_ctx(
        skel: &mut BpfSkel,
        topo: &Topology,
        node_empty_layers: &[Vec<u32>],
        init: bool,
    ) {
        for &nid in topo.nodes.keys() {
            // SAFETY: refresh_node_ctx_arg is a C-layout struct generated
            // from the BPF interface; an all-zero bit pattern is a valid
            // value for it, and every field used below is written before
            // the prog reads it.
            let mut arg: bpf_intf::refresh_node_ctx_arg =
                unsafe { MaybeUninit::zeroed().assume_init() };
            arg.node_id = nid as u32;
            arg.init = init as u32;

            let empty = &node_empty_layers[nid];
            arg.nr_empty_layer_ids = empty.len() as u32;
            for (i, &lid) in empty.iter().enumerate() {
                arg.empty_layer_ids[i] = lid;
            }
            // Fill unused slots with MAX_LAYERS as an "invalid" sentinel.
            for i in empty.len()..MAX_LAYERS {
                arg.empty_layer_ids[i] = MAX_LAYERS as u32;
            }

            if init {
                // Per-node LLC list — static topology, only set during init.
                let node = &topo.nodes[&nid];
                let llcs: Vec<u32> = node.llcs.keys().map(|&id| id as u32).collect();
                arg.nr_llcs = llcs.len() as u32;
                for (i, &llc_id) in llcs.iter().enumerate() {
                    arg.llcs[i] = llc_id;
                }
            }

            let input = ProgramInput {
                // SAFETY: `arg` outlives the test_run call and the slice
                // covers exactly the struct's bytes, which is how libbpf
                // expects ctx_in to be passed.
                context_in: Some(unsafe {
                    std::slice::from_raw_parts_mut(
                        &mut arg as *mut _ as *mut u8,
                        std::mem::size_of_val(&arg),
                    )
                }),
                ..Default::default()
            };
            // Result intentionally ignored; failure here is non-fatal.
            // NOTE(review): consider logging the error — confirm intent.
            let _ = skel.progs.refresh_node_ctx.test_run(input);
        }
    }
3312
    /// Recompute every layer's CPU assignment from the latest utilization
    /// targets and propagate the resulting cpumasks to the BPF side.
    ///
    /// Pipeline: compute raw targets -> dampen shrink -> run unified_alloc
    /// for per-node allocations -> handle StickyDynamic layers (on
    /// multi-LLC topologies) -> shrink then grow the remaining confined
    /// layers -> hand leftover CPUs to open layers -> refresh BPF state.
    fn refresh_cpumasks(&mut self) -> Result<()> {
        let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });

        let mut updated = false;
        let raw_targets = self.calc_target_nr_cpus();
        // Allocation unit: all targets fed to unified_alloc are expressed
        // in multiples of `au` CPUs.
        let au = self.cpu_pool.alloc_unit();
        let total_cpus = self.cpu_pool.topo.all_cpus.len();

        // Dampen shrink: only drop halfway per cycle to avoid unnecessary
        // changes. There's some dampening built into util metrics but slow
        // down freeing further. This is solely based on intuition. Drop or
        // update according to real-world behavior.
        let targets: Vec<(usize, usize)> = raw_targets
            .iter()
            .enumerate()
            .map(|(idx, &(target, min))| {
                let cur = self.layers[idx].nr_cpus;
                if target < cur {
                    let dampened = cur - (cur - target).div_ceil(2);
                    (dampened.max(min), min)
                } else {
                    (target, min)
                }
            })
            .collect();

        // Build demands for unified_alloc and compute per-node allocations.
        let demands = self.calc_raw_demands(&targets);
        let nr_nodes = self.topo.nodes.len();
        let node_caps: Vec<usize> = self
            .topo
            .nodes
            .values()
            .map(|n| n.span.weight() / au)
            .collect();
        let all_layer_nodes: Vec<&[usize]> = self
            .layer_specs
            .iter()
            .map(|s| s.nodes().as_slice())
            .collect();
        // Per-layer node visitation order for growth.
        let norders: Vec<Vec<usize>> = (0..self.layers.len())
            .map(|idx| {
                layer_core_growth::node_order(
                    self.layer_specs[idx].nodes(),
                    &self.topo,
                    idx,
                    &all_layer_nodes,
                )
            })
            .collect();
        // Current per-layer, per-node CPU counts in alloc units.
        let cur_node_cpus: Vec<Vec<usize>> = self
            .layers
            .iter()
            .map(|layer| (0..nr_nodes).map(|n| layer.nr_node_cpus[n] / au).collect())
            .collect();
        let layer_allocs = unified_alloc(
            total_cpus / au,
            &node_caps,
            &demands,
            &cur_node_cpus,
            &norders,
        );

        // Convert allocations back to CPU counts. Shrink dampening is
        // already applied to the targets fed into unified_alloc above.
        let cpu_targets: Vec<usize> = layer_allocs.iter().map(|a| a.total() * au).collect();

        // Snapshot per-layer CPU counts for ALLOC debug logging.
        let prev_nr_cpus: Vec<usize> = self.layers.iter().map(|l| l.nr_cpus).collect();

        // Layers sorted by ascending CPU target; shrink passes iterate this
        // in reverse (largest first), grow passes iterate it forwards.
        let mut ascending: Vec<(usize, usize)> = cpu_targets.iter().copied().enumerate().collect();
        ascending.sort_by(|a, b| a.1.cmp(&b.1));

        // Snapshot per-layer per-node CPU counts before allocation changes.
        let prev_node_cpus: Vec<Vec<usize>> =
            self.layers.iter().map(|l| l.nr_node_cpus.clone()).collect();

        // Per-node SD allocation requires multiple LLCs. On flat topologies
        // (single LLC, e.g. VMs with topology disabled), SD layers fall through
        // to the non-SD grow/shrink loops below.
        let use_sd_alloc = self.topo.all_llcs.len() > 1;
        let sticky_dynamic_updated = if use_sd_alloc {
            self.recompute_layer_core_order(&ascending, &layer_allocs, au)?
        } else {
            false
        };
        updated |= sticky_dynamic_updated;

        // Update BPF cpumasks for StickyDynamic layers if they were updated
        if sticky_dynamic_updated {
            for (idx, layer) in self.layers.iter().enumerate() {
                if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
                    Self::update_bpf_layer_cpumask(
                        layer,
                        &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
                    );
                }
            }
        }

        // Shrink per-node: free excess CPUs from each node.
        for &(idx, _target) in ascending.iter().rev() {
            let layer = &mut self.layers[idx];
            if layer_is_open(layer) {
                continue;
            }

            // Skip StickyDynamic layers when per-node SD allocation is active.
            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
                continue;
            }

            let alloc = &layer_allocs[idx];
            let mut freed = false;

            for n in 0..nr_nodes {
                let desired = alloc.node_target(n) * au;
                let mut to_free = layer.nr_node_cpus[n].saturating_sub(desired);
                let node_span = &self.topo.nodes[&n].span;

                while to_free > 0 {
                    let node_cands = layer.cpus.and(node_span);
                    // Free in reverse core_order so the preferred cores are
                    // the last to go.
                    let cpus_to_free = match self
                        .cpu_pool
                        .next_to_free(&node_cands, layer.core_order[n].iter().rev())?
                    {
                        Some(ret) => ret,
                        None => break,
                    };
                    let nr = cpus_to_free.weight();
                    trace!(
                        "[{}] freeing CPUs on node {}: {}",
                        layer.name,
                        n,
                        &cpus_to_free
                    );
                    layer.cpus &= &cpus_to_free.not();
                    layer.nr_cpus -= nr;
                    for cpu in cpus_to_free.iter() {
                        let node_id = self.cpu_pool.topo.all_cpus[&cpu].node_id;
                        layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
                        layer.nr_node_cpus[node_id] -= 1;
                        // Pinned count can never exceed what's left on the node.
                        layer.nr_pinned_cpus[node_id] =
                            layer.nr_pinned_cpus[node_id].min(layer.nr_node_cpus[node_id]);
                    }
                    self.cpu_pool.free(&cpus_to_free)?;
                    to_free = to_free.saturating_sub(nr);
                    freed = true;
                }
            }

            if freed {
                Self::update_bpf_layer_cpumask(
                    layer,
                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
                );
                updated = true;
            }
        }

        // Grow layers per-node using allocations from unified_alloc.
        for &(idx, _target) in &ascending {
            let layer = &mut self.layers[idx];

            if layer_is_open(layer) {
                continue;
            }

            // Skip StickyDynamic layers when per-node SD allocation is active.
            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic && use_sd_alloc {
                continue;
            }

            let alloc = &layer_allocs[idx];
            let norder = &norders[idx];
            let mut alloced = false;

            for &node_id in norder.iter() {
                let node_target = alloc.node_target(node_id) * au;
                let cur_node = layer.nr_node_cpus[node_id];
                if node_target <= cur_node {
                    continue;
                }
                let pinned_target = alloc.pinned[node_id] * au;
                let mut nr_to_alloc = node_target - cur_node;
                let node_span = &self.topo.nodes[&node_id].span;
                let node_allowed = layer.allowed_cpus.and(node_span);

                while nr_to_alloc > 0 {
                    let nr_alloced = match self.cpu_pool.alloc_cpus(
                        &node_allowed,
                        &layer.core_order[node_id],
                        nr_to_alloc,
                    ) {
                        Some(new_cpus) => {
                            let nr = new_cpus.weight();
                            layer.cpus |= &new_cpus;
                            layer.nr_cpus += nr;
                            for cpu in new_cpus.iter() {
                                layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
                                let nid = self.cpu_pool.topo.all_cpus[&cpu].node_id;
                                layer.nr_node_cpus[nid] += 1;
                                // Count new CPUs as pinned until the node's
                                // pinned target is satisfied.
                                if layer.nr_pinned_cpus[nid] < pinned_target {
                                    layer.nr_pinned_cpus[nid] += 1;
                                }
                            }
                            nr
                        }
                        None => 0,
                    };
                    if nr_alloced == 0 {
                        break;
                    }
                    alloced = true;
                    nr_to_alloc -= nr_alloced.min(nr_to_alloc);
                }
            }

            if alloced {
                Self::update_bpf_layer_cpumask(
                    layer,
                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
                );
                updated = true;
            }
        }

        // Log per-layer allocation changes.
        if updated {
            for (idx, layer) in self.layers.iter().enumerate() {
                if layer_is_open(layer) {
                    continue;
                }
                let prev = prev_nr_cpus[idx];
                let cur = layer.nr_cpus;
                if prev != cur {
                    debug!(
                        "ALLOC {} algo={:?} cpus:{}→{} mask={:x}",
                        layer.name, layer.growth_algo, prev, cur, layer.cpus,
                    );
                }
            }
            debug!(
                "ALLOC pool_available={}",
                self.cpu_pool.available_cpus().weight()
            );
        }

        // Log allocation changes.
        if updated {
            let nr_nodes = self.topo.nodes.len();
            for (idx, layer) in self.layers.iter().enumerate() {
                if layer_is_open(layer) {
                    continue;
                }
                let prev = &prev_node_cpus[idx];
                let cur = &layer.nr_node_cpus;
                if prev == cur {
                    continue;
                }
                let per_node: String = (0..nr_nodes)
                    .map(|n| format!("n{}:{}→{}", n, prev[n], cur[n]))
                    .collect::<Vec<_>>()
                    .join(" ");
                let prev_total: usize = prev.iter().sum();
                let cur_total: usize = cur[..nr_nodes].iter().sum();
                let target: String = (0..nr_nodes)
                    .map(|n| format!("n{}:{}", n, layer_allocs[idx].node_target(n) * au))
                    .collect::<Vec<_>>()
                    .join(" ");
                debug!(
                    "ALLOC {} algo={:?} {} total:{}→{} target:[{}] mask={:x}",
                    layer.name,
                    layer.growth_algo,
                    per_node,
                    prev_total,
                    cur_total,
                    target,
                    layer.cpus,
                );
            }
            debug!(
                "ALLOC pool_available={}",
                self.cpu_pool.available_cpus().weight()
            );
        }

        // Give the rest to the open layers.
        if updated {
            for (idx, layer) in self.layers.iter_mut().enumerate() {
                if !layer_is_open(layer) {
                    continue;
                }

                let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx];
                let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
                let nr_available_cpus = available_cpus.weight();

                // Open layers need the intersection of allowed cpus and
                // available cpus. Recompute per-LLC and per-node counts
                // since open layers bypass alloc/free.
                layer.cpus = available_cpus;
                layer.nr_cpus = nr_available_cpus;
                for llc in self.cpu_pool.topo.all_llcs.values() {
                    layer.nr_llc_cpus[llc.id] = layer.cpus.and(&llc.span).weight();
                }
                for node in self.cpu_pool.topo.nodes.values() {
                    layer.nr_node_cpus[node.id] = layer.cpus.and(&node.span).weight();
                    layer.nr_pinned_cpus[node.id] = 0;
                }
                Self::update_bpf_layer_cpumask(layer, bpf_layer);
            }

            for (&node_id, &cpu) in &self.cpu_pool.fallback_cpus {
                self.skel.maps.bss_data.as_mut().unwrap().fallback_cpus[node_id] = cpu as u32;
            }

            // Widen the observed (min, max) CPU-count window per layer.
            for (lidx, layer) in self.layers.iter().enumerate() {
                self.nr_layer_cpus_ranges[lidx] = (
                    self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
                    self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
                );
            }

            // Trigger updates on the BPF side.
            let input = ProgramInput {
                ..Default::default()
            };
            let prog = &mut self.skel.progs.refresh_layer_cpumasks;
            let _ = prog.test_run(input);

            // Update per-node empty layer IDs via BPF prog.
            let nr_nodes = self.topo.nodes.len();
            let node_empty_layers: Vec<Vec<u32>> = (0..nr_nodes)
                .map(|nid| {
                    self.layers
                        .iter()
                        .enumerate()
                        .filter(|(_lidx, layer)| layer.nr_node_cpus[nid] == 0)
                        .map(|(lidx, _)| lidx as u32)
                        .collect()
                })
                .collect();
            Self::refresh_node_ctx(&mut self.skel, &self.topo, &node_empty_layers, false);
        }

        if let Err(e) = self.update_netdev_cpumasks() {
            warn!("Failed to update netdev IRQ cpumasks: {:#}", e);
        }
        Ok(())
    }
3664
3665    fn refresh_idle_qos(&mut self) -> Result<()> {
3666        if !self.idle_qos_enabled {
3667            return Ok(());
3668        }
3669
3670        let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
3671        for layer in self.layers.iter() {
3672            let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
3673            for cpu in layer.cpus.iter() {
3674                cpu_idle_qos[cpu] = idle_resume_us;
3675            }
3676        }
3677
3678        for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
3679            update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
3680        }
3681
3682        Ok(())
3683    }
3684
    /// One iteration of the userspace control loop: refresh stats, feed
    /// EWMA values to BPF, then recompute cpumasks and idle QoS.
    fn step(&mut self) -> Result<()> {
        let started_at = Instant::now();
        self.sched_stats.refresh(
            &mut self.skel,
            &self.proc_reader,
            started_at,
            self.processing_dur,
            &self.gpu_task_handler,
        )?;

        // Update BPF with EWMA values
        // (scaled by 10000 to carry fractions as integers).
        self.skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .system_cpu_util_ewma = (self.sched_stats.system_cpu_util_ewma * 10000.0) as u64;

        for layer_id in 0..self.sched_stats.nr_layers {
            self.skel
                .maps
                .bss_data
                .as_mut()
                .unwrap()
                .layer_dsq_insert_ewma[layer_id] =
                (self.sched_stats.layer_dsq_insert_ewma[layer_id] * 10000.0) as u64;
        }

        self.refresh_cpumasks()?;
        self.refresh_idle_qos()?;
        self.gpu_task_handler.maybe_affinitize();
        // Track cumulative time spent in this loop; fed back into the next
        // stats refresh above.
        self.processing_dur += Instant::now().duration_since(started_at);
        Ok(())
    }
3719
3720    fn generate_sys_stats(
3721        &mut self,
3722        stats: &Stats,
3723        cpus_ranges: &mut [(usize, usize)],
3724    ) -> Result<SysStats> {
3725        let bstats = &stats.bpf_stats;
3726        let mut sys_stats = SysStats::new(stats, bstats, &self.cpu_pool.fallback_cpus)?;
3727
3728        for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
3729            let layer_stats = LayerStats::new(lidx, layer, stats, bstats, cpus_ranges[lidx]);
3730            sys_stats.layers.insert(spec.name.to_string(), layer_stats);
3731            cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
3732        }
3733
3734        Ok(sys_stats)
3735    }
3736
3737    // Helper function to process a cgroup creation (common logic for walkdir and inotify)
3738    fn process_cgroup_creation(
3739        path: &Path,
3740        cgroup_regexes: &HashMap<u32, Regex>,
3741        cgroup_path_to_id: &mut HashMap<String, u64>,
3742        sender: &crossbeam::channel::Sender<CgroupEvent>,
3743    ) {
3744        let path_str = path.to_string_lossy().to_string();
3745
3746        // Get cgroup ID (inode number)
3747        let cgroup_id = std::fs::metadata(path)
3748            .map(|metadata| {
3749                use std::os::unix::fs::MetadataExt;
3750                metadata.ino()
3751            })
3752            .unwrap_or(0);
3753
3754        // Build match bitmap by testing against CgroupRegex rules
3755        let mut match_bitmap = 0u64;
3756        for (rule_id, regex) in cgroup_regexes {
3757            if regex.is_match(&path_str) {
3758                match_bitmap |= 1u64 << rule_id;
3759            }
3760        }
3761
3762        // Store in hash
3763        cgroup_path_to_id.insert(path_str.clone(), cgroup_id);
3764
3765        // Send event
3766        if let Err(e) = sender.send(CgroupEvent::Created {
3767            path: path_str,
3768            cgroup_id,
3769            match_bitmap,
3770        }) {
3771            error!("Failed to send cgroup creation event: {}", e);
3772        }
3773    }
3774
    /// Start a background thread watching /sys/fs/cgroup for cgroup
    /// creation/removal.
    ///
    /// Seeds the watcher with every cgroup directory that already exists
    /// (sending a Created event for each), then mirrors inotify
    /// CREATE/DELETE events into the returned channel until `shutdown` is
    /// set or inotify reads start failing.
    fn start_cgroup_watcher(
        shutdown: Arc<AtomicBool>,
        cgroup_regexes: HashMap<u32, Regex>,
    ) -> Result<Receiver<CgroupEvent>> {
        let mut inotify = Inotify::init().context("Failed to initialize inotify")?;
        // Maps inotify watch descriptors back to the directory they watch;
        // events only carry the entry name, so this resolves full paths.
        let mut wd_to_path = HashMap::new();

        // Create crossbeam channel for cgroup events (bounded to prevent memory issues)
        let (sender, receiver) = crossbeam::channel::bounded::<CgroupEvent>(1024);

        // Watch for directory creation and deletion events
        let root_wd = inotify
            .watches()
            .add("/sys/fs/cgroup", WatchMask::CREATE | WatchMask::DELETE)
            .context("Failed to add watch for /sys/fs/cgroup")?;
        wd_to_path.insert(root_wd, PathBuf::from("/sys/fs/cgroup"));

        // Also recursively watch existing directories for new subdirectories
        Self::add_recursive_watches(&mut inotify, &mut wd_to_path, Path::new("/sys/fs/cgroup"))?;

        // Spawn watcher thread
        std::thread::spawn(move || {
            let mut buffer = [0; 4096];
            let inotify_fd = inotify.as_raw_fd();
            // Maintain hash of cgroup path -> cgroup ID (inode number)
            let mut cgroup_path_to_id = HashMap::<String, u64>::new();

            // Populate existing cgroups
            for entry in WalkDir::new("/sys/fs/cgroup")
                .into_iter()
                .filter_map(|e| e.ok())
                .filter(|e| e.file_type().is_dir())
            {
                let path = entry.path();
                Self::process_cgroup_creation(
                    path,
                    &cgroup_regexes,
                    &mut cgroup_path_to_id,
                    &sender,
                );
            }

            while !shutdown.load(Ordering::Relaxed) {
                // Use select to wait for events with a 100ms timeout, so the
                // shutdown flag is polled at least every 100ms even when no
                // inotify events arrive.
                let ready = unsafe {
                    // SAFETY: read_fds and timeout are valid locals for the
                    // duration of the select(2) call, and inotify_fd stays
                    // open because `inotify` is owned by this closure.
                    let mut read_fds: libc::fd_set = std::mem::zeroed();
                    libc::FD_ZERO(&mut read_fds);
                    libc::FD_SET(inotify_fd, &mut read_fds);

                    let mut timeout = libc::timeval {
                        tv_sec: 0,
                        tv_usec: 100_000, // 100ms
                    };

                    libc::select(
                        inotify_fd + 1,
                        &mut read_fds,
                        std::ptr::null_mut(),
                        std::ptr::null_mut(),
                        &mut timeout,
                    )
                };

                if ready <= 0 {
                    // Timeout or error, continue loop to check shutdown
                    continue;
                }

                // Read events non-blocking
                let events = match inotify.read_events(&mut buffer) {
                    Ok(events) => events,
                    Err(e) => {
                        error!("Error reading inotify events: {}", e);
                        break;
                    }
                };

                for event in events {
                    // Only directory create/delete under a watched dir matters.
                    if !event.mask.contains(inotify::EventMask::CREATE)
                        && !event.mask.contains(inotify::EventMask::DELETE)
                    {
                        continue;
                    }

                    let name = match event.name {
                        Some(name) => name,
                        None => continue,
                    };

                    let parent_path = match wd_to_path.get(&event.wd) {
                        Some(parent) => parent,
                        None => {
                            warn!("Unknown watch descriptor: {:?}", event.wd);
                            continue;
                        }
                    };

                    let path = parent_path.join(name.to_string_lossy().as_ref());

                    if event.mask.contains(inotify::EventMask::CREATE) {
                        // Files can also trigger CREATE; only dirs are cgroups.
                        if !path.is_dir() {
                            continue;
                        }

                        Self::process_cgroup_creation(
                            &path,
                            &cgroup_regexes,
                            &mut cgroup_path_to_id,
                            &sender,
                        );

                        // Add watch for this new directory
                        // NOTE(review): a subdirectory created inside `path`
                        // before this watch lands would be missed — confirm
                        // whether that race matters for cgroup creation.
                        match inotify
                            .watches()
                            .add(&path, WatchMask::CREATE | WatchMask::DELETE)
                        {
                            Ok(wd) => {
                                wd_to_path.insert(wd, path.clone());
                            }
                            Err(e) => {
                                warn!(
                                    "Failed to add watch for new cgroup {}: {}",
                                    path.display(),
                                    e
                                );
                            }
                        }
                    } else if event.mask.contains(inotify::EventMask::DELETE) {
                        let path_str = path.to_string_lossy().to_string();

                        // Get cgroup ID from our hash (since the directory is gone, we can't stat it)
                        let cgroup_id = cgroup_path_to_id.remove(&path_str).unwrap_or(0);

                        // Send removal event to main thread
                        if let Err(e) = sender.send(CgroupEvent::Removed {
                            path: path_str,
                            cgroup_id,
                        }) {
                            error!("Failed to send cgroup removal event: {}", e);
                        }

                        // Find and remove the watch descriptor for this path.
                        // NOTE(review): this only prunes our map; it relies on
                        // the kernel dropping the inotify watch for a deleted
                        // directory — confirm no explicit remove is needed.
                        let wd_to_remove = wd_to_path.iter().find_map(|(wd, watched_path)| {
                            if watched_path == &path {
                                Some(wd.clone())
                            } else {
                                None
                            }
                        });
                        if let Some(wd) = wd_to_remove {
                            wd_to_path.remove(&wd);
                        }
                    }
                }
            }
        });

        Ok(receiver)
    }
3934
3935    fn add_recursive_watches(
3936        inotify: &mut Inotify,
3937        wd_to_path: &mut HashMap<inotify::WatchDescriptor, PathBuf>,
3938        path: &Path,
3939    ) -> Result<()> {
3940        for entry in WalkDir::new(path)
3941            .into_iter()
3942            .filter_map(|e| e.ok())
3943            .filter(|e| e.file_type().is_dir())
3944            .skip(1)
3945        {
3946            let entry_path = entry.path();
3947            // Add watch for this directory
3948            match inotify
3949                .watches()
3950                .add(entry_path, WatchMask::CREATE | WatchMask::DELETE)
3951            {
3952                Ok(wd) => {
3953                    wd_to_path.insert(wd, entry_path.to_path_buf());
3954                }
3955                Err(e) => {
3956                    debug!("Failed to add watch for {}: {}", entry_path.display(), e);
3957                }
3958            }
3959        }
3960        Ok(())
3961    }
3962
    /// Main scheduling loop: periodically runs `step()`, bumps the layer
    /// refresh sequence for BPF, and services stats requests and cgroup
    /// watcher events until shutdown is requested or the BPF side exits.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let mut next_sched_at = Instant::now() + self.sched_intv;
        let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
        let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
        // Per stats-client (thread) snapshot of each layer's (min, max)
        // nr_cpus observed since that client's last refresh.
        let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();

        // Start the cgroup watcher only if there are CgroupRegex rules
        // NOTE(review): take().unwrap() assumes run() is entered once with
        // cgroup_regexes populated — confirm against the constructor.
        let cgroup_regexes = self.cgroup_regexes.take().unwrap();
        let cgroup_event_rx = if !cgroup_regexes.is_empty() {
            Some(Self::start_cgroup_watcher(
                shutdown.clone(),
                cgroup_regexes,
            )?)
        } else {
            None
        };

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

            // Run a scheduling step when due; catch next_sched_at up past
            // `now` so missed intervals don't cause a burst of steps.
            if now >= next_sched_at {
                self.step()?;
                while next_sched_at < now {
                    next_sched_at += self.sched_intv;
                }
            }

            // Tell BPF to refresh layer average runtimes by bumping the
            // sequence counter it watches.
            if enable_layer_refresh && now >= next_layer_refresh_at {
                self.skel
                    .maps
                    .bss_data
                    .as_mut()
                    .unwrap()
                    .layer_refresh_seq_avgruntime += 1;
                while next_layer_refresh_at < now {
                    next_layer_refresh_at += self.layer_refresh_intv;
                }
            }

            // Handle both stats requests and cgroup events with timeout
            let timeout_duration = next_sched_at.saturating_duration_since(Instant::now());
            // When no cgroup watcher is running, substitute a never-ready
            // channel so the select! arm below simply never fires.
            let never_rx = crossbeam::channel::never();
            let cgroup_rx = cgroup_event_rx.as_ref().unwrap_or(&never_rx);

            select! {
                recv(req_ch) -> msg => match msg {
                    // New stats client: seed its cpu-range view and return a
                    // fresh Stats baseline.
                    Ok(StatsReq::Hello(tid)) => {
                        cpus_ranges.insert(
                            tid,
                            self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
                        );
                        let stats =
                            Stats::new(&mut self.skel, &self.proc_reader, self.topo.clone(), &self.gpu_task_handler)?;
                        res_ch.send(StatsRes::Hello(stats))?;
                    }
                    Ok(StatsReq::Refresh(tid, mut stats)) => {
                        // Propagate self's layer cpu ranges into each stat's.
                        for i in 0..self.nr_layer_cpus_ranges.len() {
                            for (_, ranges) in cpus_ranges.iter_mut() {
                                ranges[i] = (
                                    ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
                                    ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
                                );
                            }
                            self.nr_layer_cpus_ranges[i] =
                                (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
                        }

                        stats.refresh(
                            &mut self.skel,
                            &self.proc_reader,
                            now,
                            self.processing_dur,
                            &self.gpu_task_handler,
                        )?;
                        let sys_stats =
                            self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
                        res_ch.send(StatsRes::Refreshed((stats, sys_stats)))?;
                    }
                    // Client disconnecting: drop its cpu-range view.
                    Ok(StatsReq::Bye(tid)) => {
                        cpus_ranges.remove(&tid);
                        res_ch.send(StatsRes::Bye)?;
                    }
                    Err(e) => Err(e)?,
                },

                recv(cgroup_rx) -> event => match event {
                    Ok(CgroupEvent::Created { path, cgroup_id, match_bitmap }) => {
                        // Insert into BPF map
                        self.skel.maps.cgroup_match_bitmap.update(
                            &cgroup_id.to_ne_bytes(),
                            &match_bitmap.to_ne_bytes(),
                            libbpf_rs::MapFlags::ANY,
                        ).with_context(|| format!(
                            "Failed to insert cgroup {}({}) into BPF map. Cgroup map may be full \
                             (max 16384 entries). Aborting.",
                            cgroup_id, path
                        ))?;

                        debug!("Added cgroup {} to BPF map with bitmap 0x{:x}", cgroup_id, match_bitmap);
                    }
                    Ok(CgroupEvent::Removed { path, cgroup_id }) => {
                        // Delete from BPF map
                        if let Err(e) = self.skel.maps.cgroup_match_bitmap.delete(&cgroup_id.to_ne_bytes()) {
                            warn!("Failed to delete cgroup {} from BPF map: {}", cgroup_id, e);
                        } else {
                            debug!("Removed cgroup {}({}) from BPF map", cgroup_id, path);
                        }
                    }
                    Err(e) => {
                        error!("Error receiving cgroup event: {}", e);
                    }
                },

                recv(crossbeam::channel::after(timeout_duration)) -> _ => {
                    // Timeout - continue main loop
                }
            }
        }

        // Detach the BPF scheduler before reporting exit info.
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
4087}
4088
4089impl Drop for Scheduler<'_> {
4090    fn drop(&mut self) {
4091        info!("Unregister {SCHEDULER_NAME} scheduler");
4092
4093        if !self.netdevs.is_empty() {
4094            for (iface, netdev) in &self.netdevs {
4095                if let Err(e) = netdev.restore_cpumasks() {
4096                    warn!("Failed to restore {iface} IRQ affinity: {e}");
4097                }
4098            }
4099            info!("Restored original netdev IRQ affinity");
4100        }
4101
4102        if let Some(struct_ops) = self.struct_ops.take() {
4103            drop(struct_ops);
4104        }
4105    }
4106}
4107
4108fn write_example_file(path: &str) -> Result<()> {
4109    let mut f = fs::OpenOptions::new()
4110        .create_new(true)
4111        .write(true)
4112        .open(path)?;
4113    Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
4114}
4115
/// Routing info for one HintEquals value, built by `verify_layer_specs`:
/// the layer the hint maps to plus the optional high-frequency matcher
/// thresholds declared alongside it.
struct HintLayerInfo {
    // Index of the target layer in the spec list.
    layer_id: usize,
    // Threshold from a SystemCpuUtilBelow matcher in [0.0, 1.0], if any.
    system_cpu_util_below: Option<f64>,
    // Threshold from a DsqInsertBelow matcher in [0.0, 1.0], if any.
    dsq_insert_below: Option<f64>,
}
4121
/// Validate the layer specs and build the hint-value -> layer routing map.
///
/// Bails with a descriptive error on the first violation: spec-count limits,
/// terminal/non-terminal match shape, per-block matcher limits, string
/// length limits, threshold ranges, hint composition rules, duplicate hint
/// mappings, and cpus/util range sanity.
fn verify_layer_specs(specs: &[LayerSpec]) -> Result<HashMap<u64, HintLayerInfo>> {
    // Value tuple: (layer index, layer name for error messages,
    // SystemCpuUtilBelow threshold, DsqInsertBelow threshold).
    let mut hint_to_layer_map = HashMap::<u64, (usize, String, Option<f64>, Option<f64>)>::new();

    let nr_specs = specs.len();
    if nr_specs == 0 {
        bail!("No layer spec");
    }
    if nr_specs > MAX_LAYERS {
        bail!("Too many layer specs");
    }

    for (idx, spec) in specs.iter().enumerate() {
        // The last spec is the terminal catch-all: exactly one empty match
        // block. Every non-terminal spec needs at least one real match.
        if idx < nr_specs - 1 {
            if spec.matches.is_empty() {
                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
            }
        } else {
            if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
                bail!("Terminal spec {:?} must have an empty match", spec.name);
            }
        }

        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
            bail!(
                "Spec {:?} has too many ({}) OR match blocks",
                spec.name,
                spec.matches.len()
            );
        }

        // Each element of spec.matches is an AND block; the blocks OR together.
        for (ands_idx, ands) in spec.matches.iter().enumerate() {
            if ands.len() > NR_LAYER_MATCH_KINDS {
                bail!(
                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
                    spec.name,
                    ands_idx,
                    ands.len()
                );
            }
            // Hint and high-frequency matchers have strict composition rules
            // (enforced after the scan below), so count them as we go.
            let mut hint_equals_cnt = 0;
            let mut system_cpu_util_below_cnt = 0;
            let mut dsq_insert_below_cnt = 0;
            let mut hint_value: Option<u64> = None;
            let mut system_cpu_util_threshold: Option<f64> = None;
            let mut dsq_insert_threshold: Option<f64> = None;
            for one in ands.iter() {
                match one {
                    // String matchers must fit the BPF-side fixed-size buffers.
                    LayerMatch::CgroupPrefix(prefix) => {
                        if prefix.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
                        }
                    }
                    LayerMatch::CgroupSuffix(suffix) => {
                        if suffix.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
                        }
                    }
                    LayerMatch::CgroupContains(substr) => {
                        if substr.len() > MAX_PATH {
                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
                        }
                    }
                    LayerMatch::CommPrefix(prefix) => {
                        if prefix.len() > MAX_COMM {
                            bail!("Spec {:?} has too long a comm prefix", spec.name);
                        }
                    }
                    LayerMatch::PcommPrefix(prefix) => {
                        if prefix.len() > MAX_COMM {
                            bail!("Spec {:?} has too long a process name prefix", spec.name);
                        }
                    }
                    LayerMatch::SystemCpuUtilBelow(threshold) => {
                        if *threshold < 0.0 || *threshold > 1.0 {
                            bail!(
                                "Spec {:?} has SystemCpuUtilBelow threshold outside the range [0.0, 1.0]",
                                spec.name
                            );
                        }
                        system_cpu_util_threshold = Some(*threshold);
                        system_cpu_util_below_cnt += 1;
                    }
                    LayerMatch::DsqInsertBelow(threshold) => {
                        if *threshold < 0.0 || *threshold > 1.0 {
                            bail!(
                                "Spec {:?} has DsqInsertBelow threshold outside the range [0.0, 1.0]",
                                spec.name
                            );
                        }
                        dsq_insert_threshold = Some(*threshold);
                        dsq_insert_below_cnt += 1;
                    }
                    LayerMatch::HintEquals(hint) => {
                        if *hint > 1024 {
                            bail!(
                                "Spec {:?} has hint value outside the range [0, 1024]",
                                spec.name
                            );
                        }
                        hint_value = Some(*hint);
                        hint_equals_cnt += 1;
                    }
                    // Other matcher kinds need no verification here.
                    _ => {}
                }
            }
            if hint_equals_cnt > 1 {
                bail!("Only 1 HintEquals match permitted per AND block");
            }
            // High-frequency matchers may only appear together with exactly
            // one HintEquals and nothing else in the AND block; a lone
            // HintEquals must also stand by itself.
            let high_freq_matcher_cnt = system_cpu_util_below_cnt + dsq_insert_below_cnt;
            if high_freq_matcher_cnt > 0 {
                if hint_equals_cnt != 1 {
                    bail!("High-frequency matchers (SystemCpuUtilBelow, DsqInsertBelow) must be used with one HintEquals");
                }
                if system_cpu_util_below_cnt > 1 {
                    bail!("Only 1 SystemCpuUtilBelow match permitted per AND block");
                }
                if dsq_insert_below_cnt > 1 {
                    bail!("Only 1 DsqInsertBelow match permitted per AND block");
                }
                if ands.len() != hint_equals_cnt + system_cpu_util_below_cnt + dsq_insert_below_cnt
                {
                    bail!("High-frequency matchers must be used only with HintEquals (no other matchers)");
                }
            } else if hint_equals_cnt == 1 && ands.len() != 1 {
                bail!("HintEquals match cannot be in conjunction with other matches");
            }

            // Insert hint into map if present
            if let Some(hint) = hint_value {
                // A hint may repeat only if it maps to the same layer; the
                // first occurrence's thresholds are kept.
                if let Some((layer_id, name, _, _)) = hint_to_layer_map.get(&hint) {
                    if *layer_id != idx {
                        bail!(
                            "Spec {:?} has hint value ({}) that is already mapped to Spec {:?}",
                            spec.name,
                            hint,
                            name
                        );
                    }
                } else {
                    hint_to_layer_map.insert(
                        hint,
                        (
                            idx,
                            spec.name.clone(),
                            system_cpu_util_threshold,
                            dsq_insert_threshold,
                        ),
                    );
                }
            }
        }

        // Sanity-check cpu/util ranges on the layer kinds that carry them.
        match spec.kind {
            LayerKind::Confined {
                cpus_range,
                util_range,
                ..
            }
            | LayerKind::Grouped {
                cpus_range,
                util_range,
                ..
            } => {
                if let Some((cpus_min, cpus_max)) = cpus_range {
                    if cpus_min > cpus_max {
                        bail!(
                            "Spec {:?} has invalid cpus_range({}, {})",
                            spec.name,
                            cpus_min,
                            cpus_max
                        );
                    }
                }
                if util_range.0 >= util_range.1 {
                    bail!(
                        "Spec {:?} has invalid util_range ({}, {})",
                        spec.name,
                        util_range.0,
                        util_range.1
                    );
                }
            }
            _ => {}
        }
    }

    // Drop the name (only needed for error messages) and expose the rest as
    // HintLayerInfo.
    Ok(hint_to_layer_map
        .into_iter()
        .map(|(k, v)| {
            (
                k,
                HintLayerInfo {
                    layer_id: v.0,
                    system_cpu_util_below: v.2,
                    dsq_insert_below: v.3,
                },
            )
        })
        .collect())
}
4322
4323fn name_suffix(cgroup: &str, len: usize) -> String {
4324    let suffixlen = std::cmp::min(len, cgroup.len());
4325    let suffixrev: String = cgroup.chars().rev().take(suffixlen).collect();
4326
4327    suffixrev.chars().rev().collect()
4328}
4329
4330fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
4331    let mut paths = vec![];
4332
4333    if !dir.is_dir() {
4334        panic!("path {:?} does not correspond to directory", dir);
4335    }
4336
4337    let direntries = fs::read_dir(dir)?;
4338
4339    for entry in direntries {
4340        let path = entry?.path();
4341        if path.is_dir() {
4342            paths.append(&mut traverse_sysfs(&path)?);
4343            paths.push(path);
4344        }
4345    }
4346
4347    Ok(paths)
4348}
4349
4350fn find_cpumask(cgroup: &str) -> Cpumask {
4351    let mut path = String::from(cgroup);
4352    path.push_str("/cpuset.cpus.effective");
4353
4354    let description = fs::read_to_string(&mut path).unwrap();
4355
4356    Cpumask::from_cpulist(&description).unwrap()
4357}
4358
4359fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
4360    match rule {
4361        LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4362            .into_iter()
4363            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4364            .filter(|cgroup| cgroup.ends_with(suffix))
4365            .map(|cgroup| {
4366                (
4367                    {
4368                        let mut slashterminated = cgroup.clone();
4369                        slashterminated.push('/');
4370                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4371                    },
4372                    find_cpumask(&cgroup),
4373                )
4374            })
4375            .collect()),
4376        LayerMatch::CgroupRegex(expr) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4377            .into_iter()
4378            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4379            .filter(|cgroup| {
4380                let re = Regex::new(expr).unwrap();
4381                re.is_match(cgroup)
4382            })
4383            .map(|cgroup| {
4384                (
4385                    // Here we convert the regex match into a suffix match because we still need to
4386                    // do the matching on the bpf side and doing a regex match in bpf isn't
4387                    // easily done.
4388                    {
4389                        let mut slashterminated = cgroup.clone();
4390                        slashterminated.push('/');
4391                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4392                    },
4393                    find_cpumask(&cgroup),
4394                )
4395            })
4396            .collect()),
4397        _ => panic!("Unimplemented template enum {:?}", rule),
4398    }
4399}
4400
4401fn create_perf_fds(skel: &mut BpfSkel, event: u64) -> Result<()> {
4402    let mut attr = perf::bindings::perf_event_attr::default();
4403    attr.size = std::mem::size_of::<perf::bindings::perf_event_attr>() as u32;
4404    attr.type_ = perf::bindings::PERF_TYPE_RAW;
4405    attr.config = event;
4406    attr.sample_type = 0u64;
4407    attr.__bindgen_anon_1.sample_period = 0u64;
4408    attr.set_disabled(0);
4409
4410    let perf_events_map = &skel.maps.scx_pmu_map;
4411    let map_fd = unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
4412
4413    let mut failures = 0u64;
4414
4415    for cpu in 0..*NR_CPUS_POSSIBLE {
4416        let fd = unsafe { perf::perf_event_open(&mut attr as *mut _, -1, cpu as i32, -1, 0) };
4417        if fd < 0 {
4418            failures += 1;
4419            trace!(
4420                "perf_event_open failed cpu={cpu} errno={}",
4421                std::io::Error::last_os_error()
4422            );
4423            continue;
4424        }
4425
4426        let key = cpu as u32;
4427        let val = fd as u32;
4428        let ret = unsafe {
4429            libbpf_sys::bpf_map_update_elem(
4430                map_fd,
4431                &key as *const _ as *const _,
4432                &val as *const _ as *const _,
4433                0,
4434            )
4435        };
4436        if ret != 0 {
4437            trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
4438        } else {
4439            trace!("mapped cpu={cpu} -> fd={fd}");
4440        }
4441    }
4442
4443    if failures > 0 {
4444        println!("membw tracking: failed to install {failures} counters");
4445        // Keep going, do not fail the scheduler for this
4446    }
4447
4448    Ok(())
4449}
4450
4451// Set up the counters
4452fn setup_membw_tracking(skel: &mut OpenBpfSkel) -> Result<u64> {
4453    let pmumanager = PMUManager::new()?;
4454    let codename = &pmumanager.codename as &str;
4455
4456    let pmuspec = match codename {
4457        "amdzen1" | "amdzen2" | "amdzen3" => {
4458            trace!("found AMD codename {codename}");
4459            pmumanager.pmus.get("ls_any_fills_from_sys.mem_io_local")
4460        }
4461        "amdzen4" | "amdzen5" => {
4462            trace!("found AMD codename {codename}");
4463            pmumanager.pmus.get("ls_any_fills_from_sys.dram_io_all")
4464        }
4465
4466        "haswell" | "broadwell" | "broadwellde" | "broadwellx" | "skylake" | "skylakex"
4467        | "cascadelakex" | "arrowlake" | "meteorlake" | "sapphirerapids" | "emeraldrapids"
4468        | "graniterapids" => {
4469            trace!("found Intel codename {codename}");
4470            pmumanager.pmus.get("LONGEST_LAT_CACHE.MISS")
4471        }
4472
4473        _ => {
4474            trace!("found unknown codename {codename}");
4475            None
4476        }
4477    };
4478
4479    let spec = pmuspec.ok_or("not_found").unwrap();
4480    let config = (spec.umask << 8) | spec.event[0];
4481
4482    // Install the counter in the BPF map
4483    skel.maps.rodata_data.as_mut().unwrap().membw_event = config;
4484
4485    Ok(config)
4486}
4487
4488#[clap_main::clap_main]
4489fn main(opts: Opts) -> Result<()> {
4490    if opts.version {
4491        println!(
4492            "scx_layered {}",
4493            build_id::full_version(env!("CARGO_PKG_VERSION"))
4494        );
4495        return Ok(());
4496    }
4497
4498    if opts.help_stats {
4499        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
4500        return Ok(());
4501    }
4502
4503    let env_filter = EnvFilter::try_from_default_env()
4504        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
4505            Ok(filter) => Ok(filter),
4506            Err(e) => {
4507                eprintln!(
4508                    "invalid log envvar: {}, using info, err is: {}",
4509                    opts.log_level, e
4510                );
4511                EnvFilter::try_new("info")
4512            }
4513        })
4514        .unwrap_or_else(|_| EnvFilter::new("info"));
4515
4516    match tracing_subscriber::fmt()
4517        .with_env_filter(env_filter)
4518        .with_target(true)
4519        .with_thread_ids(true)
4520        .with_file(true)
4521        .with_line_number(true)
4522        .try_init()
4523    {
4524        Ok(()) => {}
4525        Err(e) => eprintln!("failed to init logger: {}", e),
4526    }
4527
4528    if opts.verbose > 0 {
4529        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
4530    }
4531
4532    if opts.no_load_frac_limit {
4533        warn!("--no-load-frac-limit is deprecated and noop");
4534    }
4535    if opts.layer_preempt_weight_disable != 0.0 {
4536        warn!("--layer-preempt-weight-disable is deprecated and noop");
4537    }
4538    if opts.layer_growth_weight_disable != 0.0 {
4539        warn!("--layer-growth-weight-disable is deprecated and noop");
4540    }
4541    if opts.local_llc_iteration {
4542        warn!("--local_llc_iteration is deprecated and noop");
4543    }
4544
4545    debug!("opts={:?}", &opts);
4546
4547    if let Some(run_id) = opts.run_id {
4548        info!("scx_layered run_id: {}", run_id);
4549    }
4550
4551    let shutdown = Arc::new(AtomicBool::new(false));
4552    let shutdown_clone = shutdown.clone();
4553    ctrlc::set_handler(move || {
4554        shutdown_clone.store(true, Ordering::Relaxed);
4555    })
4556    .context("Error setting Ctrl-C handler")?;
4557
4558    if let Some(intv) = opts.monitor.or(opts.stats) {
4559        let shutdown_copy = shutdown.clone();
4560        let stats_columns = opts.stats_columns;
4561        let stats_no_llc = opts.stats_no_llc;
4562        let jh = std::thread::spawn(move || {
4563            match stats::monitor(
4564                Duration::from_secs_f64(intv),
4565                shutdown_copy,
4566                stats_columns,
4567                stats_no_llc,
4568            ) {
4569                Ok(_) => {
4570                    debug!("stats monitor thread finished successfully")
4571                }
4572                Err(error_object) => {
4573                    warn!(
4574                        "stats monitor thread finished because of an error {}",
4575                        error_object
4576                    )
4577                }
4578            }
4579        });
4580        if opts.monitor.is_some() {
4581            let _ = jh.join();
4582            return Ok(());
4583        }
4584    }
4585
4586    if let Some(path) = &opts.example {
4587        write_example_file(path)?;
4588        return Ok(());
4589    }
4590
4591    let mut layer_config = match opts.run_example {
4592        true => EXAMPLE_CONFIG.clone(),
4593        false => LayerConfig { specs: vec![] },
4594    };
4595
4596    for (idx, input) in opts.specs.iter().enumerate() {
4597        let specs = LayerSpec::parse(input)
4598            .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;
4599
4600        for spec in specs {
4601            match spec.template {
4602                Some(ref rule) => {
4603                    let matches = expand_template(&rule)?;
4604                    // in the absence of matching cgroups, have template layers
4605                    // behave as non-template layers do.
4606                    if matches.is_empty() {
4607                        layer_config.specs.push(spec);
4608                    } else {
4609                        for (mt, mask) in matches {
4610                            let mut genspec = spec.clone();
4611
4612                            genspec.cpuset = Some(mask);
4613
4614                            // Push the new "and" rule into each "or" term.
4615                            for orterm in &mut genspec.matches {
4616                                orterm.push(mt.clone());
4617                            }
4618
4619                            match &mt {
4620                                LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
4621                                _ => bail!("Template match has unexpected type"),
4622                            }
4623
4624                            // Push the generated layer into the config
4625                            layer_config.specs.push(genspec);
4626                        }
4627                    }
4628                }
4629
4630                None => {
4631                    layer_config.specs.push(spec);
4632                }
4633            }
4634        }
4635    }
4636
4637    for spec in layer_config.specs.iter_mut() {
4638        let common = spec.kind.common_mut();
4639
4640        if common.slice_us == 0 {
4641            common.slice_us = opts.slice_us;
4642        }
4643
4644        if common.weight == 0 {
4645            common.weight = DEFAULT_LAYER_WEIGHT;
4646        }
4647        common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);
4648
4649        if common.preempt {
4650            if common.disallow_open_after_us.is_some() {
4651                warn!(
4652                    "Preempt layer {} has non-null disallow_open_after_us, ignored",
4653                    &spec.name
4654                );
4655            }
4656            if common.disallow_preempt_after_us.is_some() {
4657                warn!(
4658                    "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
4659                    &spec.name
4660                );
4661            }
4662            common.disallow_open_after_us = Some(u64::MAX);
4663            common.disallow_preempt_after_us = Some(u64::MAX);
4664        } else {
4665            if common.disallow_open_after_us.is_none() {
4666                common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
4667            }
4668
4669            if common.disallow_preempt_after_us.is_none() {
4670                common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
4671            }
4672        }
4673
4674        if common.idle_smt.is_some() {
4675            warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
4676        }
4677
4678        if common.allow_node_aligned.is_some() {
4679            warn!("Layer {} has deprecated flag \"allow_node_aligned\", node-aligned tasks are now always dispatched on layer DSQs", &spec.name);
4680        }
4681    }
4682
4683    let membw_required = layer_config.specs.iter().any(|spec| match spec.kind {
4684        LayerKind::Confined { membw_gb, .. } | LayerKind::Grouped { membw_gb, .. } => {
4685            membw_gb.is_some()
4686        }
4687        LayerKind::Open { .. } => false,
4688    });
4689
4690    if opts.print_and_exit {
4691        println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
4692        return Ok(());
4693    }
4694
4695    debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
4696    let hint_to_layer_map = verify_layer_specs(&layer_config.specs)?;
4697
4698    let mut open_object = MaybeUninit::uninit();
4699    loop {
4700        let mut sched = Scheduler::init(
4701            &opts,
4702            &layer_config.specs,
4703            &mut open_object,
4704            &hint_to_layer_map,
4705            membw_required,
4706        )?;
4707        if !sched.run(shutdown.clone())?.should_restart() {
4708            break;
4709        }
4710    }
4711
4712    Ok(())
4713}