scx_layered/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6mod stats;
7
8use std::collections::BTreeMap;
9use std::collections::HashMap;
10use std::collections::HashSet;
11use std::ffi::CString;
12use std::fs;
13use std::io::Write;
14use std::mem::MaybeUninit;
15use std::ops::Sub;
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21use std::thread::ThreadId;
22use std::time::Duration;
23use std::time::Instant;
24
25use inotify::{Inotify, WatchMask};
26use libc;
27use std::os::unix::io::AsRawFd;
28
29use anyhow::anyhow;
30use anyhow::bail;
31use anyhow::Context;
32use anyhow::Result;
33pub use bpf_skel::*;
34use clap::Parser;
35use crossbeam::channel::Receiver;
36use crossbeam::select;
37use lazy_static::lazy_static;
38use libbpf_rs::libbpf_sys;
39use libbpf_rs::AsRawLibbpf;
40use libbpf_rs::MapCore as _;
41use libbpf_rs::OpenObject;
42use libbpf_rs::ProgramInput;
43use nix::sched::CpuSet;
44use nvml_wrapper::error::NvmlError;
45use nvml_wrapper::Nvml;
46use once_cell::sync::OnceCell;
47use regex::Regex;
48use scx_bpf_compat;
49use scx_layered::*;
50use scx_raw_pmu::PMUManager;
51use scx_stats::prelude::*;
52use scx_utils::build_id;
53use scx_utils::compat;
54use scx_utils::init_libbpf_logging;
55use scx_utils::libbpf_clap_opts::LibbpfOpts;
56use scx_utils::perf;
57use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
58use scx_utils::read_netdevs;
59use scx_utils::scx_enums;
60use scx_utils::scx_ops_attach;
61use scx_utils::scx_ops_load;
62use scx_utils::scx_ops_open;
63use scx_utils::uei_exited;
64use scx_utils::uei_report;
65use scx_utils::CoreType;
66use scx_utils::Cpumask;
67use scx_utils::Llc;
68use scx_utils::NetDev;
69use scx_utils::Topology;
70use scx_utils::TopologyArgs;
71use scx_utils::UserExitInfo;
72use scx_utils::NR_CPUS_POSSIBLE;
73use scx_utils::NR_CPU_IDS;
74use stats::LayerStats;
75use stats::StatsReq;
76use stats::StatsRes;
77use stats::SysStats;
78use std::collections::VecDeque;
79use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
80use tracing::{debug, error, info, trace, warn};
81use tracing_subscriber::filter::EnvFilter;
82use walkdir::WalkDir;
83
84const SCHEDULER_NAME: &str = "scx_layered";
85const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
86const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
87const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
88const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
89const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
90const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
91const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
92const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
93const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
94const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;
95
96const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
97const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
98const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
99const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
100const LAYER_USAGE_PROTECTED_PREEMPT: usize =
101    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
102const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;
103
104const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
105const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
106const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;
107
108const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;
109
110static NVML: OnceCell<Nvml> = OnceCell::new();
111
112fn nvml() -> Result<&'static Nvml, NvmlError> {
113    NVML.get_or_try_init(Nvml::init)
114}
115
116lazy_static! {
117    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
118    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
119    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
120    static ref EXAMPLE_CONFIG: LayerConfig = LayerConfig {
121        specs: vec![
122            LayerSpec {
123                name: "batch".into(),
124                comment: Some("tasks under system.slice or tasks with nice value > 0".into()),
125                cpuset: None,
126                template: None,
127                matches: vec![
128                    vec![LayerMatch::CgroupPrefix("system.slice/".into())],
129                    vec![LayerMatch::NiceAbove(0)],
130                ],
131                kind: LayerKind::Confined {
132                    util_range: (0.8, 0.9),
133                    cpus_range: Some((0, 16)),
134                    cpus_range_frac: None,
135                    protected: false,
136                    membw_gb: None,
137                    common: LayerCommon {
138                        min_exec_us: 1000,
139                        yield_ignore: 0.0,
140                        preempt: false,
141                        preempt_first: false,
142                        exclusive: false,
143                        allow_node_aligned: false,
144                        skip_remote_node: false,
145                        prev_over_idle_core: false,
146                        idle_smt: None,
147                        slice_us: 20000,
148                        fifo: false,
149                        weight: DEFAULT_LAYER_WEIGHT,
150                        disallow_open_after_us: None,
151                        disallow_preempt_after_us: None,
152                        xllc_mig_min_us: 1000.0,
153                        growth_algo: LayerGrowthAlgo::Sticky,
154                        idle_resume_us: None,
155                        perf: 1024,
156                        nodes: vec![],
157                        llcs: vec![],
158                        member_expire_ms: 0,
159                        placement: LayerPlacement::Standard,
160                    },
161                },
162            },
163            LayerSpec {
164                name: "immediate".into(),
165                comment: Some("tasks under workload.slice with nice value < 0".into()),
166                cpuset: None,
167                template: None,
168                matches: vec![vec![
169                    LayerMatch::CgroupPrefix("workload.slice/".into()),
170                    LayerMatch::NiceBelow(0),
171                ]],
172                kind: LayerKind::Open {
173                    common: LayerCommon {
174                        min_exec_us: 100,
175                        yield_ignore: 0.25,
176                        preempt: true,
177                        preempt_first: false,
178                        exclusive: true,
179                        allow_node_aligned: true,
180                        skip_remote_node: false,
181                        prev_over_idle_core: true,
182                        idle_smt: None,
183                        slice_us: 20000,
184                        fifo: false,
185                        weight: DEFAULT_LAYER_WEIGHT,
186                        disallow_open_after_us: None,
187                        disallow_preempt_after_us: None,
188                        xllc_mig_min_us: 0.0,
189                        growth_algo: LayerGrowthAlgo::Sticky,
190                        perf: 1024,
191                        idle_resume_us: None,
192                        nodes: vec![],
193                        llcs: vec![],
194                        member_expire_ms: 0,
195                        placement: LayerPlacement::Standard,
196                    },
197                },
198            },
199            LayerSpec {
200                name: "stress-ng".into(),
201                comment: Some("stress-ng test layer".into()),
202                cpuset: None,
203                template: None,
204                matches: vec![
205                    vec![LayerMatch::CommPrefix("stress-ng".into()),],
206                    vec![LayerMatch::PcommPrefix("stress-ng".into()),]
207                ],
208                kind: LayerKind::Confined {
209                    cpus_range: None,
210                    util_range: (0.2, 0.8),
211                    protected: false,
212                    cpus_range_frac: None,
213                    membw_gb: None,
214                    common: LayerCommon {
215                        min_exec_us: 800,
216                        yield_ignore: 0.0,
217                        preempt: true,
218                        preempt_first: false,
219                        exclusive: false,
220                        allow_node_aligned: false,
221                        skip_remote_node: false,
222                        prev_over_idle_core: false,
223                        idle_smt: None,
224                        slice_us: 800,
225                        fifo: false,
226                        weight: DEFAULT_LAYER_WEIGHT,
227                        disallow_open_after_us: None,
228                        disallow_preempt_after_us: None,
229                        xllc_mig_min_us: 0.0,
230                        growth_algo: LayerGrowthAlgo::Topo,
231                        perf: 1024,
232                        idle_resume_us: None,
233                        nodes: vec![],
234                        llcs: vec![],
235                        member_expire_ms: 0,
236                        placement: LayerPlacement::Standard,
237                    },
238                },
239            },
240            LayerSpec {
241                name: "normal".into(),
242                comment: Some("the rest".into()),
243                cpuset: None,
244                template: None,
245                matches: vec![vec![]],
246                kind: LayerKind::Grouped {
247                    cpus_range: None,
248                    util_range: (0.5, 0.6),
249                    util_includes_open_cputime: true,
250                    protected: false,
251                    cpus_range_frac: None,
252                    membw_gb: None,
253                    common: LayerCommon {
254                        min_exec_us: 200,
255                        yield_ignore: 0.0,
256                        preempt: false,
257                        preempt_first: false,
258                        exclusive: false,
259                        allow_node_aligned: false,
260                        skip_remote_node: false,
261                        prev_over_idle_core: false,
262                        idle_smt: None,
263                        slice_us: 20000,
264                        fifo: false,
265                        weight: DEFAULT_LAYER_WEIGHT,
266                        disallow_open_after_us: None,
267                        disallow_preempt_after_us: None,
268                        xllc_mig_min_us: 100.0,
269                        growth_algo: LayerGrowthAlgo::Linear,
270                        perf: 1024,
271                        idle_resume_us: None,
272                        nodes: vec![],
273                        llcs: vec![],
274                        member_expire_ms: 0,
275                        placement: LayerPlacement::Standard,
276                    },
277                },
278            },
279        ],
280    };
281}
282
283/// scx_layered: A highly configurable multi-layer sched_ext scheduler
284///
285/// scx_layered allows classifying tasks into multiple layers and applying
286/// different scheduling policies to them. The configuration is specified in
287/// json and composed of two parts - matches and policies.
288///
289/// Matches
290/// =======
291///
292/// Whenever a task is forked or its attributes are changed, the task goes
293/// through a series of matches to determine the layer it belongs to. A
294/// match set is composed of OR groups of AND blocks. An example:
295///
296///   "matches": [
297///     [
298///       {
299///         "CgroupPrefix": "system.slice/"
300///       }
301///     ],
302///     [
303///       {
304///         "CommPrefix": "fbagent"
305///       },
306///       {
307///         "NiceAbove": 0
308///       }
309///     ]
310///   ],
311///
312/// The outer array contains the OR groups and the inner AND blocks, so the
313/// above matches:
314///
315/// - Tasks which are in the cgroup sub-hierarchy under "system.slice".
316///
317/// - Or tasks whose comm starts with "fbagent" and have a nice value > 0.
318///
319/// Currently, the following matches are supported:
320///
321/// - CgroupPrefix: Matches the prefix of the cgroup that the task belongs
322///   to. As this is a string match, whether the pattern has the trailing
323///   '/' makes a difference. For example, "TOP/CHILD/" only matches tasks
324///   which are under that particular cgroup while "TOP/CHILD" also matches
325///   tasks under "TOP/CHILD0/" or "TOP/CHILD1/".
326///
327/// - CommPrefix: Matches the task's comm prefix.
328///
329/// - PcommPrefix: Matches the task's thread group leader's comm prefix.
330///
331/// - NiceAbove: Matches if the task's nice value is greater than the
332///   pattern.
333///
334/// - NiceBelow: Matches if the task's nice value is smaller than the
335///   pattern.
336///
337/// - NiceEquals: Matches if the task's nice value is exactly equal to
338///   the pattern.
339///
340/// - UIDEquals: Matches if the task's effective user id matches the value
341///
342/// - GIDEquals: Matches if the task's effective group id matches the value.
343///
344/// - PIDEquals: Matches if the task's pid matches the value.
345///
346/// - PPIDEquals: Matches if the task's ppid matches the value.
347///
348/// - TGIDEquals: Matches if the task's tgid matches the value.
349///
350/// - NSPIDEquals: Matches if the task's namespace id and pid matches the values.
351///
352/// - NSEquals: Matches if the task's namespace id matches the values.
353///
354/// - IsGroupLeader: Bool. When true, matches if the task is group leader
355///   (i.e. PID == TGID), aka the thread from which other threads are made.
356///   When false, matches if the task is *not* the group leader (i.e. the rest).
357///
358/// - CmdJoin: Matches when the task uses pthread_setname_np to send a join/leave
359/// command to the scheduler. See examples/cmdjoin.c for more details.
360///
361/// - UsedGpuTid: Bool. When true, matches if the tasks which have used
362///   gpus by tid.
363///
364/// - UsedGpuPid: Bool. When true, matches if the tasks which have used gpu
365///   by tgid/pid.
366///
367/// - [EXPERIMENTAL] AvgRuntime: (u64, u64). Match tasks whose average runtime
368///   is within the provided values [min, max).
369///
370/// - HintEquals: u64. Match tasks whose hint value equals this value.
371///   The value must be in the range [0, 1024].
372///
373/// - SystemCpuUtilBelow: f64. Match when the system CPU utilization fraction
374///   is below the specified threshold (a value in the range [0.0, 1.0]). This
375///   option can only be used in conjunction with HintEquals.
376///
377/// - DsqInsertBelow: f64. Match when the layer DSQ insertion fraction is below
378///   the specified threshold (a value in the range [0.0, 1.0]). This option can
379///   only be used in conjunction with HintEquals.
380///
381/// While there are complexity limitations as the matches are performed in
382/// BPF, it is straightforward to add more types of matches.
383///
384/// Templates
385/// ---------
386///
387/// Templates let us create a variable number of layers dynamically at initialization
388/// time out of a cgroup name suffix/prefix. Sometimes we know there are multiple
389/// applications running on a machine, each with their own cgroup but do not know the
390/// exact names of the applications or cgroups, e.g., in cloud computing contexts where
391/// workloads are placed on machines dynamically and run under cgroups whose name is
392/// autogenerated. In that case, we cannot hardcode the cgroup match rules when writing
393/// the configuration. We thus cannot easily prevent tasks from different cgroups from
394/// falling into the same layer and affecting each other's performance.
395///
396///
397/// Templates offer a solution to this problem by generating one layer for each such cgroup,
398/// provided these cgroups share a suffix, and that the suffix is unique to them. Templates
399/// have a cgroup suffix rule that we use to find the relevant cgroups in the system. For each
400/// such cgroup, we copy the layer config and add a matching rule that matches just this cgroup.
401///
402///
403/// Policies
404/// ========
405///
406/// The following is an example policy configuration for a layer.
407///
408///   "kind": {
409///     "Confined": {
410///       "cpus_range": [1, 8],
411///       "util_range": [0.8, 0.9]
412///     }
413///   }
414///
415/// It's of "Confined" kind, which tries to concentrate the layer's tasks
416/// into a limited number of CPUs. In the above case, the number of CPUs
417/// assigned to the layer is scaled between 1 and 8 so that the per-cpu
418/// utilization is kept between 80% and 90%. If the CPUs are loaded higher
419/// than 90%, more CPUs are allocated to the layer. If the utilization drops
420/// below 80%, the layer loses CPUs.
421///
422/// Currently, the following policy kinds are supported:
423///
424/// - Confined: Tasks are restricted to the allocated CPUs. The number of
425///   CPUs allocated is modulated to keep the per-CPU utilization in
426///   "util_range". The range can optionally be restricted with the
427///   "cpus_range" property.
428///
429/// - Grouped: Similar to Confined but tasks may spill outside if there are
430///   idle CPUs outside the allocated ones. The range can optionally be
431///   restricted with the "cpus_range" property.
432///
433/// - Open: Prefer the CPUs which are not occupied by Confined or Grouped
434///   layers. Tasks in this group will spill into occupied CPUs if there are
435///   no unoccupied idle CPUs.
436///
437/// All layers take the following options:
438///
439/// - min_exec_us: Minimum execution time in microseconds. Whenever a task
440///   is scheduled in, this is the minimum CPU time that it's charged no
441///   matter how short the actual execution time may be.
442///
443/// - yield_ignore: Yield ignore ratio. If 0.0, yield(2) forfeits a whole
444///   execution slice. 0.25 yields three quarters of an execution slice and
445///   so on. If 1.0, yield is completely ignored.
446///
447/// - slice_us: Scheduling slice duration in microseconds.
448///
449/// - fifo: Use FIFO queues within the layer instead of the default vtime.
450///
451/// - preempt: If true, tasks in the layer will preempt tasks which belong
452///   to other non-preempting layers when no idle CPUs are available.
453///
454/// - preempt_first: If true, tasks in the layer will try to preempt tasks
455///   in their previous CPUs before trying to find idle CPUs.
456///
457/// - exclusive: If true, tasks in the layer will occupy the whole core. The
458///   other logical CPUs sharing the same core will be kept idle. This isn't
459///   a hard guarantee, so don't depend on it for security purposes.
460///
461/// - allow_node_aligned: Put node aligned tasks on layer DSQs instead of lo
462///   fallback. This is a hack to support node-affine tasks without making
463///   the whole scheduler node aware and should only be used with open
464///   layers on non-saturated machines to avoid possible stalls.
465///
466/// - prev_over_idle_core: On SMT enabled systems, prefer using the same CPU
467///   when picking a CPU for tasks on this layer, even if that CPUs SMT
468///   sibling is processing a task.
469///
470/// - weight: Weight of the layer, which is a range from 1 to 10000 with a
471///   default of 100. Layer weights are used during contention to prevent
472///   starvation across layers. Weights are used in combination with
473///   utilization to determine the infeasible adjusted weight with higher
474///   weights having a larger adjustment in adjusted utilization.
475///
476/// - disallow_open_after_us: Duration to wait after machine reaches saturation
477///   before confining tasks in Open layers.
478///
479/// - cpus_range_frac: Array of 2 floats between 0 and 1.0. Lower and upper
480///   bound fractions of all CPUs to give to a layer. Mutually exclusive
481///   with cpus_range.
482///
483/// - disallow_preempt_after_us: Duration to wait after machine reaches saturation
484///   before confining tasks to preempt.
485///
486/// - xllc_mig_min_us: Skip cross-LLC migrations if they are likely to run on
487///   their existing LLC sooner than this.
488///
489/// - idle_smt: *** DEPRECATED ****
490///
491/// - growth_algo: When a layer is allocated new CPUs different algorithms can
492///   be used to determine which CPU should be allocated next. The default
493///   algorithm is a "sticky" algorithm that attempts to spread layers evenly
494///   across cores.
495///
496/// - perf: CPU performance target. 0 means no configuration. A value
497///   between 1 and 1024 indicates the performance level CPUs running tasks
498///   in this layer are configured to using scx_bpf_cpuperf_set().
499///
500/// - idle_resume_us: Sets the idle resume QoS value. CPU idle time governors are expected to
501///   regard the minimum of the global (effective) CPU latency limit and the effective resume
502///   latency constraint for the given CPU as the upper limit for the exit latency of the idle
503///   states. See the latest kernel docs for more details:
504///   https://www.kernel.org/doc/html/latest/admin-guide/pm/cpuidle.html
505///
506/// - nodes: If set the layer will use the set of NUMA nodes for scheduling
507///   decisions. If unset then all available NUMA nodes will be used. If the
508///   llcs value is set the cpuset of NUMA nodes will be or'ed with the LLC
509///   config.
510///
511/// - llcs: If set the layer will use the set of LLCs (last level caches)
512///   for scheduling decisions. If unset then all LLCs will be used. If
513///   the nodes value is set the cpuset of LLCs will be or'ed with the nodes
514///   config.
515///
516///
517/// Similar to matches, adding new policies and extending existing ones
518/// should be relatively straightforward.
519///
520/// Configuration example and running scx_layered
521/// =============================================
522///
523/// An scx_layered config is composed of layer configs. A layer config is
524/// composed of a name, a set of matches, and a policy block. Running the
525/// following will write an example configuration into example.json.
526///
527///   $ scx_layered -e example.json
528///
529/// Note that the last layer in the configuration must have an empty match set
530/// as a catch-all for tasks which haven't been matched into previous layers.
531///
532/// The configuration can be specified in multiple json files and
533/// command line arguments, which are concatenated in the specified
534/// order. Each must contain valid layer configurations.
535///
536/// By default, an argument to scx_layered is interpreted as a JSON string. If
537/// the argument is a pointer to a JSON file, it should be prefixed with file:
538/// or f: as follows:
539///
540///   $ scx_layered file:example.json
541///   ...
542///   $ scx_layered f:example.json
543///
544/// Monitoring Statistics
545/// =====================
546///
547/// Run with `--stats INTERVAL` to enable stats monitoring. There is
548/// also an scx_stat server listening on /var/run/scx/root/stat that can
549/// be monitored by running `scx_layered --monitor INTERVAL` separately.
550///
551///   ```bash
552///   $ scx_layered --monitor 1
553///   tot= 117909 local=86.20 open_idle= 0.21 affn_viol= 1.37 proc=6ms
554///   busy= 34.2 util= 1733.6 load=  21744.1 fallback_cpu=  1
555///     batch    : util/frac=   11.8/  0.7 load/frac=     29.7:  0.1 tasks=  2597
556///                tot=   3478 local=67.80 open_idle= 0.00 preempt= 0.00 affn_viol= 0.00
557///                cpus=  2 [  2,  2] 04000001 00000000
558///     immediate: util/frac= 1218.8/ 70.3 load/frac=  21399.9: 98.4 tasks=  1107
559///                tot=  68997 local=90.57 open_idle= 0.26 preempt= 9.36 affn_viol= 0.00
560///                cpus= 50 [ 50, 50] fbfffffe 000fffff
561///     normal   : util/frac=  502.9/ 29.0 load/frac=    314.5:  1.4 tasks=  3512
562///                tot=  45434 local=80.97 open_idle= 0.16 preempt= 0.00 affn_viol= 3.56
563///                cpus= 50 [ 50, 50] fbfffffe 000fffff
564///   ```
565///
566/// Global statistics: see [`SysStats`]
567///
568/// Per-layer statistics: see [`LayerStats`]
569///
570#[derive(Debug, Parser)]
571#[command(verbatim_doc_comment)]
572struct Opts {
573    /// Deprecated, noop, use RUST_LOG or --log-level instead.
574    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
575    verbose: u8,
576
577    /// Scheduling slice duration in microseconds.
578    #[clap(short = 's', long, default_value = "20000")]
579    slice_us: u64,
580
581    /// Maximum consecutive execution time in microseconds. A task may be
582    /// allowed to keep executing on a CPU for this long. Note that this is
583    /// the upper limit and a task may have to moved off the CPU earlier. 0
584    /// indicates default - 20 * slice_us.
585    #[clap(short = 'M', long, default_value = "0")]
586    max_exec_us: u64,
587
588    /// Scheduling interval in seconds.
589    #[clap(short = 'i', long, default_value = "0.1")]
590    interval: f64,
591
592    /// ***DEPRECATED*** Disable load-fraction based max layer CPU limit.
593    /// recommended.
594    #[clap(short = 'n', long, default_value = "false")]
595    no_load_frac_limit: bool,
596
597    /// Exit debug dump buffer length. 0 indicates default.
598    #[clap(long, default_value = "0")]
599    exit_dump_len: u32,
600
601    /// Specify the logging level. Accepts rust's envfilter syntax for modular
602    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
603    #[clap(long, default_value = "info")]
604    log_level: String,
605
606    /// Disable topology awareness. When enabled, the "nodes" and "llcs" settings on
607    /// a layer are ignored. Defaults to false on topologies with multiple NUMA nodes
608    /// or LLCs, and true otherwise.
609    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
610    disable_topology: Option<bool>,
611
612    /// Enable cross NUMA preemption.
613    #[clap(long)]
614    xnuma_preemption: bool,
615
616    /// Disable monitor
617    #[clap(long)]
618    monitor_disable: bool,
619
620    /// Write example layer specifications into the file and exit.
621    #[clap(short = 'e', long)]
622    example: Option<String>,
623
624    /// ***DEPRECATED*** Disables preemption if the weighted load fraction
625    /// of a layer (load_frac_adj) exceeds the threshold. The default is
626    /// disabled (0.0).
627    #[clap(long, default_value = "0.0")]
628    layer_preempt_weight_disable: f64,
629
630    /// ***DEPRECATED*** Disables layer growth if the weighted load fraction
631    /// of a layer (load_frac_adj) exceeds the threshold. The default is
632    /// disabled (0.0).
633    #[clap(long, default_value = "0.0")]
634    layer_growth_weight_disable: f64,
635
636    /// Enable stats monitoring with the specified interval.
637    #[clap(long)]
638    stats: Option<f64>,
639
640    /// Run in stats monitoring mode with the specified interval. Scheduler
641    /// is not launched.
642    #[clap(long)]
643    monitor: Option<f64>,
644
645    /// Run with example layer specifications (useful for e.g. CI pipelines)
646    #[clap(long)]
647    run_example: bool,
648
649    /// ***DEPRECATED *** Enables iteration over local LLCs first for
650    /// dispatch.
651    #[clap(long, default_value = "false")]
652    local_llc_iteration: bool,
653
654    /// Low priority fallback DSQs are used to execute tasks with custom CPU
655    /// affinities. These DSQs are immediately executed iff a CPU is
656    /// otherwise idle. However, after the specified wait, they are
657    /// guaranteed upto --lo-fb-share fraction of each CPU.
658    #[clap(long, default_value = "10000")]
659    lo_fb_wait_us: u64,
660
661    /// The fraction of CPU time guaranteed to low priority fallback DSQs.
662    /// See --lo-fb-wait-us.
663    #[clap(long, default_value = ".05")]
664    lo_fb_share: f64,
665
666    /// Disable antistall
667    #[clap(long, default_value = "false")]
668    disable_antistall: bool,
669
670    /// Enable numa topology based gpu task affinitization.
671    #[clap(long, default_value = "false")]
672    enable_gpu_affinitize: bool,
673
674    /// Interval at which to reaffinitize gpu tasks to numa nodes.
675    /// Defaults to 900s
676    #[clap(long, default_value = "900")]
677    gpu_affinitize_secs: u64,
678
679    /// Enable match debug
680    /// This stores a mapping of task tid
681    /// to layer id such that bpftool map dump
682    /// can be used to debug layer matches.
683    #[clap(long, default_value = "false")]
684    enable_match_debug: bool,
685
686    /// Maximum task runnable_at delay (in seconds) before antistall turns on
687    #[clap(long, default_value = "3")]
688    antistall_sec: u64,
689
690    /// Enable gpu support
691    #[clap(long, default_value = "false")]
692    enable_gpu_support: bool,
693
694    /// Gpu Kprobe Level
695    /// The value set here determines how aggressive
696    /// the kprobes enabled on gpu driver functions are.
697    /// Higher values are more aggressive, incurring more system overhead
698    /// and more accurately identifying PIDs using GPUs in a more timely manner.
699    /// Lower values incur less system overhead, at the cost of less accurately
700    /// identifying GPU pids and taking longer to do so.
701    #[clap(long, default_value = "3")]
702    gpu_kprobe_level: u64,
703
704    /// Enable netdev IRQ balancing. This is experimental and should be used with caution.
705    #[clap(long, default_value = "false")]
706    netdev_irq_balance: bool,
707
708    /// Disable queued wakeup optimization.
709    #[clap(long, default_value = "false")]
710    disable_queued_wakeup: bool,
711
712    /// Per-cpu kthreads are preempting by default. Make it not so.
713    #[clap(long, default_value = "false")]
714    disable_percpu_kthread_preempt: bool,
715
716    /// Only highpri (nice < 0) per-cpu kthreads are preempting by default.
717    /// Make every per-cpu kthread preempting. Meaningful only if
718    /// --disable-percpu-kthread-preempt is not set.
719    #[clap(long, default_value = "false")]
720    percpu_kthread_preempt_all: bool,
721
722    /// Print scheduler version and exit.
723    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
724    version: bool,
725
726    /// Optional run ID for tracking scheduler instances.
727    #[clap(long)]
728    run_id: Option<u64>,
729
730    /// Show descriptions for statistics.
731    #[clap(long)]
732    help_stats: bool,
733
734    /// Layer specification. See --help.
735    specs: Vec<String>,
736
737    /// Periodically force tasks in layers using the AvgRuntime match rule to reevaluate which layer they belong to. Default period of 2s.
738    /// turns this off.
739    #[clap(long, default_value = "2000")]
740    layer_refresh_ms_avgruntime: u64,
741
742    /// Set the path for pinning the task hint map.
743    #[clap(long, default_value = "")]
744    task_hint_map: String,
745
746    /// Print the config (after template expansion) and exit.
747    #[clap(long, default_value = "false")]
748    print_and_exit: bool,
749
750    /// Enable affinitized task to use hi fallback queue to get more CPU time.
751    #[clap(long, default_value = "")]
752    hi_fb_thread_name: String,
753
754    #[clap(flatten, next_help_heading = "Topology Options")]
755    topology: TopologyArgs,
756
757    #[clap(flatten, next_help_heading = "Libbpf Options")]
758    pub libbpf: LibbpfOpts,
759}
760
761// Cgroup event types for inter-thread communication
762#[derive(Debug, Clone)]
763enum CgroupEvent {
764    Created {
765        path: String,
766        cgroup_id: u64,    // inode number
767        match_bitmap: u64, // bitmap of matched regex rules
768    },
769    Removed {
770        path: String,
771        cgroup_id: u64, // inode number
772    },
773}
774
775fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
776    reader
777        .read_stat()
778        .context("Failed to read procfs")?
779        .total_cpu
780        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
781}
782
783fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
784    match (curr, prev) {
785        (
786            fb_procfs::CpuStat {
787                user_usec: Some(curr_user),
788                nice_usec: Some(curr_nice),
789                system_usec: Some(curr_system),
790                idle_usec: Some(curr_idle),
791                iowait_usec: Some(curr_iowait),
792                irq_usec: Some(curr_irq),
793                softirq_usec: Some(curr_softirq),
794                stolen_usec: Some(curr_stolen),
795                ..
796            },
797            fb_procfs::CpuStat {
798                user_usec: Some(prev_user),
799                nice_usec: Some(prev_nice),
800                system_usec: Some(prev_system),
801                idle_usec: Some(prev_idle),
802                iowait_usec: Some(prev_iowait),
803                irq_usec: Some(prev_irq),
804                softirq_usec: Some(prev_softirq),
805                stolen_usec: Some(prev_stolen),
806                ..
807            },
808        ) => {
809            let idle_usec = curr_idle.saturating_sub(*prev_idle);
810            let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
811            let user_usec = curr_user.saturating_sub(*prev_user);
812            let system_usec = curr_system.saturating_sub(*prev_system);
813            let nice_usec = curr_nice.saturating_sub(*prev_nice);
814            let irq_usec = curr_irq.saturating_sub(*prev_irq);
815            let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
816            let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);
817
818            let busy_usec =
819                user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
820            let total_usec = idle_usec + busy_usec + iowait_usec;
821            if total_usec > 0 {
822                Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
823            } else {
824                Ok(1.0)
825            }
826        }
827        _ => bail!("Missing stats in cpustat"),
828    }
829}
830
831fn copy_into_cstr(dst: &mut [i8], src: &str) {
832    let cstr = CString::new(src).unwrap();
833    let bytes = unsafe { std::mem::transmute::<&[u8], &[i8]>(cstr.as_bytes_with_nul()) };
834    dst[0..bytes.len()].copy_from_slice(bytes);
835}
836
837fn nodemask_from_nodes(nodes: &Vec<usize>) -> usize {
838    let mut mask = 0;
839    for node in nodes {
840        mask |= 1 << node;
841    }
842    mask
843}
844
845fn llcmask_from_llcs(llcs: &BTreeMap<usize, Arc<Llc>>) -> usize {
846    let mut mask = 0;
847    for (_, cache) in llcs {
848        mask |= 1 << cache.id;
849    }
850    mask
851}
852
853fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
854    let mut cpu_ctxs = vec![];
855    let cpu_ctxs_vec = skel
856        .maps
857        .cpu_ctxs
858        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
859        .context("Failed to lookup cpu_ctx")?
860        .unwrap();
861    for cpu in 0..*NR_CPUS_POSSIBLE {
862        cpu_ctxs.push(*unsafe {
863            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
864        });
865    }
866    Ok(cpu_ctxs)
867}
868
869#[derive(Clone, Debug)]
870struct BpfStats {
871    gstats: Vec<u64>,
872    lstats: Vec<Vec<u64>>,
873    lstats_sums: Vec<u64>,
874    llc_lstats: Vec<Vec<Vec<u64>>>, // [layer][llc][stat]
875}
876
877impl BpfStats {
878    fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
879        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
880        let nr_llcs = skel.maps.rodata_data.as_ref().unwrap().nr_llcs as usize;
881        let mut gstats = vec![0u64; NR_GSTATS];
882        let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
883        let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];
884
885        for cpu in 0..*NR_CPUS_POSSIBLE {
886            for stat in 0..NR_GSTATS {
887                gstats[stat] += cpu_ctxs[cpu].gstats[stat];
888            }
889            for layer in 0..nr_layers {
890                for stat in 0..NR_LSTATS {
891                    lstats[layer][stat] += cpu_ctxs[cpu].lstats[layer][stat];
892                }
893            }
894        }
895
896        let mut lstats_sums = vec![0u64; NR_LSTATS];
897        for layer in 0..nr_layers {
898            for stat in 0..NR_LSTATS {
899                lstats_sums[stat] += lstats[layer][stat];
900            }
901        }
902
903        for llc_id in 0..nr_llcs {
904            // XXX - This would be a lot easier if llc_ctx were in
905            // the bss. Unfortunately, kernel < v6.12 crashes and
906            // kernel >= v6.12 fails verification after such
907            // conversion due to seemingly verifier bugs. Convert to
908            // bss maps later.
909            let key = llc_id as u32;
910            let llc_id_slice =
911                unsafe { std::slice::from_raw_parts((&key as *const u32) as *const u8, 4) };
912            let v = skel
913                .maps
914                .llc_data
915                .lookup(llc_id_slice, libbpf_rs::MapFlags::ANY)
916                .unwrap()
917                .unwrap();
918            let llcc = unsafe { *(v.as_slice().as_ptr() as *const bpf_intf::llc_ctx) };
919
920            for layer_id in 0..nr_layers {
921                for stat_id in 0..NR_LLC_LSTATS {
922                    llc_lstats[layer_id][llc_id][stat_id] = llcc.lstats[layer_id][stat_id];
923                }
924            }
925        }
926
927        Self {
928            gstats,
929            lstats,
930            lstats_sums,
931            llc_lstats,
932        }
933    }
934}
935
936impl<'a, 'b> Sub<&'b BpfStats> for &'a BpfStats {
937    type Output = BpfStats;
938
939    fn sub(self, rhs: &'b BpfStats) -> BpfStats {
940        let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
941        BpfStats {
942            gstats: vec_sub(&self.gstats, &rhs.gstats),
943            lstats: self
944                .lstats
945                .iter()
946                .zip(rhs.lstats.iter())
947                .map(|(l, r)| vec_sub(l, r))
948                .collect(),
949            lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
950            llc_lstats: self
951                .llc_lstats
952                .iter()
953                .zip(rhs.llc_lstats.iter())
954                .map(|(l_layer, r_layer)| {
955                    l_layer
956                        .iter()
957                        .zip(r_layer.iter())
958                        .map(|(l_llc, r_llc)| {
959                            let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
960                            // Lat is not subtractable, take L side.
961                            r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
962                            vec_sub(&l_llc, &r_llc)
963                        })
964                        .collect()
965                })
966                .collect(),
967        }
968    }
969}
970
971#[derive(Clone, Debug)]
972struct Stats {
973    at: Instant,
974    elapsed: Duration,
975    nr_layers: usize,
976    nr_layer_tasks: Vec<usize>,
977    nr_nodes: usize,
978
979    total_util: f64, // Running AVG of sum of layer_utils
980    layer_utils: Vec<Vec<f64>>,
981    prev_layer_usages: Vec<Vec<u64>>,
982
983    layer_membws: Vec<Vec<f64>>, // Estimated memory bandsidth consumption
984    prev_layer_membw_agg: Vec<Vec<u64>>, // Estimated aggregate membw consumption
985
986    cpu_busy: f64, // Read from /proc, maybe higher than total_util
987    prev_total_cpu: fb_procfs::CpuStat,
988    prev_pmu_resctrl_membw: (u64, u64), // (PMU-reported membw, resctrl-reported membw)
989
990    system_cpu_util_ewma: f64,       // 10s EWMA of system CPU utilization
991    layer_dsq_insert_ewma: Vec<f64>, // 10s EWMA of per-layer DSQ insertion ratio
992
993    bpf_stats: BpfStats,
994    prev_bpf_stats: BpfStats,
995
996    processing_dur: Duration,
997    prev_processing_dur: Duration,
998
999    layer_slice_us: Vec<u64>,
1000
1001    gpu_tasks_affinitized: u64,
1002    gpu_task_affinitization_ms: u64,
1003}
1004
1005impl Stats {
1006    fn read_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
1007        let mut layer_usages = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
1008
1009        for cpu in 0..*NR_CPUS_POSSIBLE {
1010            for layer in 0..nr_layers {
1011                for usage in 0..NR_LAYER_USAGES {
1012                    layer_usages[layer][usage] += cpu_ctxs[cpu].layer_usages[layer][usage];
1013                }
1014            }
1015        }
1016
1017        layer_usages
1018    }
1019
1020    // Same as above, but for aggregate memory bandwidth consumption.
1021    fn read_layer_membw_agg(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
1022        let mut layer_membw_agg = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
1023
1024        for cpu in 0..*NR_CPUS_POSSIBLE {
1025            for layer in 0..nr_layers {
1026                for usage in 0..NR_LAYER_USAGES {
1027                    layer_membw_agg[layer][usage] += cpu_ctxs[cpu].layer_membw_agg[layer][usage];
1028                }
1029            }
1030        }
1031
1032        layer_membw_agg
1033    }
1034
1035    /// Use the membw reported by resctrl to normalize the values reported by hw counters.
1036    /// We have the following problem:
1037    /// 1) We want per-task memory bandwidth reporting. We cannot do this with resctrl, much
1038    /// less transparently, since we would require different RMID for each task.
1039    /// 2) We want to directly use perf counters for tracking per-task memory bandwidth, but
1040    /// we can't: Non-resctrl counters do not measure the right thing (e.g., they only measure
1041    /// proxies like load operations),
1042    /// 3) Resctrl counters are not accessible directly so we cannot read them from the BPF side.
1043    ///
1044    /// Approximate per-task memory bandwidth using perf counters to measure _relative_ memory
1045    /// bandwidth usage.
1046    fn resctrl_read_total_membw() -> Result<u64> {
1047        let mut total_membw = 0u64;
1048        for entry in WalkDir::new("/sys/fs/resctrl/mon_data")
1049            .min_depth(1)
1050            .into_iter()
1051            .filter_map(Result::ok)
1052            .filter(|x| x.path().is_dir())
1053        {
1054            let mut path = entry.path().to_path_buf();
1055            path.push("mbm_total_bytes");
1056            total_membw += fs::read_to_string(path)?.trim().parse::<u64>()?;
1057        }
1058
1059        Ok(total_membw)
1060    }
1061
1062    fn new(
1063        skel: &mut BpfSkel,
1064        proc_reader: &fb_procfs::ProcReader,
1065        gpu_task_affinitizer: &GpuTaskAffinitizer,
1066    ) -> Result<Self> {
1067        let nr_layers = skel.maps.rodata_data.as_ref().unwrap().nr_layers as usize;
1068        let cpu_ctxs = read_cpu_ctxs(skel)?;
1069        let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1070        let nr_nodes = skel.maps.rodata_data.as_ref().unwrap().nr_nodes as usize;
1071        let pmu_membw = Self::read_layer_membw_agg(&cpu_ctxs, nr_layers);
1072
1073        Ok(Self {
1074            at: Instant::now(),
1075            elapsed: Default::default(),
1076            nr_layers,
1077            nr_layer_tasks: vec![0; nr_layers],
1078            nr_nodes,
1079
1080            total_util: 0.0,
1081            layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1082            layer_membws: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
1083            prev_layer_usages: Self::read_layer_usages(&cpu_ctxs, nr_layers),
1084            // This is not normalized because we don't have enough history to do so.
1085            // It should not matter too much, since the value is dropped on the first
1086            // iteration.
1087            prev_layer_membw_agg: pmu_membw,
1088            prev_pmu_resctrl_membw: (0, 0),
1089
1090            cpu_busy: 0.0,
1091            prev_total_cpu: read_total_cpu(proc_reader)?,
1092            system_cpu_util_ewma: 0.0,
1093            layer_dsq_insert_ewma: vec![0.0; nr_layers],
1094
1095            bpf_stats: bpf_stats.clone(),
1096            prev_bpf_stats: bpf_stats,
1097
1098            processing_dur: Default::default(),
1099            prev_processing_dur: Default::default(),
1100
1101            layer_slice_us: vec![0; nr_layers],
1102            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1103            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1104        })
1105    }
1106
1107    fn refresh(
1108        &mut self,
1109        skel: &mut BpfSkel,
1110        proc_reader: &fb_procfs::ProcReader,
1111        now: Instant,
1112        cur_processing_dur: Duration,
1113        gpu_task_affinitizer: &GpuTaskAffinitizer,
1114    ) -> Result<()> {
1115        let elapsed = now.duration_since(self.at);
1116        let elapsed_f64 = elapsed.as_secs_f64();
1117        let cpu_ctxs = read_cpu_ctxs(skel)?;
1118
1119        let nr_layer_tasks: Vec<usize> = skel
1120            .maps
1121            .bss_data
1122            .as_ref()
1123            .unwrap()
1124            .layers
1125            .iter()
1126            .take(self.nr_layers)
1127            .map(|layer| layer.nr_tasks as usize)
1128            .collect();
1129        let layer_slice_us: Vec<u64> = skel
1130            .maps
1131            .bss_data
1132            .as_ref()
1133            .unwrap()
1134            .layers
1135            .iter()
1136            .take(self.nr_layers)
1137            .map(|layer| layer.slice_ns / 1000_u64)
1138            .collect();
1139
1140        let cur_layer_usages = Self::read_layer_usages(&cpu_ctxs, self.nr_layers);
1141        let cur_layer_membw_agg = Self::read_layer_membw_agg(&cpu_ctxs, self.nr_layers);
1142
1143        // Memory BW normalization. It requires finding the delta according to perf, the delta
1144        // according to resctl, and finding the factor between them. This also helps in
1145        // determining whether this delta is stable.
1146        //
1147        // We scale only the raw PMC delta - the part of the counter incremented between two
1148        // periods. This is because the scaling factor is only valid for that time period.
1149        // Non-scaled samples are not comparable, while scaled ones are.
1150        let (pmu_prev, resctrl_prev) = self.prev_pmu_resctrl_membw;
1151        let pmu_cur: u64 = cur_layer_membw_agg
1152            .iter()
1153            .map(|membw_agg| membw_agg[LAYER_USAGE_OPEN] + membw_agg[LAYER_USAGE_OWNED])
1154            .sum();
1155        let resctrl_cur = Self::resctrl_read_total_membw()?;
1156        let factor = (resctrl_cur - resctrl_prev) as f64 / (pmu_cur - pmu_prev) as f64;
1157
1158        // Computes the runtime deltas and converts them from ns to s.
1159        let compute_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1160            cur_agg
1161                .iter()
1162                .zip(prev_agg.iter())
1163                .map(|(cur, prev)| {
1164                    cur.iter()
1165                        .zip(prev.iter())
1166                        .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
1167                        .collect()
1168                })
1169                .collect()
1170        };
1171
1172        // Computes the total memory traffic done since last computation and normalizes to GBs.
1173        // We derive the rate of consumption elsewhere.
1174        let compute_mem_diff = |cur_agg: &Vec<Vec<u64>>, prev_agg: &Vec<Vec<u64>>| {
1175            cur_agg
1176                .iter()
1177                .zip(prev_agg.iter())
1178                .map(|(cur, prev)| {
1179                    cur.iter()
1180                        .zip(prev.iter())
1181                        .map(|(c, p)| (*c as i64 - *p as i64) as f64 / (1024 as f64).powf(3.0))
1182                        .collect()
1183                })
1184                .collect()
1185        };
1186
1187        let cur_layer_utils: Vec<Vec<f64>> =
1188            compute_diff(&cur_layer_usages, &self.prev_layer_usages);
1189
1190        // Scale the raw value delta by the resctrl/pmc computed factor.
1191        let cur_layer_membw: Vec<Vec<f64>> =
1192            compute_mem_diff(&cur_layer_membw_agg, &self.prev_layer_membw_agg);
1193
1194        let cur_layer_membw: Vec<Vec<f64>> = cur_layer_membw
1195            .iter()
1196            .map(|x| x.iter().map(|x| *x * factor).collect())
1197            .collect();
1198
1199        let metric_decay =
1200            |cur_metric: Vec<Vec<f64>>, prev_metric: &Vec<Vec<f64>>, decay_rate: f64| {
1201                cur_metric
1202                    .iter()
1203                    .zip(prev_metric.iter())
1204                    .map(|(cur, prev)| {
1205                        cur.iter()
1206                            .zip(prev.iter())
1207                            .map(|(c, p)| {
1208                                let decay = decay_rate.powf(elapsed_f64);
1209                                p * decay + c * (1.0 - decay)
1210                            })
1211                            .collect()
1212                    })
1213                    .collect()
1214            };
1215
1216        let layer_utils: Vec<Vec<f64>> =
1217            metric_decay(cur_layer_utils, &self.layer_utils, *USAGE_DECAY);
1218        let layer_membws: Vec<Vec<f64>> = metric_decay(cur_layer_membw, &self.layer_membws, 0.0);
1219
1220        let cur_total_cpu = read_total_cpu(proc_reader)?;
1221        let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;
1222
1223        // Calculate system CPU utilization EWMA (10 second window)
1224        const SYS_CPU_UTIL_EWMA_SECS: f64 = 10.0;
1225        let elapsed_f64 = elapsed.as_secs_f64();
1226        let alpha = elapsed_f64 / SYS_CPU_UTIL_EWMA_SECS.max(elapsed_f64);
1227        let system_cpu_util_ewma = alpha * cpu_busy + (1.0 - alpha) * self.system_cpu_util_ewma;
1228
1229        let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1230        let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;
1231
1232        // Calculate per-layer DSQ insertion EWMA (10 second window)
1233        const DSQ_INSERT_EWMA_SECS: f64 = 10.0;
1234        let dsq_alpha = elapsed_f64 / DSQ_INSERT_EWMA_SECS.max(elapsed_f64);
1235        let layer_dsq_insert_ewma: Vec<f64> = (0..self.nr_layers)
1236            .map(|layer_id| {
1237                let sel_local = bpf_stats.lstats[layer_id]
1238                    [bpf_intf::layer_stat_id_LSTAT_SEL_LOCAL as usize]
1239                    as f64;
1240                let enq_local = bpf_stats.lstats[layer_id]
1241                    [bpf_intf::layer_stat_id_LSTAT_ENQ_LOCAL as usize]
1242                    as f64;
1243                let enq_dsq = bpf_stats.lstats[layer_id]
1244                    [bpf_intf::layer_stat_id_LSTAT_ENQ_DSQ as usize]
1245                    as f64;
1246                let total_dispatches = sel_local + enq_local + enq_dsq;
1247
1248                let cur_ratio = if total_dispatches > 0.0 {
1249                    enq_dsq / total_dispatches
1250                } else {
1251                    0.0
1252                };
1253
1254                dsq_alpha * cur_ratio + (1.0 - dsq_alpha) * self.layer_dsq_insert_ewma[layer_id]
1255            })
1256            .collect();
1257
1258        let processing_dur = cur_processing_dur
1259            .checked_sub(self.prev_processing_dur)
1260            .unwrap();
1261
1262        *self = Self {
1263            at: now,
1264            elapsed,
1265            nr_layers: self.nr_layers,
1266            nr_layer_tasks,
1267            nr_nodes: self.nr_nodes,
1268
1269            total_util: layer_utils
1270                .iter()
1271                .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
1272                .sum(),
1273            layer_utils,
1274            layer_membws,
1275            prev_layer_usages: cur_layer_usages,
1276            prev_layer_membw_agg: cur_layer_membw_agg,
1277            // Was updated during normalization.
1278            prev_pmu_resctrl_membw: (pmu_cur, resctrl_cur),
1279
1280            cpu_busy,
1281            prev_total_cpu: cur_total_cpu,
1282            system_cpu_util_ewma,
1283            layer_dsq_insert_ewma,
1284
1285            bpf_stats,
1286            prev_bpf_stats: cur_bpf_stats,
1287
1288            processing_dur,
1289            prev_processing_dur: cur_processing_dur,
1290
1291            layer_slice_us,
1292            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1293            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1294        };
1295        Ok(())
1296    }
1297}
1298
1299#[derive(Debug)]
1300struct Layer {
1301    name: String,
1302    kind: LayerKind,
1303    growth_algo: LayerGrowthAlgo,
1304    core_order: Vec<usize>,
1305
1306    target_llc_cpus: (usize, usize),
1307    assigned_llcs: Vec<usize>,
1308
1309    nr_cpus: usize,
1310    nr_llc_cpus: Vec<usize>,
1311    cpus: Cpumask,
1312    allowed_cpus: Cpumask,
1313}
1314
1315fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
1316    fs::read_to_string("/proc/kallsyms")?
1317        .lines()
1318        .find(|line| line.contains(sym_name))
1319        .and_then(|line| line.split_whitespace().next())
1320        .and_then(|addr| u64::from_str_radix(addr, 16).ok())
1321        .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
1322}
1323
1324fn resolve_cpus_pct_range(
1325    cpus_range: &Option<(usize, usize)>,
1326    cpus_range_frac: &Option<(f64, f64)>,
1327    max_cpus: usize,
1328) -> Result<(usize, usize)> {
1329    match (cpus_range, cpus_range_frac) {
1330        (Some(_x), Some(_y)) => {
1331            bail!("cpus_range cannot be used with cpus_pct.");
1332        }
1333        (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
1334        (None, Some((cpus_frac_min, cpus_frac_max))) => {
1335            if *cpus_frac_min < 0_f64
1336                || *cpus_frac_min > 1_f64
1337                || *cpus_frac_max < 0_f64
1338                || *cpus_frac_max > 1_f64
1339            {
1340                bail!("cpus_range_frac values must be between 0.0 and 1.0");
1341            }
1342            let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
1343            let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
1344            Ok((
1345                std::cmp::max(cpus_min_count, 1),
1346                std::cmp::min(cpus_max_count, max_cpus),
1347            ))
1348        }
1349        (None, None) => Ok((0, max_cpus)),
1350    }
1351}
1352
1353impl Layer {
1354    fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<usize>) -> Result<Self> {
1355        let name = &spec.name;
1356        let kind = spec.kind.clone();
1357        let mut allowed_cpus = Cpumask::new();
1358        match &kind {
1359            LayerKind::Confined {
1360                cpus_range,
1361                cpus_range_frac,
1362                common: LayerCommon { nodes, llcs, .. },
1363                ..
1364            } => {
1365                let cpus_range =
1366                    resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
1367                if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
1368                    bail!("invalid cpus_range {:?}", cpus_range);
1369                }
1370                if nodes.is_empty() && llcs.is_empty() {
1371                    allowed_cpus.set_all();
1372                } else {
1373                    // build up the cpus bitset
1374                    for (node_id, node) in &topo.nodes {
1375                        // first do the matching for nodes
1376                        if nodes.contains(node_id) {
1377                            for &id in node.all_cpus.keys() {
1378                                allowed_cpus.set_cpu(id)?;
1379                            }
1380                        }
1381                        // next match on any LLCs
1382                        for (llc_id, llc) in &node.llcs {
1383                            if llcs.contains(llc_id) {
1384                                for &id in llc.all_cpus.keys() {
1385                                    allowed_cpus.set_cpu(id)?;
1386                                }
1387                            }
1388                        }
1389                    }
1390                }
1391            }
1392            LayerKind::Grouped {
1393                common: LayerCommon { nodes, llcs, .. },
1394                ..
1395            }
1396            | LayerKind::Open {
1397                common: LayerCommon { nodes, llcs, .. },
1398                ..
1399            } => {
1400                if nodes.is_empty() && llcs.is_empty() {
1401                    allowed_cpus.set_all();
1402                } else {
1403                    // build up the cpus bitset
1404                    for (node_id, node) in &topo.nodes {
1405                        // first do the matching for nodes
1406                        if nodes.contains(node_id) {
1407                            for &id in node.all_cpus.keys() {
1408                                allowed_cpus.set_cpu(id)?;
1409                            }
1410                        }
1411                        // next match on any LLCs
1412                        for (llc_id, llc) in &node.llcs {
1413                            if llcs.contains(llc_id) {
1414                                for &id in llc.all_cpus.keys() {
1415                                    allowed_cpus.set_cpu(id)?;
1416                                }
1417                            }
1418                        }
1419                    }
1420                }
1421            }
1422        }
1423
1424        // Util can be above 1.0 for grouped layers if
1425        // util_includes_open_cputime is set.
1426        if let Some(util_range) = kind.util_range() {
1427            if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
1428                bail!("invalid util_range {:?}", util_range);
1429            }
1430        }
1431
1432        let layer_growth_algo = kind.common().growth_algo.clone();
1433
1434        debug!(
1435            "layer: {} algo: {:?} core order: {:?}",
1436            name, &layer_growth_algo, core_order
1437        );
1438
1439        Ok(Self {
1440            name: name.into(),
1441            kind,
1442            growth_algo: layer_growth_algo,
1443            core_order: core_order.clone(),
1444
1445            target_llc_cpus: (0, 0),
1446            assigned_llcs: vec![],
1447
1448            nr_cpus: 0,
1449            nr_llc_cpus: vec![0; topo.all_llcs.len()],
1450            cpus: Cpumask::new(),
1451            allowed_cpus,
1452        })
1453    }
1454
1455    fn free_some_cpus(&mut self, cpu_pool: &mut CpuPool, max_to_free: usize) -> Result<usize> {
1456        let cpus_to_free = match cpu_pool.next_to_free(&self.cpus, self.core_order.iter().rev())? {
1457            Some(ret) => ret.clone(),
1458            None => return Ok(0),
1459        };
1460
1461        let nr_to_free = cpus_to_free.weight();
1462
1463        Ok(if nr_to_free <= max_to_free {
1464            trace!("[{}] freeing CPUs: {}", self.name, &cpus_to_free);
1465            self.cpus &= &cpus_to_free.not();
1466            self.nr_cpus -= nr_to_free;
1467            for cpu in cpus_to_free.iter() {
1468                self.nr_llc_cpus[cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
1469            }
1470            cpu_pool.free(&cpus_to_free)?;
1471            nr_to_free
1472        } else {
1473            0
1474        })
1475    }
1476
1477    fn alloc_some_cpus(&mut self, cpu_pool: &mut CpuPool) -> Result<usize> {
1478        let new_cpus = match cpu_pool
1479            .alloc_cpus(&self.allowed_cpus, &self.core_order)
1480            .clone()
1481        {
1482            Some(ret) => ret.clone(),
1483            None => {
1484                trace!("layer-{} can't grow, no CPUs", &self.name);
1485                return Ok(0);
1486            }
1487        };
1488
1489        let nr_new_cpus = new_cpus.weight();
1490
1491        trace!("[{}] adding CPUs: {}", &self.name, &new_cpus);
1492        self.cpus |= &new_cpus;
1493        self.nr_cpus += nr_new_cpus;
1494        for cpu in new_cpus.iter() {
1495            self.nr_llc_cpus[cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
1496        }
1497        Ok(nr_new_cpus)
1498    }
1499}
1500#[derive(Debug, Clone)]
1501struct NodeInfo {
1502    node_mask: nix::sched::CpuSet,
1503    _node_id: usize,
1504}
1505
1506#[derive(Debug)]
1507struct GpuTaskAffinitizer {
1508    // This struct tracks information necessary to numa affinitize
1509    // gpu tasks periodically when needed.
1510    gpu_devs_to_node_info: HashMap<u32, NodeInfo>,
1511    gpu_pids_to_devs: HashMap<Pid, u32>,
1512    last_process_time: Option<Instant>,
1513    sys: System,
1514    pid_map: HashMap<Pid, Vec<Pid>>,
1515    poll_interval: Duration,
1516    enable: bool,
1517    tasks_affinitized: u64,
1518    last_task_affinitization_ms: u64,
1519}
1520
1521impl GpuTaskAffinitizer {
1522    pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
1523        GpuTaskAffinitizer {
1524            gpu_devs_to_node_info: HashMap::new(),
1525            gpu_pids_to_devs: HashMap::new(),
1526            last_process_time: None,
1527            sys: System::default(),
1528            pid_map: HashMap::new(),
1529            poll_interval: Duration::from_secs(poll_interval),
1530            enable,
1531            tasks_affinitized: 0,
1532            last_task_affinitization_ms: 0,
1533        }
1534    }
1535
1536    fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
1537        for (chunk, &mask) in affinity.iter().enumerate() {
1538            let mut inner_offset: u64 = 1;
1539            for _ in 0..64 {
1540                if (mask & inner_offset) != 0 {
1541                    return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
1542                }
1543                inner_offset = inner_offset << 1;
1544            }
1545        }
1546        anyhow::bail!("unable to get CPU from NVML bitmask");
1547    }
1548
1549    fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
1550        let mut cpuset = CpuSet::new();
1551        for (cpu_id, _cpu) in &node.all_cpus {
1552            cpuset.set(*cpu_id)?;
1553        }
1554        Ok(cpuset)
1555    }
1556
1557    fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
1558        let nvml = nvml()?;
1559        let device_count = nvml.device_count()?;
1560
1561        for idx in 0..device_count {
1562            let dev = nvml.device_by_index(idx)?;
1563            // For machines w/ up to 1024 CPUs.
1564            let cpu = dev.cpu_affinity(16)?;
1565            let ideal_cpu = self.find_one_cpu(cpu)?;
1566            if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
1567                self.gpu_devs_to_node_info.insert(
1568                    idx,
1569                    NodeInfo {
1570                        node_mask: self.node_to_cpuset(
1571                            topo.nodes.get(&cpu.node_id).expect("topo missing node"),
1572                        )?,
1573                        _node_id: cpu.node_id,
1574                    },
1575                );
1576            }
1577        }
1578        Ok(())
1579    }
1580
1581    fn update_gpu_pids(&mut self) -> Result<()> {
1582        let nvml = nvml()?;
1583        for i in 0..nvml.device_count()? {
1584            let device = nvml.device_by_index(i)?;
1585            for proc in device
1586                .running_compute_processes()?
1587                .into_iter()
1588                .chain(device.running_graphics_processes()?.into_iter())
1589            {
1590                self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
1591            }
1592        }
1593        Ok(())
1594    }
1595
1596    fn update_process_info(&mut self) -> Result<()> {
1597        self.sys.refresh_processes_specifics(
1598            ProcessesToUpdate::All,
1599            true,
1600            ProcessRefreshKind::nothing(),
1601        );
1602        self.pid_map.clear();
1603        for (pid, proc_) in self.sys.processes() {
1604            if let Some(ppid) = proc_.parent() {
1605                self.pid_map.entry(ppid).or_default().push(*pid);
1606            }
1607        }
1608        Ok(())
1609    }
1610
1611    fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
1612        let mut work = VecDeque::from([root_pid]);
1613        let mut pids_and_tids: HashSet<Pid> = HashSet::new();
1614
1615        while let Some(pid) = work.pop_front() {
1616            if pids_and_tids.insert(pid) {
1617                if let Some(kids) = self.pid_map.get(&pid) {
1618                    work.extend(kids);
1619                }
1620                if let Some(proc_) = self.sys.process(pid) {
1621                    if let Some(tasks) = proc_.tasks() {
1622                        pids_and_tids.extend(tasks.iter().copied());
1623                    }
1624                }
1625            }
1626        }
1627        pids_and_tids
1628    }
1629
1630    fn affinitize_gpu_pids(&mut self) -> Result<()> {
1631        if !self.enable {
1632            return Ok(());
1633        }
1634        for (pid, dev) in &self.gpu_pids_to_devs {
1635            let node_info = self
1636                .gpu_devs_to_node_info
1637                .get(&dev)
1638                .expect("Unable to get gpu pid node mask");
1639            for child in self.get_child_pids_and_tids(*pid) {
1640                match nix::sched::sched_setaffinity(
1641                    nix::unistd::Pid::from_raw(child.as_u32() as i32),
1642                    &node_info.node_mask,
1643                ) {
1644                    Ok(_) => {
1645                        // Increment the global counter for successful affinitization
1646                        self.tasks_affinitized += 1;
1647                    }
1648                    Err(_) => {
1649                        debug!(
1650                            "Error affinitizing gpu pid {} to node {:#?}",
1651                            child.as_u32(),
1652                            node_info
1653                        );
1654                    }
1655                };
1656            }
1657        }
1658        Ok(())
1659    }
1660
1661    pub fn maybe_affinitize(&mut self) {
1662        if !self.enable {
1663            return;
1664        }
1665        let now = Instant::now();
1666
1667        if let Some(last_process_time) = self.last_process_time {
1668            if (now - last_process_time) < self.poll_interval {
1669                return;
1670            }
1671        }
1672
1673        match self.update_gpu_pids() {
1674            Ok(_) => {}
1675            Err(e) => {
1676                error!("Error updating GPU PIDs: {}", e);
1677            }
1678        };
1679        match self.update_process_info() {
1680            Ok(_) => {}
1681            Err(e) => {
1682                error!("Error updating process info to affinitize GPU PIDs: {}", e);
1683            }
1684        };
1685        match self.affinitize_gpu_pids() {
1686            Ok(_) => {}
1687            Err(e) => {
1688                error!("Error updating GPU PIDs: {}", e);
1689            }
1690        };
1691        self.last_process_time = Some(now);
1692        self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
1693
1694        return;
1695    }
1696
1697    pub fn init(&mut self, topo: Arc<Topology>) {
1698        if !self.enable || self.last_process_time.is_some() {
1699            return;
1700        }
1701
1702        match self.init_dev_node_map(topo) {
1703            Ok(_) => {}
1704            Err(e) => {
1705                error!("Error initializing gpu node dev map: {}", e);
1706            }
1707        };
1708        self.sys = System::new_all();
1709        return;
1710    }
1711}
1712
1713struct Scheduler<'a> {
1714    skel: BpfSkel<'a>,
1715    struct_ops: Option<libbpf_rs::Link>,
1716    layer_specs: Vec<LayerSpec>,
1717
1718    sched_intv: Duration,
1719    layer_refresh_intv: Duration,
1720
1721    cpu_pool: CpuPool,
1722    layers: Vec<Layer>,
1723    idle_qos_enabled: bool,
1724
1725    proc_reader: fb_procfs::ProcReader,
1726    sched_stats: Stats,
1727
1728    cgroup_regexes: Option<HashMap<u32, Regex>>,
1729
1730    nr_layer_cpus_ranges: Vec<(usize, usize)>,
1731    processing_dur: Duration,
1732
1733    topo: Arc<Topology>,
1734    netdevs: BTreeMap<String, NetDev>,
1735    stats_server: StatsServer<StatsReq, StatsRes>,
1736    gpu_task_handler: GpuTaskAffinitizer,
1737}
1738
1739impl<'a> Scheduler<'a> {
1740    fn init_layers(
1741        skel: &mut OpenBpfSkel,
1742        specs: &[LayerSpec],
1743        topo: &Topology,
1744    ) -> Result<HashMap<u32, Regex>> {
1745        skel.maps.rodata_data.as_mut().unwrap().nr_layers = specs.len() as u32;
1746        let mut perf_set = false;
1747
1748        let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
1749        let mut layer_weights: Vec<usize> = vec![];
1750        let mut cgroup_regex_id = 0;
1751        let mut cgroup_regexes = HashMap::new();
1752
1753        for (spec_i, spec) in specs.iter().enumerate() {
1754            let layer = &mut skel.maps.bss_data.as_mut().unwrap().layers[spec_i];
1755
1756            for (or_i, or) in spec.matches.iter().enumerate() {
1757                for (and_i, and) in or.iter().enumerate() {
1758                    let mt = &mut layer.matches[or_i].matches[and_i];
1759
1760                    // Rules are allowlist-based by default
1761                    mt.exclude.write(false);
1762
1763                    match and {
1764                        LayerMatch::CgroupPrefix(prefix) => {
1765                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
1766                            copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
1767                        }
1768                        LayerMatch::CgroupSuffix(suffix) => {
1769                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
1770                            copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
1771                        }
1772                        LayerMatch::CgroupRegex(regex_str) => {
1773                            if cgroup_regex_id >= bpf_intf::consts_MAX_CGROUP_REGEXES {
1774                                bail!(
1775                                    "Too many cgroup regex rules. Maximum allowed: {}",
1776                                    bpf_intf::consts_MAX_CGROUP_REGEXES
1777                                );
1778                            }
1779
1780                            // CgroupRegex matching handled in userspace via cgroup watcher
1781                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_REGEX as i32;
1782                            mt.cgroup_regex_id = cgroup_regex_id;
1783
1784                            let regex = Regex::new(regex_str).with_context(|| {
1785                                format!("Invalid regex '{}' in layer '{}'", regex_str, spec.name)
1786                            })?;
1787                            cgroup_regexes.insert(cgroup_regex_id, regex);
1788                            cgroup_regex_id += 1;
1789                        }
1790                        LayerMatch::CgroupContains(substr) => {
1791                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
1792                            copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
1793                        }
1794                        LayerMatch::CommPrefix(prefix) => {
1795                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
1796                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
1797                        }
1798                        LayerMatch::CommPrefixExclude(prefix) => {
1799                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
1800                            mt.exclude.write(true);
1801                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
1802                        }
1803                        LayerMatch::PcommPrefix(prefix) => {
1804                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
1805                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
1806                        }
1807                        LayerMatch::PcommPrefixExclude(prefix) => {
1808                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
1809                            mt.exclude.write(true);
1810                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
1811                        }
1812                        LayerMatch::NiceAbove(nice) => {
1813                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
1814                            mt.nice = *nice;
1815                        }
1816                        LayerMatch::NiceBelow(nice) => {
1817                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
1818                            mt.nice = *nice;
1819                        }
1820                        LayerMatch::NiceEquals(nice) => {
1821                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
1822                            mt.nice = *nice;
1823                        }
1824                        LayerMatch::UIDEquals(user_id) => {
1825                            mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
1826                            mt.user_id = *user_id;
1827                        }
1828                        LayerMatch::GIDEquals(group_id) => {
1829                            mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
1830                            mt.group_id = *group_id;
1831                        }
1832                        LayerMatch::PIDEquals(pid) => {
1833                            mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
1834                            mt.pid = *pid;
1835                        }
1836                        LayerMatch::PPIDEquals(ppid) => {
1837                            mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
1838                            mt.ppid = *ppid;
1839                        }
1840                        LayerMatch::TGIDEquals(tgid) => {
1841                            mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
1842                            mt.tgid = *tgid;
1843                        }
1844                        LayerMatch::NSPIDEquals(nsid, pid) => {
1845                            mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
1846                            mt.nsid = *nsid;
1847                            mt.pid = *pid;
1848                        }
1849                        LayerMatch::NSEquals(nsid) => {
1850                            mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
1851                            mt.nsid = *nsid as u64;
1852                        }
1853                        LayerMatch::CmdJoin(joincmd) => {
1854                            mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
1855                            copy_into_cstr(&mut mt.comm_prefix, joincmd);
1856                        }
1857                        LayerMatch::IsGroupLeader(polarity) => {
1858                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
1859                            mt.is_group_leader.write(*polarity);
1860                        }
1861                        LayerMatch::IsKthread(polarity) => {
1862                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
1863                            mt.is_kthread.write(*polarity);
1864                        }
1865                        LayerMatch::UsedGpuTid(polarity) => {
1866                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
1867                            mt.used_gpu_tid.write(*polarity);
1868                        }
1869                        LayerMatch::UsedGpuPid(polarity) => {
1870                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
1871                            mt.used_gpu_pid.write(*polarity);
1872                        }
1873                        LayerMatch::AvgRuntime(min, max) => {
1874                            mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
1875                            mt.min_avg_runtime_us = *min;
1876                            mt.max_avg_runtime_us = *max;
1877                        }
1878                        LayerMatch::HintEquals(hint) => {
1879                            mt.kind = bpf_intf::layer_match_kind_MATCH_HINT_EQUALS as i32;
1880                            mt.hint = *hint;
1881                        }
1882                        LayerMatch::SystemCpuUtilBelow(threshold) => {
1883                            mt.kind = bpf_intf::layer_match_kind_MATCH_SYSTEM_CPU_UTIL_BELOW as i32;
1884                            mt.system_cpu_util_below = (*threshold * 10000.0) as u64;
1885                        }
1886                        LayerMatch::DsqInsertBelow(threshold) => {
1887                            mt.kind = bpf_intf::layer_match_kind_MATCH_DSQ_INSERT_BELOW as i32;
1888                            mt.dsq_insert_below = (*threshold * 10000.0) as u64;
1889                        }
1890                        LayerMatch::NumaNode(node_id) => {
1891                            if *node_id as usize >= topo.nodes.len() {
1892                                bail!(
1893                                    "Spec {:?} has invalid NUMA node ID {} (available nodes: 0-{})",
1894                                    spec.name,
1895                                    node_id,
1896                                    topo.nodes.len() - 1
1897                                );
1898                            }
1899                            mt.kind = bpf_intf::layer_match_kind_MATCH_NUMA_NODE as i32;
1900                            mt.numa_node_id = *node_id;
1901                        }
1902                    }
1903                }
1904                layer.matches[or_i].nr_match_ands = or.len() as i32;
1905            }
1906
1907            layer.nr_match_ors = spec.matches.len() as u32;
1908            layer.kind = spec.kind.as_bpf_enum();
1909
1910            {
1911                let LayerCommon {
1912                    min_exec_us,
1913                    yield_ignore,
1914                    perf,
1915                    preempt,
1916                    preempt_first,
1917                    exclusive,
1918                    allow_node_aligned,
1919                    skip_remote_node,
1920                    prev_over_idle_core,
1921                    growth_algo,
1922                    nodes,
1923                    slice_us,
1924                    fifo,
1925                    weight,
1926                    disallow_open_after_us,
1927                    disallow_preempt_after_us,
1928                    xllc_mig_min_us,
1929                    placement,
1930                    member_expire_ms,
1931                    ..
1932                } = spec.kind.common();
1933
1934                layer.slice_ns = *slice_us * 1000;
1935                layer.fifo.write(*fifo);
1936                layer.min_exec_ns = min_exec_us * 1000;
1937                layer.yield_step_ns = if *yield_ignore > 0.999 {
1938                    0
1939                } else if *yield_ignore < 0.001 {
1940                    layer.slice_ns
1941                } else {
1942                    (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
1943                };
1944                let mut layer_name: String = spec.name.clone();
1945                layer_name.truncate(MAX_LAYER_NAME);
1946                copy_into_cstr(&mut layer.name, layer_name.as_str());
1947                layer.preempt.write(*preempt);
1948                layer.preempt_first.write(*preempt_first);
1949                layer.excl.write(*exclusive);
1950                layer.allow_node_aligned.write(*allow_node_aligned);
1951                layer.skip_remote_node.write(*skip_remote_node);
1952                layer.prev_over_idle_core.write(*prev_over_idle_core);
1953                layer.growth_algo = growth_algo.as_bpf_enum();
1954                layer.weight = *weight;
1955                layer.member_expire_ms = *member_expire_ms;
1956                layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
1957                    v if v == u64::MAX => v,
1958                    v => v * 1000,
1959                };
1960                layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
1961                    v if v == u64::MAX => v,
1962                    v => v * 1000,
1963                };
1964                layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
1965                layer_weights.push(layer.weight.try_into().unwrap());
1966                layer.perf = u32::try_from(*perf)?;
1967                layer.node_mask = nodemask_from_nodes(nodes) as u64;
1968                for (topo_node_id, topo_node) in &topo.nodes {
1969                    if !nodes.is_empty() && !nodes.contains(topo_node_id) {
1970                        continue;
1971                    }
1972                    layer.llc_mask |= llcmask_from_llcs(&topo_node.llcs) as u64;
1973                }
1974
1975                let task_place = |place: u32| crate::types::layer_task_place(place);
1976                layer.task_place = match placement {
1977                    LayerPlacement::Standard => {
1978                        task_place(bpf_intf::layer_task_place_PLACEMENT_STD as u32)
1979                    }
1980                    LayerPlacement::Sticky => {
1981                        task_place(bpf_intf::layer_task_place_PLACEMENT_STICK as u32)
1982                    }
1983                    LayerPlacement::Floating => {
1984                        task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT as u32)
1985                    }
1986                };
1987            }
1988
1989            layer.is_protected.write(match spec.kind {
1990                LayerKind::Open { .. } => false,
1991                LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
1992                    protected
1993                }
1994            });
1995
1996            match &spec.cpuset {
1997                Some(mask) => {
1998                    Self::update_cpumask(&mask, &mut layer.cpuset);
1999                }
2000                None => {
2001                    for i in 0..layer.cpuset.len() {
2002                        layer.cpuset[i] = u8::MAX;
2003                    }
2004                }
2005            };
2006
2007            perf_set |= layer.perf > 0;
2008        }
2009
2010        layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
2011        for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
2012            skel.maps
2013                .rodata_data
2014                .as_mut()
2015                .unwrap()
2016                .layer_iteration_order[idx] = *layer_idx as u32;
2017        }
2018
2019        if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
2020            warn!("cpufreq support not available, ignoring perf configurations");
2021        }
2022
2023        Ok(cgroup_regexes)
2024    }
2025
2026    fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
2027        skel.maps.rodata_data.as_mut().unwrap().nr_nodes = topo.nodes.len() as u32;
2028        skel.maps.rodata_data.as_mut().unwrap().nr_llcs = 0;
2029
2030        for (&node_id, node) in &topo.nodes {
2031            debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
2032            skel.maps.rodata_data.as_mut().unwrap().nr_llcs += node.llcs.len() as u32;
2033            let raw_numa_slice = node.span.as_raw_slice();
2034            let node_cpumask_slice =
2035                &mut skel.maps.rodata_data.as_mut().unwrap().numa_cpumasks[node_id];
2036            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
2037            left.clone_from_slice(raw_numa_slice);
2038            debug!(
2039                "node {} mask: {:?}",
2040                node_id,
2041                skel.maps.rodata_data.as_ref().unwrap().numa_cpumasks[node_id]
2042            );
2043
2044            for llc in node.llcs.values() {
2045                debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
2046                skel.maps.rodata_data.as_mut().unwrap().llc_numa_id_map[llc.id] = node_id as u32;
2047            }
2048        }
2049
2050        for cpu in topo.all_cpus.values() {
2051            skel.maps.rodata_data.as_mut().unwrap().cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
2052        }
2053    }
2054
2055    fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
2056        let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
2057            vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
2058            vec
2059        };
2060        let radiate_cpu =
2061            |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
2062                vec.sort_by_key(|&id| {
2063                    (
2064                        (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
2065                        (center_cpu as i32 - id as i32).abs(),
2066                    )
2067                });
2068                vec
2069            };
2070
2071        for (&cpu_id, cpu) in &topo.all_cpus {
2072            // Collect the spans.
2073            let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
2074            let llc_span = &topo.all_llcs[&cpu.llc_id].span;
2075            let node_span = &topo.nodes[&cpu.node_id].span;
2076            let sys_span = &topo.span;
2077
2078            // Make the spans exclusive.
2079            let sys_span = sys_span.and(&node_span.not());
2080            let node_span = node_span.and(&llc_span.not());
2081            let llc_span = llc_span.and(&core_span.not());
2082            core_span.clear_cpu(cpu_id).unwrap();
2083
2084            // Convert them into arrays.
2085            let mut sys_order: Vec<usize> = sys_span.iter().collect();
2086            let mut node_order: Vec<usize> = node_span.iter().collect();
2087            let mut llc_order: Vec<usize> = llc_span.iter().collect();
2088            let mut core_order: Vec<usize> = core_span.iter().collect();
2089
2090            // Shuffle them so that different CPUs follow different orders.
2091            // Each CPU radiates in both directions based on the cpu id and
2092            // radiates out to the closest cores based on core ids.
2093
2094            sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
2095            node_order = radiate(node_order, cpu.node_id);
2096            llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
2097            core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);
2098
2099            // Concatenate and record the topology boundaries.
2100            let mut order: Vec<usize> = vec![];
2101            let mut idx: usize = 0;
2102
2103            idx += 1;
2104            order.push(cpu_id);
2105
2106            idx += core_order.len();
2107            order.append(&mut core_order);
2108            let core_end = idx;
2109
2110            idx += llc_order.len();
2111            order.append(&mut llc_order);
2112            let llc_end = idx;
2113
2114            idx += node_order.len();
2115            order.append(&mut node_order);
2116            let node_end = idx;
2117
2118            idx += sys_order.len();
2119            order.append(&mut sys_order);
2120            let sys_end = idx;
2121
2122            debug!(
2123                "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
2124                cpu_id, core_end, llc_end, node_end, sys_end, &order
2125            );
2126
2127            // Record in cpu_ctx.
2128            let pmap = &mut cpu_ctxs[cpu_id].prox_map;
2129            for (i, &cpu) in order.iter().enumerate() {
2130                pmap.cpus[i] = cpu as u16;
2131            }
2132            pmap.core_end = core_end as u32;
2133            pmap.llc_end = llc_end as u32;
2134            pmap.node_end = node_end as u32;
2135            pmap.sys_end = sys_end as u32;
2136        }
2137    }
2138
2139    fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
2140        cpu_ctxs
2141            .into_iter()
2142            .map(|cpu_ctx| {
2143                let bytes = unsafe {
2144                    std::slice::from_raw_parts(
2145                        &cpu_ctx as *const bpf_intf::cpu_ctx as *const u8,
2146                        std::mem::size_of::<bpf_intf::cpu_ctx>(),
2147                    )
2148                };
2149                bytes.to_vec()
2150            })
2151            .collect()
2152    }
2153
2154    fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
2155        let key = (0_u32).to_ne_bytes();
2156        let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
2157        let cpu_ctxs_vec = skel
2158            .maps
2159            .cpu_ctxs
2160            .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
2161            .context("Failed to lookup cpu_ctx")?
2162            .unwrap();
2163
2164        let op_layers: Vec<u32> = layer_specs
2165            .iter()
2166            .enumerate()
2167            .filter(|(_idx, spec)| match &spec.kind {
2168                LayerKind::Open { .. } => spec.kind.common().preempt,
2169                _ => false,
2170            })
2171            .map(|(idx, _)| idx as u32)
2172            .collect();
2173        let on_layers: Vec<u32> = layer_specs
2174            .iter()
2175            .enumerate()
2176            .filter(|(_idx, spec)| match &spec.kind {
2177                LayerKind::Open { .. } => !spec.kind.common().preempt,
2178                _ => false,
2179            })
2180            .map(|(idx, _)| idx as u32)
2181            .collect();
2182        let gp_layers: Vec<u32> = layer_specs
2183            .iter()
2184            .enumerate()
2185            .filter(|(_idx, spec)| match &spec.kind {
2186                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2187                _ => false,
2188            })
2189            .map(|(idx, _)| idx as u32)
2190            .collect();
2191        let gn_layers: Vec<u32> = layer_specs
2192            .iter()
2193            .enumerate()
2194            .filter(|(_idx, spec)| match &spec.kind {
2195                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2196                _ => false,
2197            })
2198            .map(|(idx, _)| idx as u32)
2199            .collect();
2200
2201        // FIXME - this incorrectly assumes all possible CPUs are consecutive.
2202        for cpu in 0..*NR_CPUS_POSSIBLE {
2203            cpu_ctxs.push(*unsafe {
2204                &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
2205            });
2206
2207            let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
2208            let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
2209            cpu_ctxs[cpu].cpu = cpu as i32;
2210            cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
2211            cpu_ctxs[cpu].is_big = is_big;
2212
2213            fastrand::seed(cpu as u64);
2214
2215            let mut ogp_order = op_layers.clone();
2216            ogp_order.append(&mut gp_layers.clone());
2217            fastrand::shuffle(&mut ogp_order);
2218
2219            let mut ogn_order = on_layers.clone();
2220            ogn_order.append(&mut gn_layers.clone());
2221            fastrand::shuffle(&mut ogn_order);
2222
2223            let mut op_order = op_layers.clone();
2224            fastrand::shuffle(&mut op_order);
2225
2226            let mut on_order = on_layers.clone();
2227            fastrand::shuffle(&mut on_order);
2228
2229            let mut gp_order = gp_layers.clone();
2230            fastrand::shuffle(&mut gp_order);
2231
2232            let mut gn_order = gn_layers.clone();
2233            fastrand::shuffle(&mut gn_order);
2234
2235            for i in 0..MAX_LAYERS {
2236                cpu_ctxs[cpu].ogp_layer_order[i] =
2237                    ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2238                cpu_ctxs[cpu].ogn_layer_order[i] =
2239                    ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2240
2241                cpu_ctxs[cpu].op_layer_order[i] =
2242                    op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2243                cpu_ctxs[cpu].on_layer_order[i] =
2244                    on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2245                cpu_ctxs[cpu].gp_layer_order[i] =
2246                    gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2247                cpu_ctxs[cpu].gn_layer_order[i] =
2248                    gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2249            }
2250        }
2251
2252        Self::init_cpu_prox_map(topo, &mut cpu_ctxs);
2253
2254        skel.maps
2255            .cpu_ctxs
2256            .update_percpu(
2257                &key,
2258                &Self::convert_cpu_ctxs(cpu_ctxs),
2259                libbpf_rs::MapFlags::ANY,
2260            )
2261            .context("Failed to update cpu_ctx")?;
2262
2263        Ok(())
2264    }
2265
2266    fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
2267        for (&llc_id, llc) in &topo.all_llcs {
2268            // Collect the orders.
2269            let mut node_order: Vec<usize> =
2270                topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
2271            let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();
2272
2273            // Make the orders exclusive.
2274            sys_order.retain(|id| !node_order.contains(id));
2275            node_order.retain(|&id| id != llc_id);
2276
2277            // Shuffle so that different LLCs follow different orders. See
2278            // init_cpu_prox_map().
2279            fastrand::seed(llc_id as u64);
2280            fastrand::shuffle(&mut sys_order);
2281            fastrand::shuffle(&mut node_order);
2282
2283            // Concatenate and record the node boundary.
2284            let mut order: Vec<usize> = vec![];
2285            let mut idx: usize = 0;
2286
2287            idx += 1;
2288            order.push(llc_id);
2289
2290            idx += node_order.len();
2291            order.append(&mut node_order);
2292            let node_end = idx;
2293
2294            idx += sys_order.len();
2295            order.append(&mut sys_order);
2296            let sys_end = idx;
2297
2298            debug!(
2299                "LLC[{}] proximity map[{}/{}]: {:?}",
2300                llc_id, node_end, sys_end, &order
2301            );
2302
2303            // Record in llc_ctx.
2304            //
2305            // XXX - This would be a lot easier if llc_ctx were in the bss.
2306            // See BpfStats::read().
2307            let key = llc_id as u32;
2308            let llc_id_slice =
2309                unsafe { std::slice::from_raw_parts((&key as *const u32) as *const u8, 4) };
2310            let v = skel
2311                .maps
2312                .llc_data
2313                .lookup(llc_id_slice, libbpf_rs::MapFlags::ANY)
2314                .unwrap()
2315                .unwrap();
2316            let mut llcc = unsafe { *(v.as_slice().as_ptr() as *const bpf_intf::llc_ctx) };
2317
2318            let pmap = &mut llcc.prox_map;
2319            for (i, &llc_id) in order.iter().enumerate() {
2320                pmap.llcs[i] = llc_id as u16;
2321            }
2322            pmap.node_end = node_end as u32;
2323            pmap.sys_end = sys_end as u32;
2324
2325            let v = unsafe {
2326                std::slice::from_raw_parts(
2327                    &llcc as *const bpf_intf::llc_ctx as *const u8,
2328                    std::mem::size_of::<bpf_intf::llc_ctx>(),
2329                )
2330            };
2331
2332            skel.maps
2333                .llc_data
2334                .update(llc_id_slice, v, libbpf_rs::MapFlags::ANY)?
2335        }
2336
2337        Ok(())
2338    }
2339
2340    fn init(
2341        opts: &'a Opts,
2342        layer_specs: &[LayerSpec],
2343        open_object: &'a mut MaybeUninit<OpenObject>,
2344        hint_to_layer_map: &HashMap<u64, HintLayerInfo>,
2345        membw_tracking: bool,
2346    ) -> Result<Self> {
2347        let nr_layers = layer_specs.len();
2348        let mut disable_topology = opts.disable_topology.unwrap_or(false);
2349
2350        let topo = Arc::new(if disable_topology {
2351            Topology::with_flattened_llc_node()?
2352        } else if opts.topology.virt_llc.is_some() {
2353            Topology::with_args(&opts.topology)?
2354        } else {
2355            Topology::new()?
2356        });
2357
2358        /*
2359         * FIXME: scx_layered incorrectly assumes that node, LLC and CPU IDs
2360         * are consecutive. Verify that they are on this system and bail if
2361         * not. It's lucky that core ID is not used anywhere as core IDs are
2362         * not consecutive on some Ryzen CPUs.
2363         */
2364        if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
2365            bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
2366        }
2367        if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
2368            bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
2369        }
2370        if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
2371            bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
2372        }
2373
2374        let netdevs = if opts.netdev_irq_balance {
2375            warn!(
2376                "Experimental netdev IRQ balancing enabled. Reset IRQ masks of network devices after use!!!"
2377            );
2378            read_netdevs()?
2379        } else {
2380            BTreeMap::new()
2381        };
2382
2383        if !disable_topology {
2384            if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
2385                disable_topology = true;
2386            };
2387            info!(
2388                "Topology awareness not specified, selecting {} based on hardware",
2389                if disable_topology {
2390                    "disabled"
2391                } else {
2392                    "enabled"
2393                }
2394            );
2395        };
2396
2397        let cpu_pool = CpuPool::new(topo.clone())?;
2398
2399        // If disabling topology awareness clear out any set NUMA/LLC configs and
2400        // it will fallback to using all cores.
2401        let layer_specs: Vec<_> = if disable_topology {
2402            info!("Disabling topology awareness");
2403            layer_specs
2404                .iter()
2405                .cloned()
2406                .map(|mut s| {
2407                    s.kind.common_mut().nodes.clear();
2408                    s.kind.common_mut().llcs.clear();
2409                    s
2410                })
2411                .collect()
2412        } else {
2413            layer_specs.to_vec()
2414        };
2415
2416        // Check kernel features
2417        init_libbpf_logging(None);
2418        let kfuncs_in_syscall = scx_bpf_compat::kfuncs_supported_in_syscall()?;
2419        if !kfuncs_in_syscall {
2420            warn!("Using slow path: kfuncs not supported in syscall programs (a8e03b6bbb2c ∉ ker)");
2421        }
2422
2423        // Open the BPF prog first for verification.
2424        let debug_level = if opts.log_level.contains("trace") {
2425            2
2426        } else if opts.log_level.contains("debug") {
2427            1
2428        } else {
2429            0
2430        };
2431        let mut skel_builder = BpfSkelBuilder::default();
2432        skel_builder.obj_builder.debug(debug_level > 1);
2433
2434        info!(
2435            "Running scx_layered (build ID: {})",
2436            build_id::full_version(env!("CARGO_PKG_VERSION"))
2437        );
2438        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
2439        let mut skel = scx_ops_open!(skel_builder, open_object, layered, open_opts)?;
2440
2441        // No memory BW tracking by default
2442        skel.progs.scx_pmu_switch_tc.set_autoload(membw_tracking);
2443        skel.progs.scx_pmu_tick_tc.set_autoload(membw_tracking);
2444
2445        let mut loaded_kprobes = HashSet::new();
2446
2447        // enable autoloads for conditionally loaded things
2448        // immediately after creating skel (because this is always before loading)
2449        if opts.enable_gpu_support {
2450            // by default, enable open if gpu support is enabled.
2451            // open has been observed to be relatively cheap to kprobe.
2452            if opts.gpu_kprobe_level >= 1 {
2453                compat::cond_kprobe_load("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
2454                loaded_kprobes.insert("nvidia_open");
2455            }
2456            // enable the rest progressively based upon how often they are called
2457            // for observed workloads
2458            if opts.gpu_kprobe_level >= 2 {
2459                compat::cond_kprobe_load("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
2460                loaded_kprobes.insert("nvidia_mmap");
2461            }
2462            if opts.gpu_kprobe_level >= 3 {
2463                compat::cond_kprobe_load("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
2464                loaded_kprobes.insert("nvidia_poll");
2465            }
2466        }
2467
2468        let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
2469        let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");
2470
2471        let event = if membw_tracking {
2472            setup_membw_tracking(&mut skel)?
2473        } else {
2474            0
2475        };
2476
2477        let rodata = skel.maps.rodata_data.as_mut().unwrap();
2478
2479        if ext_sched_class_addr.is_ok() && idle_sched_class_addr.is_ok() {
2480            rodata.ext_sched_class_addr = ext_sched_class_addr.unwrap();
2481            rodata.idle_sched_class_addr = idle_sched_class_addr.unwrap();
2482        } else {
2483            warn!(
2484                "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
2485            );
2486        }
2487
2488        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
2489        rodata.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;
2490
2491        // Initialize skel according to @opts.
2492        skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;
2493
2494        if !opts.disable_queued_wakeup {
2495            match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
2496                0 => info!("Kernel does not support queued wakeup optimization"),
2497                v => skel.struct_ops.layered_mut().flags |= v,
2498            }
2499        }
2500
2501        rodata.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
2502        rodata.percpu_kthread_preempt_all =
2503            !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
2504        rodata.debug = debug_level as u32;
2505        rodata.slice_ns = opts.slice_us * 1000;
2506        rodata.max_exec_ns = if opts.max_exec_us > 0 {
2507            opts.max_exec_us * 1000
2508        } else {
2509            opts.slice_us * 1000 * 20
2510        };
2511        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;
2512        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
2513        rodata.smt_enabled = topo.smt_enabled;
2514        rodata.has_little_cores = topo.has_little_cores();
2515        rodata.xnuma_preemption = opts.xnuma_preemption;
2516        rodata.antistall_sec = opts.antistall_sec;
2517        rodata.monitor_disable = opts.monitor_disable;
2518        rodata.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
2519        rodata.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
2520        rodata.enable_antistall = !opts.disable_antistall;
2521        rodata.enable_match_debug = opts.enable_match_debug;
2522        rodata.enable_gpu_support = opts.enable_gpu_support;
2523        rodata.kfuncs_supported_in_syscall = kfuncs_in_syscall;
2524
2525        for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
2526            rodata.__sibling_cpu[cpu] = *sib;
2527        }
2528        for cpu in topo.all_cpus.keys() {
2529            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
2530        }
2531
2532        rodata.nr_op_layers = layer_specs
2533            .iter()
2534            .filter(|spec| match &spec.kind {
2535                LayerKind::Open { .. } => spec.kind.common().preempt,
2536                _ => false,
2537            })
2538            .count() as u32;
2539        rodata.nr_on_layers = layer_specs
2540            .iter()
2541            .filter(|spec| match &spec.kind {
2542                LayerKind::Open { .. } => !spec.kind.common().preempt,
2543                _ => false,
2544            })
2545            .count() as u32;
2546        rodata.nr_gp_layers = layer_specs
2547            .iter()
2548            .filter(|spec| match &spec.kind {
2549                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2550                _ => false,
2551            })
2552            .count() as u32;
2553        rodata.nr_gn_layers = layer_specs
2554            .iter()
2555            .filter(|spec| match &spec.kind {
2556                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2557                _ => false,
2558            })
2559            .count() as u32;
2560        rodata.nr_excl_layers = layer_specs
2561            .iter()
2562            .filter(|spec| spec.kind.common().exclusive)
2563            .count() as u32;
2564
2565        let mut min_open = u64::MAX;
2566        let mut min_preempt = u64::MAX;
2567
2568        for spec in layer_specs.iter() {
2569            if let LayerKind::Open { common, .. } = &spec.kind {
2570                min_open = min_open.min(common.disallow_open_after_us.unwrap());
2571                min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
2572            }
2573        }
2574
2575        rodata.min_open_layer_disallow_open_after_ns = match min_open {
2576            u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
2577            v => v,
2578        };
2579        rodata.min_open_layer_disallow_preempt_after_ns = match min_preempt {
2580            u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
2581            v => v,
2582        };
2583
2584        // Consider all layers empty at the beginning.
2585        for i in 0..layer_specs.len() {
2586            skel.maps.bss_data.as_mut().unwrap().empty_layer_ids[i] = i as u32;
2587        }
2588        skel.maps.bss_data.as_mut().unwrap().nr_empty_layer_ids = nr_layers as u32;
2589
2590        // We set the pin path before loading the skeleton. This will ensure
2591        // libbpf creates and pins the map, or reuses the pinned map fd for us,
2592        // so that we can keep reusing the older map already pinned on scheduler
2593        // restarts.
2594        let layered_task_hint_map_path = &opts.task_hint_map;
2595        let hint_map = &mut skel.maps.scx_layered_task_hint_map;
2596        // Only set pin path if a path is provided.
2597        if layered_task_hint_map_path.is_empty() == false {
2598            hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
2599            rodata.task_hint_map_enabled = true;
2600        }
2601
2602        if !opts.hi_fb_thread_name.is_empty() {
2603            let bpf_hi_fb_thread_name = &mut rodata.hi_fb_thread_name;
2604            copy_into_cstr(bpf_hi_fb_thread_name, opts.hi_fb_thread_name.as_str());
2605            rodata.enable_hi_fb_thread_name_match = true;
2606        }
2607
2608        let cgroup_regexes = Self::init_layers(&mut skel, &layer_specs, &topo)?;
2609        skel.maps.rodata_data.as_mut().unwrap().nr_cgroup_regexes = cgroup_regexes.len() as u32;
2610        Self::init_nodes(&mut skel, opts, &topo);
2611
2612        let mut skel = scx_ops_load!(skel, layered, uei)?;
2613
2614        // Populate the mapping of hints to layer IDs for faster lookups
2615        if hint_to_layer_map.len() != 0 {
2616            for (k, v) in hint_to_layer_map.iter() {
2617                let key: u32 = *k as u32;
2618
2619                // Create hint_layer_info struct
2620                let mut info_bytes = vec![0u8; std::mem::size_of::<bpf_intf::hint_layer_info>()];
2621                let info_ptr = info_bytes.as_mut_ptr() as *mut bpf_intf::hint_layer_info;
2622                unsafe {
2623                    (*info_ptr).layer_id = v.layer_id as u32;
2624                    (*info_ptr).system_cpu_util_below = match v.system_cpu_util_below {
2625                        Some(threshold) => (threshold * 10000.0) as u64,
2626                        None => u64::MAX, // disabled sentinel
2627                    };
2628                    (*info_ptr).dsq_insert_below = match v.dsq_insert_below {
2629                        Some(threshold) => (threshold * 10000.0) as u64,
2630                        None => u64::MAX, // disabled sentinel
2631                    };
2632                }
2633
2634                skel.maps.hint_to_layer_id_map.update(
2635                    &key.to_ne_bytes(),
2636                    &info_bytes,
2637                    libbpf_rs::MapFlags::ANY,
2638                )?;
2639            }
2640        }
2641
2642        if membw_tracking {
2643            create_perf_fds(&mut skel, event)?;
2644        }
2645
2646        let mut layers = vec![];
2647        let layer_growth_orders =
2648            LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
2649        for (idx, spec) in layer_specs.iter().enumerate() {
2650            let growth_order = layer_growth_orders
2651                .get(&idx)
2652                .with_context(|| "layer has no growth order".to_string())?;
2653            layers.push(Layer::new(spec, &topo, growth_order)?);
2654        }
2655
2656        let mut idle_qos_enabled = layers
2657            .iter()
2658            .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
2659        if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
2660            warn!("idle_resume_us not supported, ignoring");
2661            idle_qos_enabled = false;
2662        }
2663
2664        Self::init_cpus(&skel, &layer_specs, &topo)?;
2665        Self::init_llc_prox_map(&mut skel, &topo)?;
2666
2667        // Other stuff.
2668        let proc_reader = fb_procfs::ProcReader::new();
2669
2670        // Handle setup if layered is running in a pid namespace.
2671        let input = ProgramInput {
2672            ..Default::default()
2673        };
2674        let prog = &mut skel.progs.initialize_pid_namespace;
2675
2676        let _ = prog.test_run(input);
2677
2678        // XXX If we try to refresh the cpumasks here before attaching, we
2679        // sometimes (non-deterministically) don't see the updated values in
2680        // BPF. It would be better to update the cpumasks here before we
2681        // attach, but the value will quickly converge anyways so it's not a
2682        // huge problem in the interim until we figure it out.
2683
2684        // Allow all tasks to open and write to BPF task hint map, now that
2685        // we should have it pinned at the desired location.
2686        if layered_task_hint_map_path.is_empty() == false {
2687            let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
2688            let mode: libc::mode_t = 0o666;
2689            unsafe {
2690                if libc::chmod(path.as_ptr(), mode) != 0 {
2691                    trace!("'chmod' to 666 of task hint map failed, continuing...");
2692                }
2693            }
2694        }
2695
2696        // Attach.
2697        let struct_ops = scx_ops_attach!(skel, layered)?;
2698
2699        // Turn on installed kprobes
2700        if opts.enable_gpu_support {
2701            if loaded_kprobes.contains("nvidia_open") {
2702                compat::cond_kprobe_attach("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
2703            }
2704            if loaded_kprobes.contains("nvidia_mmap") {
2705                compat::cond_kprobe_attach("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
2706            }
2707            if loaded_kprobes.contains("nvidia_poll") {
2708                compat::cond_kprobe_attach("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
2709            }
2710        }
2711
2712        let stats_server = StatsServer::new(stats::server_data()).launch()?;
2713        let mut gpu_task_handler =
2714            GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
2715        gpu_task_handler.init(topo.clone());
2716
2717        let sched = Self {
2718            struct_ops: Some(struct_ops),
2719            layer_specs,
2720
2721            sched_intv: Duration::from_secs_f64(opts.interval),
2722            layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),
2723
2724            cpu_pool,
2725            layers,
2726            idle_qos_enabled,
2727
2728            sched_stats: Stats::new(&mut skel, &proc_reader, &gpu_task_handler)?,
2729
2730            cgroup_regexes: Some(cgroup_regexes),
2731            nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
2732            processing_dur: Default::default(),
2733
2734            proc_reader,
2735            skel,
2736
2737            topo,
2738            netdevs,
2739            stats_server,
2740            gpu_task_handler,
2741        };
2742
2743        info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");
2744
2745        Ok(sched)
2746    }
2747
2748    fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
2749        for cpu in 0..mask.len() {
2750            if mask.test_cpu(cpu) {
2751                bpfmask[cpu / 8] |= 1 << (cpu % 8);
2752            } else {
2753                bpfmask[cpu / 8] &= !(1 << (cpu % 8));
2754            }
2755        }
2756    }
2757
2758    fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
2759        trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
2760        Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);
2761
2762        bpf_layer.nr_cpus = layer.nr_cpus as u32;
2763        for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
2764            bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
2765        }
2766
2767        bpf_layer.refresh_cpus = 1;
2768    }
2769
2770    fn update_netdev_cpumasks(&mut self) -> Result<()> {
2771        let available_cpus = self.cpu_pool.available_cpus();
2772        if available_cpus.is_empty() {
2773            return Ok(());
2774        }
2775
2776        for (iface, netdev) in self.netdevs.iter_mut() {
2777            let node = self
2778                .topo
2779                .nodes
2780                .values()
2781                .take_while(|n| n.id == netdev.node())
2782                .next()
2783                .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
2784            let node_cpus = node.span.clone();
2785            for (irq, irqmask) in netdev.irqs.iter_mut() {
2786                irqmask.clear_all();
2787                for cpu in available_cpus.iter() {
2788                    if !node_cpus.test_cpu(cpu) {
2789                        continue;
2790                    }
2791                    let _ = irqmask.set_cpu(cpu);
2792                }
2793                // If no CPUs are available in the node then spread the load across the node
2794                if irqmask.weight() == 0 {
2795                    for cpu in node_cpus.iter() {
2796                        let _ = irqmask.set_cpu(cpu);
2797                    }
2798                }
2799                trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
2800            }
2801            netdev.apply_cpumasks()?;
2802        }
2803
2804        Ok(())
2805    }
2806
2807    fn clamp_target_by_membw(
2808        &self,
2809        layer: &Layer,
2810        membw_limit: f64,
2811        membw: f64,
2812        curtarget: u64,
2813    ) -> usize {
2814        let ncpu: u64 = layer.cpus.weight() as u64;
2815        let membw = (membw * (1024 as f64).powf(3.0)).round() as u64;
2816        let membw_limit = (membw_limit * (1024 as f64).powf(3.0)).round() as u64;
2817        let last_membw_percpu = if ncpu > 0 { membw / ncpu } else { 0 };
2818
2819        // Either there is no memory bandwidth limit set, or the counters
2820        // are not fully initialized yet. Just return the current target.
2821        if membw_limit == 0 || last_membw_percpu == 0 {
2822            return curtarget as usize;
2823        }
2824
2825        return (membw_limit / last_membw_percpu) as usize;
2826    }
2827
2828    /// Calculate how many CPUs each layer would like to have if there were
2829    /// no competition. The CPU range is determined by applying the inverse
2830    /// of util_range and then capping by cpus_range. If the current
2831    /// allocation is within the acceptable range, no change is made.
2832    /// Returns (target, min) pair for each layer.
2833    fn calc_target_nr_cpus(&self) -> Vec<(usize, usize)> {
2834        let nr_cpus = self.cpu_pool.topo.all_cpus.len();
2835        let utils = &self.sched_stats.layer_utils;
2836        let membws = &self.sched_stats.layer_membws;
2837
2838        let mut records: Vec<(u64, u64, u64, usize, usize, usize)> = vec![];
2839        let mut targets: Vec<(usize, usize)> = vec![];
2840
2841        for (idx, layer) in self.layers.iter().enumerate() {
2842            targets.push(match &layer.kind {
2843                LayerKind::Confined {
2844                    util_range,
2845                    cpus_range,
2846                    cpus_range_frac,
2847                    membw_gb,
2848                    ..
2849                }
2850                | LayerKind::Grouped {
2851                    util_range,
2852                    cpus_range,
2853                    cpus_range_frac,
2854                    membw_gb,
2855                    ..
2856                } => {
2857                    let cpus_range =
2858                        resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();
2859
2860                    // A grouped layer can choose to include open cputime
2861                    // for sizing. Also, as an empty layer can only get CPU
2862                    // time through fallback (counted as owned) or open
2863                    // execution, add open cputime for empty layers.
2864                    let owned = utils[idx][LAYER_USAGE_OWNED];
2865                    let open = utils[idx][LAYER_USAGE_OPEN];
2866
2867                    let membw_owned = membws[idx][LAYER_USAGE_OWNED];
2868                    let membw_open = membws[idx][LAYER_USAGE_OPEN];
2869
2870                    let mut util = owned;
2871                    let mut membw = membw_owned;
2872                    if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
2873                        util += open;
2874                        membw += membw_open;
2875                    }
2876
2877                    let util = if util < 0.01 { 0.0 } else { util };
2878                    let low = (util / util_range.1).ceil() as usize;
2879                    let high = ((util / util_range.0).floor() as usize).max(low);
2880
2881                    let membw_limit = match membw_gb {
2882                        Some(membw_limit) => *membw_limit,
2883                        None => 0.0,
2884                    };
2885
2886                    trace!(
2887                        "layer {0} (membw, membw_limit): ({membw} gi_b, {membw_limit} gi_b)",
2888                        layer.name
2889                    );
2890
2891                    let target = layer.cpus.weight().clamp(low, high);
2892
2893                    records.push((
2894                        (owned * 100.0) as u64,
2895                        (open * 100.0) as u64,
2896                        (util * 100.0) as u64,
2897                        low,
2898                        high,
2899                        target,
2900                    ));
2901
2902                    let target = target.clamp(cpus_range.0, cpus_range.1);
2903                    let membw_target = self.clamp_target_by_membw(
2904                        &layer,
2905                        membw_limit as f64,
2906                        membw as f64,
2907                        target as u64,
2908                    );
2909
2910                    trace!("CPU target pre- and post-membw adjustment: {target} -> {membw_target}");
2911
2912                    // If there's no way to drop our memory usage down enough,
2913                    // pin the target CPUs to low and drop a warning.
2914                    if membw_target < cpus_range.0 {
2915                        warn!("cannot satisfy memory bw limit for layer {}", layer.name);
2916                        warn!("membw_target {membw_target} low {}", cpus_range.0);
2917                    };
2918
2919                    // Memory bandwidth target cannot override imposed limits or bump
2920                    // the target above what CPU usage-based throttling requires.
2921                    let target = membw_target.clamp(cpus_range.0, target);
2922
2923                    (target, cpus_range.0)
2924                }
2925                LayerKind::Open { .. } => (0, 0),
2926            });
2927        }
2928
2929        trace!("(owned, open, util, low, high, target): {:?}", &records);
2930        targets
2931    }
2932
2933    /// Given (target, min) pair for each layer which was determined
2934    /// assuming infinite number of CPUs, distribute the actual CPUs
2935    /// according to their weights.
2936    fn weighted_target_nr_cpus(&self, targets: &[(usize, usize)]) -> Vec<usize> {
2937        let mut nr_left = self.cpu_pool.topo.all_cpus.len();
2938        let weights: Vec<usize> = self
2939            .layers
2940            .iter()
2941            .map(|layer| layer.kind.common().weight as usize)
2942            .collect();
2943        let mut cands: BTreeMap<usize, (usize, usize, usize)> = targets
2944            .iter()
2945            .zip(&weights)
2946            .enumerate()
2947            .map(|(i, ((target, min), weight))| (i, (*target, *min, *weight)))
2948            .collect();
2949        let mut weight_sum: usize = weights.iter().sum();
2950        let mut weighted: Vec<usize> = vec![0; self.layers.len()];
2951
2952        trace!("cands: {:?}", &cands);
2953
2954        // First, accept all layers that are <= min.
2955        cands.retain(|&i, &mut (target, min, weight)| {
2956            if target <= min {
2957                let target = target.min(nr_left);
2958                weighted[i] = target;
2959                weight_sum -= weight;
2960                nr_left -= target;
2961                false
2962            } else {
2963                true
2964            }
2965        });
2966
2967        trace!("cands after accepting mins: {:?}", &cands);
2968
2969        // Keep accepting ones under their allotted share.
2970        let calc_share = |nr_left, weight, weight_sum| {
2971            (((nr_left * weight) as f64 / weight_sum as f64).ceil() as usize).min(nr_left)
2972        };
2973
2974        while !cands.is_empty() {
2975            let mut progress = false;
2976
2977            cands.retain(|&i, &mut (target, _min, weight)| {
2978                let share = calc_share(nr_left, weight, weight_sum);
2979                if target <= share {
2980                    weighted[i] = target;
2981                    weight_sum -= weight;
2982                    nr_left -= target;
2983                    progress = true;
2984                    false
2985                } else {
2986                    true
2987                }
2988            });
2989
2990            if !progress {
2991                break;
2992            }
2993        }
2994
2995        trace!("cands after accepting under allotted: {:?}", &cands);
2996
2997        // The remaining candidates are in contention with each other,
2998        // distribute according to the shares.
2999        let nr_to_share = nr_left;
3000        for (i, (_target, _min, weight)) in cands.into_iter() {
3001            let share = calc_share(nr_to_share, weight, weight_sum).min(nr_left);
3002            weighted[i] = share;
3003            nr_left -= share;
3004        }
3005
3006        trace!("weighted: {:?}", &weighted);
3007
3008        weighted
3009    }
3010
3011    // Figure out a tuple (LLCs, extra_cpus) in terms of the target CPUs
3012    // computed by weighted_target_nr_cpus. Returns the number of full LLCs
3013    // occupied by a layer, and any extra CPUs that don't occupy a full LLC.
3014    fn compute_target_llcs(target: usize, topo: &Topology) -> (usize, usize) {
3015        // TODO(kkd): We assume each LLC has equal number of cores.
3016        let cores_per_llc = topo.all_cores.len() / topo.all_llcs.len();
3017        // TODO(kkd): We assume each core has fixed number of threads.
3018        let cpus_per_core = topo.all_cores.first_key_value().unwrap().1.cpus.len();
3019        let cpus_per_llc = cores_per_llc * cpus_per_core;
3020
3021        let full = target / cpus_per_llc;
3022        let extra = target % cpus_per_llc;
3023
3024        (full, extra.div_ceil(cpus_per_core))
3025    }
3026
3027    // Recalculate the core order for layers using StickyDynamic growth
3028    // algorithm. Tuples from compute_target_llcs are used to decide how many
3029    // LLCs and cores should be assigned to each layer, logic to alloc and free
3030    // CPUs operates on that core order. This happens in three logical steps, we
3031    // first free LLCs from layers that shrunk from last recomputation, then
3032    // distribute freed LLCs to growing layers, and then spill over remaining
3033    // cores in free LLCs.
3034    fn recompute_layer_core_order(&mut self, layer_targets: &Vec<(usize, usize)>) -> Result<bool> {
3035        // Collect freed LLCs from shrinking layers.
3036        debug!(
3037            " free: before pass: free_llcs={:?}",
3038            self.cpu_pool.free_llcs
3039        );
3040        for &(idx, target) in layer_targets.iter().rev() {
3041            let layer = &mut self.layers[idx];
3042            let old_tlc = layer.target_llc_cpus;
3043            let new_tlc = Self::compute_target_llcs(target, &self.topo);
3044
3045            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3046                continue;
3047            }
3048
3049            let mut to_free = (old_tlc.0 as i32 - new_tlc.0 as i32).max(0) as usize;
3050
3051            debug!(
3052                " free: layer={} old_tlc={:?} new_tlc={:?} to_free={} assigned={} free={}",
3053                layer.name,
3054                old_tlc,
3055                new_tlc,
3056                to_free,
3057                layer.assigned_llcs.len(),
3058                self.cpu_pool.free_llcs.len()
3059            );
3060
3061            while to_free > 0 && layer.assigned_llcs.len() > 0 {
3062                let llc = layer.assigned_llcs.pop().unwrap();
3063                self.cpu_pool.free_llcs.push((llc, 0));
3064                to_free -= 1;
3065
3066                debug!(" layer={} freed_llc={}", layer.name, llc);
3067            }
3068        }
3069        debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);
3070
3071        // Redistribute the freed LLCs to growing layers.
3072        for &(idx, target) in layer_targets.iter().rev() {
3073            let layer = &mut self.layers[idx];
3074            let old_tlc = layer.target_llc_cpus;
3075            let new_tlc = Self::compute_target_llcs(target, &self.topo);
3076
3077            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3078                continue;
3079            }
3080
3081            let mut to_alloc = (new_tlc.0 as i32 - old_tlc.0 as i32).max(0) as usize;
3082
3083            debug!(
3084                " alloc: layer={} old_tlc={:?} new_tlc={:?} to_alloc={} assigned={} free={}",
3085                layer.name,
3086                old_tlc,
3087                new_tlc,
3088                to_alloc,
3089                layer.assigned_llcs.len(),
3090                self.cpu_pool.free_llcs.len()
3091            );
3092
3093            while to_alloc > 0
3094                && self.cpu_pool.free_llcs.len() > 0
3095                && to_alloc <= self.cpu_pool.free_llcs.len()
3096            {
3097                let llc = self.cpu_pool.free_llcs.pop().unwrap().0;
3098                layer.assigned_llcs.push(llc);
3099                to_alloc -= 1;
3100
3101                debug!(" layer={} alloc_llc={}", layer.name, llc);
3102            }
3103
3104            debug!(
3105                " alloc: layer={} assigned_llcs={:?}",
3106                layer.name, layer.assigned_llcs
3107            );
3108
3109            // Update for next iteration.
3110            layer.target_llc_cpus = new_tlc;
3111        }
3112
3113        // Spillover overflowing cores into free LLCs. Bigger layers get to take
3114        // a chunk before smaller layers.
3115        for &(idx, _) in layer_targets.iter() {
3116            let mut core_order = vec![];
3117            let layer = &mut self.layers[idx];
3118
3119            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3120                continue;
3121            }
3122
3123            let tlc = layer.target_llc_cpus;
3124            let mut extra = tlc.1;
3125            // TODO(kkd): Move this logic into cpu_pool? What's the best place?
3126            let cores_per_llc = self.topo.all_cores.len() / self.topo.all_llcs.len();
3127            let cpus_per_core = self.topo.all_cores.first_key_value().unwrap().1.cpus.len();
3128            let cpus_per_llc = cores_per_llc * cpus_per_core;
3129
3130            // Consume from front since we pop from the back.
3131            for i in 0..self.cpu_pool.free_llcs.len() {
3132                let free_vec = &mut self.cpu_pool.free_llcs;
3133                // Available CPUs in LLC.
3134                let avail = cpus_per_llc - free_vec[i].1;
3135                // The amount we'll use.
3136                let mut used = extra.min(avail);
3137                let cores_to_add = used;
3138
3139                let shift = free_vec[i].1;
3140                free_vec[i].1 += used;
3141
3142                let llc_id = free_vec[i].0;
3143                let llc = self.topo.all_llcs.get(&llc_id).unwrap();
3144
3145                for core in llc.cores.iter().skip(shift) {
3146                    if used == 0 {
3147                        break;
3148                    }
3149                    core_order.push(core.1.id);
3150                    used -= 1;
3151                }
3152
3153                extra -= cores_to_add;
3154                if extra == 0 {
3155                    break;
3156                }
3157            }
3158
3159            core_order.reverse();
3160            layer.core_order = core_order;
3161        }
3162
3163        // Reset consumed entries in free LLCs.
3164        for i in 0..self.cpu_pool.free_llcs.len() {
3165            self.cpu_pool.free_llcs[i].1 = 0;
3166        }
3167
3168        for &(idx, _) in layer_targets.iter() {
3169            let layer = &mut self.layers[idx];
3170
3171            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3172                continue;
3173            }
3174
3175            for core in self.topo.all_cores.iter() {
3176                let llc_id = core.1.llc_id;
3177                if layer.assigned_llcs.contains(&llc_id) {
3178                    layer.core_order.push(core.1.id);
3179                }
3180            }
3181            // Update core_order for the layer, but reverse to keep the start stable.
3182            layer.core_order.reverse();
3183
3184            debug!(
3185                " alloc: layer={} core_order={:?}",
3186                layer.name, layer.core_order
3187            );
3188        }
3189
3190        // Apply CPU changes directly for StickyDynamic layers
3191        // Do this in two phases: first free all CPUs, then allocate all CPUs
3192        // This ensures freed CPUs are available for reallocation
3193        let mut updated = false;
3194
3195        // Calculate target CPUs for all layers and free excess CPUs
3196        for &(idx, _) in layer_targets.iter() {
3197            let layer = &mut self.layers[idx];
3198
3199            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3200                continue;
3201            }
3202
3203            // Calculate new cpumask based on core_order (which includes both assigned LLCs and spillover cores)
3204            let mut new_cpus = Cpumask::new();
3205            for &core_id in &layer.core_order {
3206                if let Some(core) = self.topo.all_cores.get(&core_id) {
3207                    new_cpus |= &core.span;
3208                }
3209            }
3210
3211            // Intersect with allowed_cpus
3212            new_cpus &= &layer.allowed_cpus;
3213
3214            // Determine CPUs to free (old cpus not in new cpus)
3215            let cpus_to_free = layer.cpus.clone().and(&new_cpus.clone().not());
3216
3217            if cpus_to_free.weight() > 0 {
3218                debug!(
3219                    " apply: layer={} freeing CPUs: {}",
3220                    layer.name, cpus_to_free
3221                );
3222                // Update layer state and free
3223                layer.cpus &= &cpus_to_free.not();
3224                layer.nr_cpus -= cpus_to_free.weight();
3225                for cpu in cpus_to_free.iter() {
3226                    layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
3227                }
3228                self.cpu_pool.free(&cpus_to_free)?;
3229                updated = true;
3230            }
3231        }
3232
3233        // Allocate needed CPUs to all layers
3234        for &(idx, _) in layer_targets.iter() {
3235            let layer = &mut self.layers[idx];
3236
3237            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
3238                continue;
3239            }
3240
3241            // Recalculate target CPUs
3242            let mut new_cpus = Cpumask::new();
3243            for &core_id in &layer.core_order {
3244                if let Some(core) = self.topo.all_cores.get(&core_id) {
3245                    new_cpus |= &core.span;
3246                }
3247            }
3248            new_cpus &= &layer.allowed_cpus;
3249
3250            // Determine CPUs to allocate (new cpus not in old cpus, and whether they are available)
3251            let available_cpus = self.cpu_pool.available_cpus();
3252            let desired_to_alloc = new_cpus.clone().and(&layer.cpus.clone().not());
3253            let cpus_to_alloc = desired_to_alloc.clone().and(&available_cpus);
3254
3255            if desired_to_alloc.weight() > cpus_to_alloc.weight() {
3256                debug!(
3257                    " apply: layer={} wanted to alloc {} CPUs but only {} available",
3258                    layer.name,
3259                    desired_to_alloc.weight(),
3260                    cpus_to_alloc.weight()
3261                );
3262            }
3263
3264            if cpus_to_alloc.weight() > 0 {
3265                debug!(
3266                    " apply: layer={} allocating CPUs: {}",
3267                    layer.name, cpus_to_alloc
3268                );
3269                // Update layer state and free
3270                layer.cpus |= &cpus_to_alloc;
3271                layer.nr_cpus += cpus_to_alloc.weight();
3272                for cpu in cpus_to_alloc.iter() {
3273                    layer.nr_llc_cpus[self.cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
3274                }
3275                self.cpu_pool.mark_allocated(&cpus_to_alloc)?;
3276                updated = true;
3277            }
3278
3279            debug!(
3280                " apply: layer={} final cpus.weight()={} nr_cpus={}",
3281                layer.name,
3282                layer.cpus.weight(),
3283                layer.nr_cpus
3284            );
3285        }
3286
3287        Ok(updated)
3288    }
3289
3290    fn refresh_cpumasks(&mut self) -> Result<()> {
3291        let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });
3292
3293        let mut updated = false;
3294        let targets = self.calc_target_nr_cpus();
3295        let targets = self.weighted_target_nr_cpus(&targets);
3296
3297        let mut ascending: Vec<(usize, usize)> = targets.iter().copied().enumerate().collect();
3298        ascending.sort_by(|a, b| a.1.cmp(&b.1));
3299
3300        let sticky_dynamic_updated = self.recompute_layer_core_order(&ascending)?;
3301        updated |= sticky_dynamic_updated;
3302
3303        // Update BPF cpumasks for StickyDynamic layers if they were updated
3304        if sticky_dynamic_updated {
3305            for (idx, layer) in self.layers.iter().enumerate() {
3306                if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
3307                    Self::update_bpf_layer_cpumask(
3308                        layer,
3309                        &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3310                    );
3311                }
3312            }
3313        }
3314
3315        // If any layer is growing, guarantee that the largest layer that is
3316        // freeing CPUs frees at least one CPU.
3317        let mut force_free = self
3318            .layers
3319            .iter()
3320            .zip(targets.iter())
3321            .any(|(layer, &target)| layer.nr_cpus < target);
3322
3323        // Shrink all layers first so that CPUs are available for
3324        // redistribution. Do so in the descending target number of CPUs
3325        // order.
3326        for &(idx, target) in ascending.iter().rev() {
3327            let layer = &mut self.layers[idx];
3328            if layer_is_open(layer) {
3329                continue;
3330            }
3331
3332            // Skip StickyDynamic layers as they are managed in recompute_layer_core_order
3333            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
3334                continue;
3335            }
3336
3337            let nr_cur = layer.cpus.weight();
3338            if nr_cur <= target {
3339                continue;
3340            }
3341            let mut nr_to_free = nr_cur - target;
3342
3343            // There's some dampening built into util metrics but slow down
3344            // freeing further to avoid unnecessary changes. This is solely
3345            // based on intution. Drop or update according to real-world
3346            // behavior.
3347            let nr_to_break_at = nr_to_free / 2;
3348
3349            let mut freed = false;
3350
3351            while nr_to_free > 0 {
3352                let max_to_free = if force_free {
3353                    force_free = false;
3354                    layer.nr_cpus
3355                } else {
3356                    nr_to_free
3357                };
3358
3359                let nr_freed = layer.free_some_cpus(&mut self.cpu_pool, max_to_free)?;
3360                if nr_freed == 0 {
3361                    break;
3362                }
3363
3364                nr_to_free = nr_to_free.saturating_sub(nr_freed);
3365                freed = true;
3366
3367                if nr_to_free <= nr_to_break_at {
3368                    break;
3369                }
3370            }
3371
3372            if freed {
3373                Self::update_bpf_layer_cpumask(
3374                    layer,
3375                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3376                );
3377                updated = true;
3378            }
3379        }
3380
3381        // Grow layers. Do so in the ascending target number of CPUs order
3382        // so that we're always more generous to smaller layers. This avoids
3383        // starving small layers and shouldn't make noticeable difference for
3384        // bigger layers as work conservation should still be achieved
3385        // through open execution.
3386        for &(idx, target) in &ascending {
3387            let layer = &mut self.layers[idx];
3388
3389            if layer_is_open(layer) {
3390                continue;
3391            }
3392
3393            // Skip StickyDynamic layers as they are managed in recompute_layer_core_order
3394            if layer.growth_algo == LayerGrowthAlgo::StickyDynamic {
3395                continue;
3396            }
3397
3398            let nr_cur = layer.cpus.weight();
3399            if nr_cur >= target {
3400                continue;
3401            }
3402
3403            let mut nr_to_alloc = target - nr_cur;
3404            let mut alloced = false;
3405
3406            while nr_to_alloc > 0 {
3407                let nr_alloced = layer.alloc_some_cpus(&mut self.cpu_pool)?;
3408                if nr_alloced == 0 {
3409                    break;
3410                }
3411                alloced = true;
3412                nr_to_alloc -= nr_alloced.min(nr_to_alloc);
3413            }
3414
3415            if alloced {
3416                Self::update_bpf_layer_cpumask(
3417                    layer,
3418                    &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx],
3419                );
3420                updated = true;
3421            }
3422        }
3423
3424        // Give the rest to the open layers.
3425        if updated {
3426            for (idx, layer) in self.layers.iter_mut().enumerate() {
3427                if !layer_is_open(layer) {
3428                    continue;
3429                }
3430
3431                let bpf_layer = &mut self.skel.maps.bss_data.as_mut().unwrap().layers[idx];
3432                let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
3433                let nr_available_cpus = available_cpus.weight();
3434
3435                // Open layers need the intersection of allowed cpus and
3436                // available cpus.
3437                layer.cpus = available_cpus;
3438                layer.nr_cpus = nr_available_cpus;
3439                Self::update_bpf_layer_cpumask(layer, bpf_layer);
3440            }
3441
3442            self.skel.maps.bss_data.as_mut().unwrap().fallback_cpu =
3443                self.cpu_pool.fallback_cpu as u32;
3444
3445            for (lidx, layer) in self.layers.iter().enumerate() {
3446                self.nr_layer_cpus_ranges[lidx] = (
3447                    self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
3448                    self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
3449                );
3450            }
3451
3452            // Trigger updates on the BPF side.
3453            let input = ProgramInput {
3454                ..Default::default()
3455            };
3456            let prog = &mut self.skel.progs.refresh_layer_cpumasks;
3457            let _ = prog.test_run(input);
3458
3459            // Update empty_layers.
3460            let empty_layer_ids: Vec<u32> = self
3461                .layers
3462                .iter()
3463                .enumerate()
3464                .filter(|(_idx, layer)| layer.nr_cpus == 0)
3465                .map(|(idx, _layer)| idx as u32)
3466                .collect();
3467            for i in 0..self.layers.len() {
3468                self.skel.maps.bss_data.as_mut().unwrap().empty_layer_ids[i] =
3469                    empty_layer_ids.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
3470            }
3471            self.skel.maps.bss_data.as_mut().unwrap().nr_empty_layer_ids =
3472                empty_layer_ids.len() as u32;
3473        }
3474
3475        let _ = self.update_netdev_cpumasks();
3476        Ok(())
3477    }
3478
3479    fn refresh_idle_qos(&mut self) -> Result<()> {
3480        if !self.idle_qos_enabled {
3481            return Ok(());
3482        }
3483
3484        let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
3485        for layer in self.layers.iter() {
3486            let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
3487            for cpu in layer.cpus.iter() {
3488                cpu_idle_qos[cpu] = idle_resume_us;
3489            }
3490        }
3491
3492        for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
3493            update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
3494        }
3495
3496        Ok(())
3497    }
3498
3499    fn step(&mut self) -> Result<()> {
3500        let started_at = Instant::now();
3501        self.sched_stats.refresh(
3502            &mut self.skel,
3503            &self.proc_reader,
3504            started_at,
3505            self.processing_dur,
3506            &self.gpu_task_handler,
3507        )?;
3508
3509        // Update BPF with EWMA values
3510        self.skel
3511            .maps
3512            .bss_data
3513            .as_mut()
3514            .unwrap()
3515            .system_cpu_util_ewma = (self.sched_stats.system_cpu_util_ewma * 10000.0) as u64;
3516
3517        for layer_id in 0..self.sched_stats.nr_layers {
3518            self.skel
3519                .maps
3520                .bss_data
3521                .as_mut()
3522                .unwrap()
3523                .layer_dsq_insert_ewma[layer_id] =
3524                (self.sched_stats.layer_dsq_insert_ewma[layer_id] * 10000.0) as u64;
3525        }
3526
3527        self.refresh_cpumasks()?;
3528        self.refresh_idle_qos()?;
3529        self.gpu_task_handler.maybe_affinitize();
3530        self.processing_dur += Instant::now().duration_since(started_at);
3531        Ok(())
3532    }
3533
3534    fn generate_sys_stats(
3535        &mut self,
3536        stats: &Stats,
3537        cpus_ranges: &mut [(usize, usize)],
3538    ) -> Result<SysStats> {
3539        let bstats = &stats.bpf_stats;
3540        let mut sys_stats = SysStats::new(stats, bstats, self.cpu_pool.fallback_cpu)?;
3541
3542        for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
3543            let layer_stats = LayerStats::new(lidx, layer, stats, bstats, cpus_ranges[lidx]);
3544            sys_stats.layers.insert(spec.name.to_string(), layer_stats);
3545            cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
3546        }
3547
3548        Ok(sys_stats)
3549    }
3550
3551    // Helper function to process a cgroup creation (common logic for walkdir and inotify)
3552    fn process_cgroup_creation(
3553        path: &Path,
3554        cgroup_regexes: &HashMap<u32, Regex>,
3555        cgroup_path_to_id: &mut HashMap<String, u64>,
3556        sender: &crossbeam::channel::Sender<CgroupEvent>,
3557    ) {
3558        let path_str = path.to_string_lossy().to_string();
3559
3560        // Get cgroup ID (inode number)
3561        let cgroup_id = std::fs::metadata(path)
3562            .map(|metadata| {
3563                use std::os::unix::fs::MetadataExt;
3564                metadata.ino()
3565            })
3566            .unwrap_or(0);
3567
3568        // Build match bitmap by testing against CgroupRegex rules
3569        let mut match_bitmap = 0u64;
3570        for (rule_id, regex) in cgroup_regexes {
3571            if regex.is_match(&path_str) {
3572                match_bitmap |= 1u64 << rule_id;
3573            }
3574        }
3575
3576        // Store in hash
3577        cgroup_path_to_id.insert(path_str.clone(), cgroup_id);
3578
3579        // Send event
3580        if let Err(e) = sender.send(CgroupEvent::Created {
3581            path: path_str,
3582            cgroup_id,
3583            match_bitmap,
3584        }) {
3585            error!("Failed to send cgroup creation event: {}", e);
3586        }
3587    }
3588
3589    fn start_cgroup_watcher(
3590        shutdown: Arc<AtomicBool>,
3591        cgroup_regexes: HashMap<u32, Regex>,
3592    ) -> Result<Receiver<CgroupEvent>> {
3593        let mut inotify = Inotify::init().context("Failed to initialize inotify")?;
3594        let mut wd_to_path = HashMap::new();
3595
3596        // Create crossbeam channel for cgroup events (bounded to prevent memory issues)
3597        let (sender, receiver) = crossbeam::channel::bounded::<CgroupEvent>(1024);
3598
3599        // Watch for directory creation and deletion events
3600        let root_wd = inotify
3601            .watches()
3602            .add("/sys/fs/cgroup", WatchMask::CREATE | WatchMask::DELETE)
3603            .context("Failed to add watch for /sys/fs/cgroup")?;
3604        wd_to_path.insert(root_wd, PathBuf::from("/sys/fs/cgroup"));
3605
3606        // Also recursively watch existing directories for new subdirectories
3607        Self::add_recursive_watches(&mut inotify, &mut wd_to_path, Path::new("/sys/fs/cgroup"))?;
3608
3609        // Spawn watcher thread
3610        std::thread::spawn(move || {
3611            let mut buffer = [0; 4096];
3612            let inotify_fd = inotify.as_raw_fd();
3613            // Maintain hash of cgroup path -> cgroup ID (inode number)
3614            let mut cgroup_path_to_id = HashMap::<String, u64>::new();
3615
3616            // Populate existing cgroups
3617            for entry in WalkDir::new("/sys/fs/cgroup")
3618                .into_iter()
3619                .filter_map(|e| e.ok())
3620                .filter(|e| e.file_type().is_dir())
3621            {
3622                let path = entry.path();
3623                Self::process_cgroup_creation(
3624                    path,
3625                    &cgroup_regexes,
3626                    &mut cgroup_path_to_id,
3627                    &sender,
3628                );
3629            }
3630
3631            while !shutdown.load(Ordering::Relaxed) {
3632                // Use select to wait for events with a 100ms timeout
3633                let ready = unsafe {
3634                    let mut read_fds: libc::fd_set = std::mem::zeroed();
3635                    libc::FD_ZERO(&mut read_fds);
3636                    libc::FD_SET(inotify_fd, &mut read_fds);
3637
3638                    let mut timeout = libc::timeval {
3639                        tv_sec: 0,
3640                        tv_usec: 100_000, // 100ms
3641                    };
3642
3643                    libc::select(
3644                        inotify_fd + 1,
3645                        &mut read_fds,
3646                        std::ptr::null_mut(),
3647                        std::ptr::null_mut(),
3648                        &mut timeout,
3649                    )
3650                };
3651
3652                if ready <= 0 {
3653                    // Timeout or error, continue loop to check shutdown
3654                    continue;
3655                }
3656
3657                // Read events non-blocking
3658                let events = match inotify.read_events(&mut buffer) {
3659                    Ok(events) => events,
3660                    Err(e) => {
3661                        error!("Error reading inotify events: {}", e);
3662                        break;
3663                    }
3664                };
3665
3666                for event in events {
3667                    if !event.mask.contains(inotify::EventMask::CREATE)
3668                        && !event.mask.contains(inotify::EventMask::DELETE)
3669                    {
3670                        continue;
3671                    }
3672
3673                    let name = match event.name {
3674                        Some(name) => name,
3675                        None => continue,
3676                    };
3677
3678                    let parent_path = match wd_to_path.get(&event.wd) {
3679                        Some(parent) => parent,
3680                        None => {
3681                            warn!("Unknown watch descriptor: {:?}", event.wd);
3682                            continue;
3683                        }
3684                    };
3685
3686                    let path = parent_path.join(name.to_string_lossy().as_ref());
3687
3688                    if event.mask.contains(inotify::EventMask::CREATE) {
3689                        if !path.is_dir() {
3690                            continue;
3691                        }
3692
3693                        Self::process_cgroup_creation(
3694                            &path,
3695                            &cgroup_regexes,
3696                            &mut cgroup_path_to_id,
3697                            &sender,
3698                        );
3699
3700                        // Add watch for this new directory
3701                        match inotify
3702                            .watches()
3703                            .add(&path, WatchMask::CREATE | WatchMask::DELETE)
3704                        {
3705                            Ok(wd) => {
3706                                wd_to_path.insert(wd, path.clone());
3707                            }
3708                            Err(e) => {
3709                                warn!(
3710                                    "Failed to add watch for new cgroup {}: {}",
3711                                    path.display(),
3712                                    e
3713                                );
3714                            }
3715                        }
3716                    } else if event.mask.contains(inotify::EventMask::DELETE) {
3717                        let path_str = path.to_string_lossy().to_string();
3718
3719                        // Get cgroup ID from our hash (since the directory is gone, we can't stat it)
3720                        let cgroup_id = cgroup_path_to_id.remove(&path_str).unwrap_or(0);
3721
3722                        // Send removal event to main thread
3723                        if let Err(e) = sender.send(CgroupEvent::Removed {
3724                            path: path_str,
3725                            cgroup_id,
3726                        }) {
3727                            error!("Failed to send cgroup removal event: {}", e);
3728                        }
3729
3730                        // Find and remove the watch descriptor for this path
3731                        let wd_to_remove = wd_to_path.iter().find_map(|(wd, watched_path)| {
3732                            if watched_path == &path {
3733                                Some(wd.clone())
3734                            } else {
3735                                None
3736                            }
3737                        });
3738                        if let Some(wd) = wd_to_remove {
3739                            wd_to_path.remove(&wd);
3740                        }
3741                    }
3742                }
3743            }
3744        });
3745
3746        Ok(receiver)
3747    }
3748
3749    fn add_recursive_watches(
3750        inotify: &mut Inotify,
3751        wd_to_path: &mut HashMap<inotify::WatchDescriptor, PathBuf>,
3752        path: &Path,
3753    ) -> Result<()> {
3754        for entry in WalkDir::new(path)
3755            .into_iter()
3756            .filter_map(|e| e.ok())
3757            .filter(|e| e.file_type().is_dir())
3758            .skip(1)
3759        {
3760            let entry_path = entry.path();
3761            // Add watch for this directory
3762            match inotify
3763                .watches()
3764                .add(entry_path, WatchMask::CREATE | WatchMask::DELETE)
3765            {
3766                Ok(wd) => {
3767                    wd_to_path.insert(wd, entry_path.to_path_buf());
3768                }
3769                Err(e) => {
3770                    debug!("Failed to add watch for {}: {}", entry_path.display(), e);
3771                }
3772            }
3773        }
3774        Ok(())
3775    }
3776
3777    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
3778        let (res_ch, req_ch) = self.stats_server.channels();
3779        let mut next_sched_at = Instant::now() + self.sched_intv;
3780        let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
3781        let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
3782        let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();
3783
3784        // Start the cgroup watcher only if there are CgroupRegex rules
3785        let cgroup_regexes = self.cgroup_regexes.take().unwrap();
3786        let cgroup_event_rx = if !cgroup_regexes.is_empty() {
3787            Some(Self::start_cgroup_watcher(
3788                shutdown.clone(),
3789                cgroup_regexes,
3790            )?)
3791        } else {
3792            None
3793        };
3794
3795        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
3796            let now = Instant::now();
3797
3798            if now >= next_sched_at {
3799                self.step()?;
3800                while next_sched_at < now {
3801                    next_sched_at += self.sched_intv;
3802                }
3803            }
3804
3805            if enable_layer_refresh && now >= next_layer_refresh_at {
3806                self.skel
3807                    .maps
3808                    .bss_data
3809                    .as_mut()
3810                    .unwrap()
3811                    .layer_refresh_seq_avgruntime += 1;
3812                while next_layer_refresh_at < now {
3813                    next_layer_refresh_at += self.layer_refresh_intv;
3814                }
3815            }
3816
3817            // Handle both stats requests and cgroup events with timeout
3818            let timeout_duration = next_sched_at.saturating_duration_since(Instant::now());
3819            let never_rx = crossbeam::channel::never();
3820            let cgroup_rx = cgroup_event_rx.as_ref().unwrap_or(&never_rx);
3821
3822            select! {
3823                recv(req_ch) -> msg => match msg {
3824                    Ok(StatsReq::Hello(tid)) => {
3825                        cpus_ranges.insert(
3826                            tid,
3827                            self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
3828                        );
3829                        let stats =
3830                            Stats::new(&mut self.skel, &self.proc_reader, &self.gpu_task_handler)?;
3831                        res_ch.send(StatsRes::Hello(stats))?;
3832                    }
3833                    Ok(StatsReq::Refresh(tid, mut stats)) => {
3834                        // Propagate self's layer cpu ranges into each stat's.
3835                        for i in 0..self.nr_layer_cpus_ranges.len() {
3836                            for (_, ranges) in cpus_ranges.iter_mut() {
3837                                ranges[i] = (
3838                                    ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
3839                                    ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
3840                                );
3841                            }
3842                            self.nr_layer_cpus_ranges[i] =
3843                                (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
3844                        }
3845
3846                        stats.refresh(
3847                            &mut self.skel,
3848                            &self.proc_reader,
3849                            now,
3850                            self.processing_dur,
3851                            &self.gpu_task_handler,
3852                        )?;
3853                        let sys_stats =
3854                            self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
3855                        res_ch.send(StatsRes::Refreshed((stats, sys_stats)))?;
3856                    }
3857                    Ok(StatsReq::Bye(tid)) => {
3858                        cpus_ranges.remove(&tid);
3859                        res_ch.send(StatsRes::Bye)?;
3860                    }
3861                    Err(e) => Err(e)?,
3862                },
3863
3864                recv(cgroup_rx) -> event => match event {
3865                    Ok(CgroupEvent::Created { path, cgroup_id, match_bitmap }) => {
3866                        // Insert into BPF map
3867                        self.skel.maps.cgroup_match_bitmap.update(
3868                            &cgroup_id.to_ne_bytes(),
3869                            &match_bitmap.to_ne_bytes(),
3870                            libbpf_rs::MapFlags::ANY,
3871                        ).with_context(|| format!(
3872                            "Failed to insert cgroup {}({}) into BPF map. Cgroup map may be full \
3873                             (max 16384 entries). Aborting.",
3874                            cgroup_id, path
3875                        ))?;
3876
3877                        debug!("Added cgroup {} to BPF map with bitmap 0x{:x}", cgroup_id, match_bitmap);
3878                    }
3879                    Ok(CgroupEvent::Removed { path, cgroup_id }) => {
3880                        // Delete from BPF map
3881                        if let Err(e) = self.skel.maps.cgroup_match_bitmap.delete(&cgroup_id.to_ne_bytes()) {
3882                            warn!("Failed to delete cgroup {} from BPF map: {}", cgroup_id, e);
3883                        } else {
3884                            debug!("Removed cgroup {}({}) from BPF map", cgroup_id, path);
3885                        }
3886                    }
3887                    Err(e) => {
3888                        error!("Error receiving cgroup event: {}", e);
3889                    }
3890                },
3891
3892                recv(crossbeam::channel::after(timeout_duration)) -> _ => {
3893                    // Timeout - continue main loop
3894                }
3895            }
3896        }
3897
3898        let _ = self.struct_ops.take();
3899        uei_report!(&self.skel, uei)
3900    }
3901}
3902
3903impl Drop for Scheduler<'_> {
3904    fn drop(&mut self) {
3905        info!("Unregister {SCHEDULER_NAME} scheduler");
3906
3907        if let Some(struct_ops) = self.struct_ops.take() {
3908            drop(struct_ops);
3909        }
3910    }
3911}
3912
3913fn write_example_file(path: &str) -> Result<()> {
3914    let mut f = fs::OpenOptions::new()
3915        .create_new(true)
3916        .write(true)
3917        .open(path)?;
3918    Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
3919}
3920
3921struct HintLayerInfo {
3922    layer_id: usize,
3923    system_cpu_util_below: Option<f64>,
3924    dsq_insert_below: Option<f64>,
3925}
3926
3927fn verify_layer_specs(specs: &[LayerSpec]) -> Result<HashMap<u64, HintLayerInfo>> {
3928    let mut hint_to_layer_map = HashMap::<u64, (usize, String, Option<f64>, Option<f64>)>::new();
3929
3930    let nr_specs = specs.len();
3931    if nr_specs == 0 {
3932        bail!("No layer spec");
3933    }
3934    if nr_specs > MAX_LAYERS {
3935        bail!("Too many layer specs");
3936    }
3937
3938    for (idx, spec) in specs.iter().enumerate() {
3939        if idx < nr_specs - 1 {
3940            if spec.matches.is_empty() {
3941                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
3942            }
3943        } else {
3944            if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
3945                bail!("Terminal spec {:?} must have an empty match", spec.name);
3946            }
3947        }
3948
3949        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
3950            bail!(
3951                "Spec {:?} has too many ({}) OR match blocks",
3952                spec.name,
3953                spec.matches.len()
3954            );
3955        }
3956
3957        for (ands_idx, ands) in spec.matches.iter().enumerate() {
3958            if ands.len() > NR_LAYER_MATCH_KINDS {
3959                bail!(
3960                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
3961                    spec.name,
3962                    ands_idx,
3963                    ands.len()
3964                );
3965            }
3966            let mut hint_equals_cnt = 0;
3967            let mut system_cpu_util_below_cnt = 0;
3968            let mut dsq_insert_below_cnt = 0;
3969            let mut hint_value: Option<u64> = None;
3970            let mut system_cpu_util_threshold: Option<f64> = None;
3971            let mut dsq_insert_threshold: Option<f64> = None;
3972            for one in ands.iter() {
3973                match one {
3974                    LayerMatch::CgroupPrefix(prefix) => {
3975                        if prefix.len() > MAX_PATH {
3976                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
3977                        }
3978                    }
3979                    LayerMatch::CgroupSuffix(suffix) => {
3980                        if suffix.len() > MAX_PATH {
3981                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
3982                        }
3983                    }
3984                    LayerMatch::CgroupContains(substr) => {
3985                        if substr.len() > MAX_PATH {
3986                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
3987                        }
3988                    }
3989                    LayerMatch::CommPrefix(prefix) => {
3990                        if prefix.len() > MAX_COMM {
3991                            bail!("Spec {:?} has too long a comm prefix", spec.name);
3992                        }
3993                    }
3994                    LayerMatch::PcommPrefix(prefix) => {
3995                        if prefix.len() > MAX_COMM {
3996                            bail!("Spec {:?} has too long a process name prefix", spec.name);
3997                        }
3998                    }
3999                    LayerMatch::SystemCpuUtilBelow(threshold) => {
4000                        if *threshold < 0.0 || *threshold > 1.0 {
4001                            bail!(
4002                                "Spec {:?} has SystemCpuUtilBelow threshold outside the range [0.0, 1.0]",
4003                                spec.name
4004                            );
4005                        }
4006                        system_cpu_util_threshold = Some(*threshold);
4007                        system_cpu_util_below_cnt += 1;
4008                    }
4009                    LayerMatch::DsqInsertBelow(threshold) => {
4010                        if *threshold < 0.0 || *threshold > 1.0 {
4011                            bail!(
4012                                "Spec {:?} has DsqInsertBelow threshold outside the range [0.0, 1.0]",
4013                                spec.name
4014                            );
4015                        }
4016                        dsq_insert_threshold = Some(*threshold);
4017                        dsq_insert_below_cnt += 1;
4018                    }
4019                    LayerMatch::HintEquals(hint) => {
4020                        if *hint > 1024 {
4021                            bail!(
4022                                "Spec {:?} has hint value outside the range [0, 1024]",
4023                                spec.name
4024                            );
4025                        }
4026                        hint_value = Some(*hint);
4027                        hint_equals_cnt += 1;
4028                    }
4029                    _ => {}
4030                }
4031            }
4032            if hint_equals_cnt > 1 {
4033                bail!("Only 1 HintEquals match permitted per AND block");
4034            }
4035            let high_freq_matcher_cnt = system_cpu_util_below_cnt + dsq_insert_below_cnt;
4036            if high_freq_matcher_cnt > 0 {
4037                if hint_equals_cnt != 1 {
4038                    bail!("High-frequency matchers (SystemCpuUtilBelow, DsqInsertBelow) must be used with one HintEquals");
4039                }
4040                if system_cpu_util_below_cnt > 1 {
4041                    bail!("Only 1 SystemCpuUtilBelow match permitted per AND block");
4042                }
4043                if dsq_insert_below_cnt > 1 {
4044                    bail!("Only 1 DsqInsertBelow match permitted per AND block");
4045                }
4046                if ands.len() != hint_equals_cnt + system_cpu_util_below_cnt + dsq_insert_below_cnt
4047                {
4048                    bail!("High-frequency matchers must be used only with HintEquals (no other matchers)");
4049                }
4050            } else if hint_equals_cnt == 1 && ands.len() != 1 {
4051                bail!("HintEquals match cannot be in conjunction with other matches");
4052            }
4053
4054            // Insert hint into map if present
4055            if let Some(hint) = hint_value {
4056                if let Some((layer_id, name, _, _)) = hint_to_layer_map.get(&hint) {
4057                    if *layer_id != idx {
4058                        bail!(
4059                            "Spec {:?} has hint value ({}) that is already mapped to Spec {:?}",
4060                            spec.name,
4061                            hint,
4062                            name
4063                        );
4064                    }
4065                } else {
4066                    hint_to_layer_map.insert(
4067                        hint,
4068                        (
4069                            idx,
4070                            spec.name.clone(),
4071                            system_cpu_util_threshold,
4072                            dsq_insert_threshold,
4073                        ),
4074                    );
4075                }
4076            }
4077        }
4078
4079        match spec.kind {
4080            LayerKind::Confined {
4081                cpus_range,
4082                util_range,
4083                ..
4084            }
4085            | LayerKind::Grouped {
4086                cpus_range,
4087                util_range,
4088                ..
4089            } => {
4090                if let Some((cpus_min, cpus_max)) = cpus_range {
4091                    if cpus_min > cpus_max {
4092                        bail!(
4093                            "Spec {:?} has invalid cpus_range({}, {})",
4094                            spec.name,
4095                            cpus_min,
4096                            cpus_max
4097                        );
4098                    }
4099                }
4100                if util_range.0 >= util_range.1 {
4101                    bail!(
4102                        "Spec {:?} has invalid util_range ({}, {})",
4103                        spec.name,
4104                        util_range.0,
4105                        util_range.1
4106                    );
4107                }
4108            }
4109            _ => {}
4110        }
4111    }
4112
4113    Ok(hint_to_layer_map
4114        .into_iter()
4115        .map(|(k, v)| {
4116            (
4117                k,
4118                HintLayerInfo {
4119                    layer_id: v.0,
4120                    system_cpu_util_below: v.2,
4121                    dsq_insert_below: v.3,
4122                },
4123            )
4124        })
4125        .collect())
4126}
4127
4128fn name_suffix(cgroup: &str, len: usize) -> String {
4129    let suffixlen = std::cmp::min(len, cgroup.len());
4130    let suffixrev: String = cgroup.chars().rev().take(suffixlen).collect();
4131
4132    suffixrev.chars().rev().collect()
4133}
4134
4135fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
4136    let mut paths = vec![];
4137
4138    if !dir.is_dir() {
4139        panic!("path {:?} does not correspond to directory", dir);
4140    }
4141
4142    let direntries = fs::read_dir(dir)?;
4143
4144    for entry in direntries {
4145        let path = entry?.path();
4146        if path.is_dir() {
4147            paths.append(&mut traverse_sysfs(&path)?);
4148            paths.push(path);
4149        }
4150    }
4151
4152    Ok(paths)
4153}
4154
4155fn find_cpumask(cgroup: &str) -> Cpumask {
4156    let mut path = String::from(cgroup);
4157    path.push_str("/cpuset.cpus.effective");
4158
4159    let description = fs::read_to_string(&mut path).unwrap();
4160
4161    Cpumask::from_cpulist(&description).unwrap()
4162}
4163
4164fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
4165    match rule {
4166        LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4167            .into_iter()
4168            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4169            .filter(|cgroup| cgroup.ends_with(suffix))
4170            .map(|cgroup| {
4171                (
4172                    {
4173                        let mut slashterminated = cgroup.clone();
4174                        slashterminated.push('/');
4175                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4176                    },
4177                    find_cpumask(&cgroup),
4178                )
4179            })
4180            .collect()),
4181        LayerMatch::CgroupRegex(expr) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
4182            .into_iter()
4183            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
4184            .filter(|cgroup| {
4185                let re = Regex::new(expr).unwrap();
4186                re.is_match(cgroup)
4187            })
4188            .map(|cgroup| {
4189                (
4190                    // Here we convert the regex match into a suffix match because we still need to
4191                    // do the matching on the bpf side and doing a regex match in bpf isn't
4192                    // easily done.
4193                    {
4194                        let mut slashterminated = cgroup.clone();
4195                        slashterminated.push('/');
4196                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
4197                    },
4198                    find_cpumask(&cgroup),
4199                )
4200            })
4201            .collect()),
4202        _ => panic!("Unimplemented template enum {:?}", rule),
4203    }
4204}
4205
4206fn create_perf_fds(skel: &mut BpfSkel, event: u64) -> Result<()> {
4207    let mut attr = perf::bindings::perf_event_attr::default();
4208    attr.size = std::mem::size_of::<perf::bindings::perf_event_attr>() as u32;
4209    attr.type_ = perf::bindings::PERF_TYPE_RAW;
4210    attr.config = event;
4211    attr.sample_type = 0u64;
4212    attr.__bindgen_anon_1.sample_period = 0u64;
4213    attr.set_disabled(0);
4214
4215    let perf_events_map = &skel.maps.scx_pmu_map;
4216    let map_fd = unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
4217
4218    let mut failures = 0u64;
4219
4220    for cpu in 0..*NR_CPUS_POSSIBLE {
4221        let fd = unsafe { perf::perf_event_open(&mut attr as *mut _, -1, cpu as i32, -1, 0) };
4222        if fd < 0 {
4223            failures += 1;
4224            trace!(
4225                "perf_event_open failed cpu={cpu} errno={}",
4226                std::io::Error::last_os_error()
4227            );
4228            continue;
4229        }
4230
4231        let key = cpu as u32;
4232        let val = fd as u32;
4233        let ret = unsafe {
4234            libbpf_sys::bpf_map_update_elem(
4235                map_fd,
4236                &key as *const _ as *const _,
4237                &val as *const _ as *const _,
4238                0,
4239            )
4240        };
4241        if ret != 0 {
4242            trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
4243        } else {
4244            trace!("mapped cpu={cpu} -> fd={fd}");
4245        }
4246    }
4247
4248    if failures > 0 {
4249        println!("membw tracking: failed to install {failures} counters");
4250        // Keep going, do not fail the scheduler for this
4251    }
4252
4253    Ok(())
4254}
4255
4256// Set up the counters
4257fn setup_membw_tracking(skel: &mut OpenBpfSkel) -> Result<u64> {
4258    let pmumanager = PMUManager::new()?;
4259    let codename = &pmumanager.codename as &str;
4260
4261    let pmuspec = match codename {
4262        "amdzen1" | "amdzen2" | "amdzen3" => {
4263            trace!("found AMD codename {codename}");
4264            pmumanager.pmus.get("ls_any_fills_from_sys.mem_io_local")
4265        }
4266        "amdzen4" | "amdzen5" => {
4267            trace!("found AMD codename {codename}");
4268            pmumanager.pmus.get("ls_any_fills_from_sys.dram_io_all")
4269        }
4270
4271        "haswell" | "broadwell" | "broadwellde" | "broadwellx" | "skylake" | "skylakex"
4272        | "cascadelakex" | "arrowlake" | "meteorlake" | "sapphirerapids" | "emeraldrapids"
4273        | "graniterapids" => {
4274            trace!("found Intel codename {codename}");
4275            pmumanager.pmus.get("LONGEST_LAT_CACHE.MISS")
4276        }
4277
4278        _ => {
4279            trace!("found unknown codename {codename}");
4280            None
4281        }
4282    };
4283
4284    let spec = pmuspec.ok_or("not_found").unwrap();
4285    let config = (spec.umask << 8) | spec.event[0];
4286
4287    // Install the counter in the BPF map
4288    skel.maps.rodata_data.as_mut().unwrap().membw_event = config;
4289
4290    Ok(config)
4291}
4292
4293#[clap_main::clap_main]
4294fn main(opts: Opts) -> Result<()> {
4295    if opts.version {
4296        println!(
4297            "scx_layered {}",
4298            build_id::full_version(env!("CARGO_PKG_VERSION"))
4299        );
4300        return Ok(());
4301    }
4302
4303    if opts.help_stats {
4304        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
4305        return Ok(());
4306    }
4307
4308    let env_filter = EnvFilter::try_from_default_env()
4309        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
4310            Ok(filter) => Ok(filter),
4311            Err(e) => {
4312                eprintln!(
4313                    "invalid log envvar: {}, using info, err is: {}",
4314                    opts.log_level, e
4315                );
4316                EnvFilter::try_new("info")
4317            }
4318        })
4319        .unwrap_or_else(|_| EnvFilter::new("info"));
4320
4321    match tracing_subscriber::fmt()
4322        .with_env_filter(env_filter)
4323        .with_target(true)
4324        .with_thread_ids(true)
4325        .with_file(true)
4326        .with_line_number(true)
4327        .try_init()
4328    {
4329        Ok(()) => {}
4330        Err(e) => eprintln!("failed to init logger: {}", e),
4331    }
4332
4333    if opts.verbose > 0 {
4334        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
4335    }
4336
4337    if opts.no_load_frac_limit {
4338        warn!("--no-load-frac-limit is deprecated and noop");
4339    }
4340    if opts.layer_preempt_weight_disable != 0.0 {
4341        warn!("--layer-preempt-weight-disable is deprecated and noop");
4342    }
4343    if opts.layer_growth_weight_disable != 0.0 {
4344        warn!("--layer-growth-weight-disable is deprecated and noop");
4345    }
4346    if opts.local_llc_iteration {
4347        warn!("--local_llc_iteration is deprecated and noop");
4348    }
4349
4350    debug!("opts={:?}", &opts);
4351
4352    if let Some(run_id) = opts.run_id {
4353        info!("scx_layered run_id: {}", run_id);
4354    }
4355
4356    let shutdown = Arc::new(AtomicBool::new(false));
4357    let shutdown_clone = shutdown.clone();
4358    ctrlc::set_handler(move || {
4359        shutdown_clone.store(true, Ordering::Relaxed);
4360    })
4361    .context("Error setting Ctrl-C handler")?;
4362
4363    if let Some(intv) = opts.monitor.or(opts.stats) {
4364        let shutdown_copy = shutdown.clone();
4365        let jh = std::thread::spawn(move || {
4366            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
4367                Ok(_) => {
4368                    debug!("stats monitor thread finished successfully")
4369                }
4370                Err(error_object) => {
4371                    warn!(
4372                        "stats monitor thread finished because of an error {}",
4373                        error_object
4374                    )
4375                }
4376            }
4377        });
4378        if opts.monitor.is_some() {
4379            let _ = jh.join();
4380            return Ok(());
4381        }
4382    }
4383
4384    if let Some(path) = &opts.example {
4385        write_example_file(path)?;
4386        return Ok(());
4387    }
4388
4389    let mut layer_config = match opts.run_example {
4390        true => EXAMPLE_CONFIG.clone(),
4391        false => LayerConfig { specs: vec![] },
4392    };
4393
4394    for (idx, input) in opts.specs.iter().enumerate() {
4395        let specs = LayerSpec::parse(input)
4396            .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;
4397
4398        for spec in specs {
4399            match spec.template {
4400                Some(ref rule) => {
4401                    let matches = expand_template(&rule)?;
4402                    // in the absence of matching cgroups, have template layers
4403                    // behave as non-template layers do.
4404                    if matches.is_empty() {
4405                        layer_config.specs.push(spec);
4406                    } else {
4407                        for (mt, mask) in matches {
4408                            let mut genspec = spec.clone();
4409
4410                            genspec.cpuset = Some(mask);
4411
4412                            // Push the new "and" rule into each "or" term.
4413                            for orterm in &mut genspec.matches {
4414                                orterm.push(mt.clone());
4415                            }
4416
4417                            match &mt {
4418                                LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
4419                                _ => bail!("Template match has unexpected type"),
4420                            }
4421
4422                            // Push the generated layer into the config
4423                            layer_config.specs.push(genspec);
4424                        }
4425                    }
4426                }
4427
4428                None => {
4429                    layer_config.specs.push(spec);
4430                }
4431            }
4432        }
4433    }
4434
4435    for spec in layer_config.specs.iter_mut() {
4436        let common = spec.kind.common_mut();
4437
4438        if common.slice_us == 0 {
4439            common.slice_us = opts.slice_us;
4440        }
4441
4442        if common.weight == 0 {
4443            common.weight = DEFAULT_LAYER_WEIGHT;
4444        }
4445        common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);
4446
4447        if common.preempt {
4448            if common.disallow_open_after_us.is_some() {
4449                warn!(
4450                    "Preempt layer {} has non-null disallow_open_after_us, ignored",
4451                    &spec.name
4452                );
4453            }
4454            if common.disallow_preempt_after_us.is_some() {
4455                warn!(
4456                    "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
4457                    &spec.name
4458                );
4459            }
4460            common.disallow_open_after_us = Some(u64::MAX);
4461            common.disallow_preempt_after_us = Some(u64::MAX);
4462        } else {
4463            if common.disallow_open_after_us.is_none() {
4464                common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
4465            }
4466
4467            if common.disallow_preempt_after_us.is_none() {
4468                common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
4469            }
4470        }
4471
4472        if common.idle_smt.is_some() {
4473            warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
4474        }
4475    }
4476
4477    let membw_required = layer_config.specs.iter().any(|spec| match spec.kind {
4478        LayerKind::Confined { membw_gb, .. } | LayerKind::Grouped { membw_gb, .. } => {
4479            membw_gb.is_some()
4480        }
4481        LayerKind::Open { .. } => false,
4482    });
4483
4484    if opts.print_and_exit {
4485        println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
4486        return Ok(());
4487    }
4488
4489    debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
4490    let hint_to_layer_map = verify_layer_specs(&layer_config.specs)?;
4491
4492    let mut open_object = MaybeUninit::uninit();
4493    loop {
4494        let mut sched = Scheduler::init(
4495            &opts,
4496            &layer_config.specs,
4497            &mut open_object,
4498            &hint_to_layer_map,
4499            membw_required,
4500        )?;
4501        if !sched.run(shutdown.clone())?.should_restart() {
4502            break;
4503        }
4504    }
4505
4506    Ok(())
4507}