scx_layered/main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6mod stats;
7
8use std::collections::BTreeMap;
9use std::collections::HashMap;
10use std::collections::HashSet;
11use std::ffi::CString;
12use std::fs;
13use std::io::Write;
14use std::mem::MaybeUninit;
15use std::ops::Sub;
16use std::path::Path;
17use std::path::PathBuf;
18use std::sync::atomic::AtomicBool;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21use std::thread::ThreadId;
22use std::time::Duration;
23use std::time::Instant;
24
25use anyhow::anyhow;
26use anyhow::bail;
27use anyhow::Context;
28use anyhow::Result;
29pub use bpf_skel::*;
30use clap::Parser;
31use crossbeam::channel::RecvTimeoutError;
32use lazy_static::lazy_static;
33use libbpf_rs::MapCore as _;
34use libbpf_rs::OpenObject;
35use libbpf_rs::ProgramInput;
36use log::info;
37use log::trace;
38use log::warn;
39use log::{debug, error};
40use nix::sched::CpuSet;
41use nvml_wrapper::error::NvmlError;
42use nvml_wrapper::Nvml;
43use once_cell::sync::OnceCell;
44use scx_layered::*;
45use scx_stats::prelude::*;
46use scx_utils::build_id;
47use scx_utils::compat;
48use scx_utils::init_libbpf_logging;
49use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
50use scx_utils::read_netdevs;
51use scx_utils::scx_enums;
52use scx_utils::scx_ops_attach;
53use scx_utils::scx_ops_load;
54use scx_utils::scx_ops_open;
55use scx_utils::uei_exited;
56use scx_utils::uei_report;
57use scx_utils::CoreType;
58use scx_utils::Cpumask;
59use scx_utils::Llc;
60use scx_utils::NetDev;
61use scx_utils::Topology;
62use scx_utils::UserExitInfo;
63use scx_utils::NR_CPUS_POSSIBLE;
64use scx_utils::NR_CPU_IDS;
65use stats::LayerStats;
66use stats::StatsReq;
67use stats::StatsRes;
68use stats::SysStats;
69use std::collections::VecDeque;
70use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
71
72const MAX_PATH: usize = bpf_intf::consts_MAX_PATH as usize;
73const MAX_COMM: usize = bpf_intf::consts_MAX_COMM as usize;
74const MAX_LAYER_WEIGHT: u32 = bpf_intf::consts_MAX_LAYER_WEIGHT;
75const MIN_LAYER_WEIGHT: u32 = bpf_intf::consts_MIN_LAYER_WEIGHT;
76const MAX_LAYER_MATCH_ORS: usize = bpf_intf::consts_MAX_LAYER_MATCH_ORS as usize;
77const MAX_LAYER_NAME: usize = bpf_intf::consts_MAX_LAYER_NAME as usize;
78const MAX_LAYERS: usize = bpf_intf::consts_MAX_LAYERS as usize;
79const DEFAULT_LAYER_WEIGHT: u32 = bpf_intf::consts_DEFAULT_LAYER_WEIGHT;
80const USAGE_HALF_LIFE: u32 = bpf_intf::consts_USAGE_HALF_LIFE;
81const USAGE_HALF_LIFE_F64: f64 = USAGE_HALF_LIFE as f64 / 1_000_000_000.0;
82
83const LAYER_USAGE_OWNED: usize = bpf_intf::layer_usage_LAYER_USAGE_OWNED as usize;
84const LAYER_USAGE_OPEN: usize = bpf_intf::layer_usage_LAYER_USAGE_OPEN as usize;
85const LAYER_USAGE_SUM_UPTO: usize = bpf_intf::layer_usage_LAYER_USAGE_SUM_UPTO as usize;
86const LAYER_USAGE_PROTECTED: usize = bpf_intf::layer_usage_LAYER_USAGE_PROTECTED as usize;
87const LAYER_USAGE_PROTECTED_PREEMPT: usize =
88    bpf_intf::layer_usage_LAYER_USAGE_PROTECTED_PREEMPT as usize;
89const NR_LAYER_USAGES: usize = bpf_intf::layer_usage_NR_LAYER_USAGES as usize;
90
91const NR_GSTATS: usize = bpf_intf::global_stat_id_NR_GSTATS as usize;
92const NR_LSTATS: usize = bpf_intf::layer_stat_id_NR_LSTATS as usize;
93const NR_LLC_LSTATS: usize = bpf_intf::llc_layer_stat_id_NR_LLC_LSTATS as usize;
94
95const NR_LAYER_MATCH_KINDS: usize = bpf_intf::layer_match_kind_NR_LAYER_MATCH_KINDS as usize;
96
97static NVML: OnceCell<Nvml> = OnceCell::new();
98
99fn nvml() -> Result<&'static Nvml, NvmlError> {
100    NVML.get_or_try_init(Nvml::init)
101}
102
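// USAGE_DECAY is the per-second decay factor for the running utilization
// averages: raising it to USAGE_HALF_LIFE_F64 (the half-life in seconds)
// yields 0.5. The DFL_DISALLOW_*_AFTER_US defaults are two and four default
// slices (SCX_SLICE_DFL is in nanoseconds, hence the division by 1000).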
103lazy_static! {
104    static ref USAGE_DECAY: f64 = 0.5f64.powf(1.0 / USAGE_HALF_LIFE_F64);
105    static ref DFL_DISALLOW_OPEN_AFTER_US: u64 = 2 * scx_enums.SCX_SLICE_DFL / 1000;
106    static ref DFL_DISALLOW_PREEMPT_AFTER_US: u64 = 4 * scx_enums.SCX_SLICE_DFL / 1000;
107    static ref EXAMPLE_CONFIG: LayerConfig = LayerConfig {
108        specs: vec![
109            LayerSpec {
110                name: "batch".into(),
111                comment: Some("tasks under system.slice or tasks with nice value > 0".into()),
112                cpuset: None,
113                template: None,
114                matches: vec![
115                    vec![LayerMatch::CgroupPrefix("system.slice/".into())],
116                    vec![LayerMatch::NiceAbove(0)],
117                ],
118                kind: LayerKind::Confined {
119                    util_range: (0.8, 0.9),
120                    cpus_range: Some((0, 16)),
121                    cpus_range_frac: None,
122                    protected: false,
123                    common: LayerCommon {
124                        min_exec_us: 1000,
125                        yield_ignore: 0.0,
126                        preempt: false,
127                        preempt_first: false,
128                        exclusive: false,
129                        allow_node_aligned: false,
130                        skip_remote_node: false,
131                        prev_over_idle_core: false,
132                        idle_smt: None,
133                        slice_us: 20000,
134                        fifo: false,
135                        weight: DEFAULT_LAYER_WEIGHT,
136                        disallow_open_after_us: None,
137                        disallow_preempt_after_us: None,
138                        xllc_mig_min_us: 1000.0,
139                        growth_algo: LayerGrowthAlgo::Sticky,
140                        idle_resume_us: None,
141                        perf: 1024,
142                        nodes: vec![],
143                        llcs: vec![],
144                        placement: LayerPlacement::Standard,
145                    },
146                },
147            },
148            LayerSpec {
149                name: "immediate".into(),
150                comment: Some("tasks under workload.slice with nice value < 0".into()),
151                cpuset: None,
152                template: None,
153                matches: vec![vec![
154                    LayerMatch::CgroupPrefix("workload.slice/".into()),
155                    LayerMatch::NiceBelow(0),
156                ]],
157                kind: LayerKind::Open {
158                    common: LayerCommon {
159                        min_exec_us: 100,
160                        yield_ignore: 0.25,
161                        preempt: true,
162                        preempt_first: false,
163                        exclusive: true,
164                        allow_node_aligned: true,
165                        skip_remote_node: false,
166                        prev_over_idle_core: true,
167                        idle_smt: None,
168                        slice_us: 20000,
169                        fifo: false,
170                        weight: DEFAULT_LAYER_WEIGHT,
171                        disallow_open_after_us: None,
172                        disallow_preempt_after_us: None,
173                        xllc_mig_min_us: 0.0,
174                        growth_algo: LayerGrowthAlgo::Sticky,
175                        perf: 1024,
176                        idle_resume_us: None,
177                        nodes: vec![],
178                        llcs: vec![],
179                        placement: LayerPlacement::Standard,
180                    },
181                },
182            },
183            LayerSpec {
184                name: "stress-ng".into(),
185                comment: Some("stress-ng test layer".into()),
186                cpuset: None,
187                template: None,
188                matches: vec![
189                    vec![LayerMatch::CommPrefix("stress-ng".into()),],
190                    vec![LayerMatch::PcommPrefix("stress-ng".into()),]
191                ],
192                kind: LayerKind::Confined {
193                    cpus_range: None,
194                    util_range: (0.2, 0.8),
195                    protected: false,
196                    cpus_range_frac: None,
197                    common: LayerCommon {
198                        min_exec_us: 800,
199                        yield_ignore: 0.0,
200                        preempt: true,
201                        preempt_first: false,
202                        exclusive: false,
203                        allow_node_aligned: false,
204                        skip_remote_node: false,
205                        prev_over_idle_core: false,
206                        idle_smt: None,
207                        slice_us: 800,
208                        fifo: false,
209                        weight: DEFAULT_LAYER_WEIGHT,
210                        disallow_open_after_us: None,
211                        disallow_preempt_after_us: None,
212                        xllc_mig_min_us: 0.0,
213                        growth_algo: LayerGrowthAlgo::Topo,
214                        perf: 1024,
215                        idle_resume_us: None,
216                        nodes: vec![],
217                        llcs: vec![],
218                        placement: LayerPlacement::Standard,
219                    },
220                },
221            },
222            LayerSpec {
223                name: "normal".into(),
224                comment: Some("the rest".into()),
225                cpuset: None,
226                template: None,
227                matches: vec![vec![]],
228                kind: LayerKind::Grouped {
229                    cpus_range: None,
230                    util_range: (0.5, 0.6),
231                    util_includes_open_cputime: true,
232                    protected: false,
233                    cpus_range_frac: None,
234                    common: LayerCommon {
235                        min_exec_us: 200,
236                        yield_ignore: 0.0,
237                        preempt: false,
238                        preempt_first: false,
239                        exclusive: false,
240                        allow_node_aligned: false,
241                        skip_remote_node: false,
242                        prev_over_idle_core: false,
243                        idle_smt: None,
244                        slice_us: 20000,
245                        fifo: false,
246                        weight: DEFAULT_LAYER_WEIGHT,
247                        disallow_open_after_us: None,
248                        disallow_preempt_after_us: None,
249                        xllc_mig_min_us: 100.0,
250                        growth_algo: LayerGrowthAlgo::Linear,
251                        perf: 1024,
252                        idle_resume_us: None,
253                        nodes: vec![],
254                        llcs: vec![],
255                        placement: LayerPlacement::Standard,
256                    },
257                },
258            },
259        ],
260    };
261}
262
263/// scx_layered: A highly configurable multi-layer sched_ext scheduler
264///
265/// scx_layered allows classifying tasks into multiple layers and applying
266/// different scheduling policies to them. The configuration is specified in
267/// json and composed of two parts - matches and policies.
268///
269/// Matches
270/// =======
271///
272/// Whenever a task is forked or its attributes are changed, the task goes
273/// through a series of matches to determine the layer it belongs to. A
274/// match set is composed of OR groups of AND blocks. An example:
275///
276///   "matches": [
277///     [
278///       {
279///         "CgroupPrefix": "system.slice/"
280///       }
281///     ],
282///     [
283///       {
284///         "CommPrefix": "fbagent"
285///       },
286///       {
287///         "NiceAbove": 0
288///       }
289///     ]
290///   ],
291///
/// The outer array contains the OR groups and the inner arrays contain the
/// AND blocks, so the above matches:
294///
295/// - Tasks which are in the cgroup sub-hierarchy under "system.slice".
296///
297/// - Or tasks whose comm starts with "fbagent" and have a nice value > 0.
298///
299/// Currently, the following matches are supported:
300///
301/// - CgroupPrefix: Matches the prefix of the cgroup that the task belongs
302///   to. As this is a string match, whether the pattern has the trailing
303///   '/' makes a difference. For example, "TOP/CHILD/" only matches tasks
304///   which are under that particular cgroup while "TOP/CHILD" also matches
305///   tasks under "TOP/CHILD0/" or "TOP/CHILD1/".
306///
307/// - CommPrefix: Matches the task's comm prefix.
308///
309/// - PcommPrefix: Matches the task's thread group leader's comm prefix.
310///
311/// - NiceAbove: Matches if the task's nice value is greater than the
312///   pattern.
313///
314/// - NiceBelow: Matches if the task's nice value is smaller than the
315///   pattern.
316///
317/// - NiceEquals: Matches if the task's nice value is exactly equal to
318///   the pattern.
319///
/// - UIDEquals: Matches if the task's effective user id matches the value.
321///
322/// - GIDEquals: Matches if the task's effective group id matches the value.
323///
324/// - PIDEquals: Matches if the task's pid matches the value.
325///
326/// - PPIDEquals: Matches if the task's ppid matches the value.
327///
328/// - TGIDEquals: Matches if the task's tgid matches the value.
329///
/// - NSPIDEquals: Matches if the task's namespace id and pid match the values.
///
/// - NSEquals: Matches if the task's namespace id matches the value.
333///
334/// - IsGroupLeader: Bool. When true, matches if the task is group leader
335///   (i.e. PID == TGID), aka the thread from which other threads are made.
336///   When false, matches if the task is *not* the group leader (i.e. the rest).
337///
/// - CmdJoin: Matches when the task uses pthread_setname_np to send a join/leave
///   command to the scheduler. See examples/cmdjoin.c for more details.
340///
/// - UsedGpuTid: Bool. When true, matches tasks that have used GPUs,
///   identified by tid.
///
/// - UsedGpuPid: Bool. When true, matches tasks that have used GPUs,
///   identified by tgid/pid.
346///
347/// - [EXPERIMENTAL] AvgRuntime: (u64, u64). Match tasks whose average runtime
348///   is within the provided values [min, max).
349///
350/// While there are complexity limitations as the matches are performed in
351/// BPF, it is straightforward to add more types of matches.
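///
/// For reference, the same OR-of-AND structure corresponds to the
/// `LayerMatch` vectors in the embedded example config above, e.g.
/// (illustrative only):
///
///   matches: vec![
///       vec![LayerMatch::CgroupPrefix("system.slice/".into())],
///       vec![
///           LayerMatch::CommPrefix("fbagent".into()),
///           LayerMatch::NiceAbove(0),
///       ],
///   ],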
352///
353/// Templates
354/// ---------
355///
356/// Templates let us create a variable number of layers dynamically at initialization
357/// time out of a cgroup name suffix/prefix. Sometimes we know there are multiple
/// applications running on a machine, each with their own cgroup, but do not know the
359/// exact names of the applications or cgroups, e.g., in cloud computing contexts where
360/// workloads are placed on machines dynamically and run under cgroups whose name is
361/// autogenerated. In that case, we cannot hardcode the cgroup match rules when writing
362/// the configuration. We thus cannot easily prevent tasks from different cgroups from
363/// falling into the same layer and affecting each other's performance.
364///
365///
366/// Templates offer a solution to this problem by generating one layer for each such cgroup,
367/// provided these cgroups share a suffix, and that the suffix is unique to them. Templates
368/// have a cgroup suffix rule that we use to find the relevant cgroups in the system. For each
369/// such cgroup, we copy the layer config and add a matching rule that matches just this cgroup.
370///
371///
372/// Policies
373/// ========
374///
375/// The following is an example policy configuration for a layer.
376///
377///   "kind": {
378///     "Confined": {
379///       "cpus_range": [1, 8],
380///       "util_range": [0.8, 0.9]
381///     }
382///   }
383///
384/// It's of "Confined" kind, which tries to concentrate the layer's tasks
385/// into a limited number of CPUs. In the above case, the number of CPUs
386/// assigned to the layer is scaled between 1 and 8 so that the per-cpu
387/// utilization is kept between 80% and 90%. If the CPUs are loaded higher
388/// than 90%, more CPUs are allocated to the layer. If the utilization drops
389/// below 80%, the layer loses CPUs.
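///
/// As a worked example of the ranges above: a layer currently holding 4 CPUs
/// that averages 3.5 CPUs worth of execution (87.5% per-CPU utilization)
/// stays at 4 CPUs; above 90% another CPU is added (up to 8), and below 80%
/// a CPU is reclaimed (down to 1).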
390///
391/// Currently, the following policy kinds are supported:
392///
393/// - Confined: Tasks are restricted to the allocated CPUs. The number of
394///   CPUs allocated is modulated to keep the per-CPU utilization in
395///   "util_range". The range can optionally be restricted with the
396///   "cpus_range" property.
397///
/// - Grouped: Similar to Confined but tasks may spill outside if there are
///   idle CPUs outside the allocated ones. The range can optionally be
///   restricted with the "cpus_range" property (see the sketch after this
///   list).
401///
402/// - Open: Prefer the CPUs which are not occupied by Confined or Grouped
403///   layers. Tasks in this group will spill into occupied CPUs if there are
404///   no unoccupied idle CPUs.
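///
/// As a minimal sketch, a Grouped policy block takes the same shape as the
/// Confined example above:
///
///   "kind": {
///     "Grouped": {
///       "cpus_range": [2, 16],
///       "util_range": [0.6, 0.8]
///     }
///   }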
405///
406/// All layers take the following options:
407///
408/// - min_exec_us: Minimum execution time in microseconds. Whenever a task
409///   is scheduled in, this is the minimum CPU time that it's charged no
410///   matter how short the actual execution time may be.
411///
412/// - yield_ignore: Yield ignore ratio. If 0.0, yield(2) forfeits a whole
413///   execution slice. 0.25 yields three quarters of an execution slice and
414///   so on. If 1.0, yield is completely ignored.
415///
416/// - slice_us: Scheduling slice duration in microseconds.
417///
418/// - fifo: Use FIFO queues within the layer instead of the default vtime.
419///
420/// - preempt: If true, tasks in the layer will preempt tasks which belong
421///   to other non-preempting layers when no idle CPUs are available.
422///
423/// - preempt_first: If true, tasks in the layer will try to preempt tasks
424///   in their previous CPUs before trying to find idle CPUs.
425///
426/// - exclusive: If true, tasks in the layer will occupy the whole core. The
427///   other logical CPUs sharing the same core will be kept idle. This isn't
428///   a hard guarantee, so don't depend on it for security purposes.
429///
430/// - allow_node_aligned: Put node aligned tasks on layer DSQs instead of lo
431///   fallback. This is a hack to support node-affine tasks without making
432///   the whole scheduler node aware and should only be used with open
433///   layers on non-saturated machines to avoid possible stalls.
434///
/// - prev_over_idle_core: On SMT enabled systems, prefer a task's previous
///   CPU when picking a CPU for tasks on this layer, even if that CPU's SMT
///   sibling is processing a task.
438///
439/// - weight: Weight of the layer, which is a range from 1 to 10000 with a
440///   default of 100. Layer weights are used during contention to prevent
441///   starvation across layers. Weights are used in combination with
///   utilization to determine the infeasible adjusted weight, with higher
///   weights having a larger adjustment in adjusted utilization.
444///
445/// - disallow_open_after_us: Duration to wait after machine reaches saturation
446///   before confining tasks in Open layers.
447///
448/// - cpus_range_frac: Array of 2 floats between 0 and 1.0. Lower and upper
449///   bound fractions of all CPUs to give to a layer. Mutually exclusive
450///   with cpus_range.
451///
/// - disallow_preempt_after_us: Duration to wait after the machine reaches
///   saturation before no longer allowing tasks in the layer to preempt.
454///
455/// - xllc_mig_min_us: Skip cross-LLC migrations if they are likely to run on
456///   their existing LLC sooner than this.
457///
/// - idle_smt: ***DEPRECATED***
459///
460/// - growth_algo: When a layer is allocated new CPUs different algorithms can
461///   be used to determine which CPU should be allocated next. The default
462///   algorithm is a "sticky" algorithm that attempts to spread layers evenly
463///   across cores.
464///
/// - perf: CPU performance target. 0 means no configuration. A value
///   between 1 and 1024 indicates the performance level that CPUs running
///   tasks in this layer are set to using scx_bpf_cpuperf_set().
468///
469/// - idle_resume_us: Sets the idle resume QoS value. CPU idle time governors are expected to
470///   regard the minimum of the global (effective) CPU latency limit and the effective resume
471///   latency constraint for the given CPU as the upper limit for the exit latency of the idle
472///   states. See the latest kernel docs for more details:
473///   https://www.kernel.org/doc/html/latest/admin-guide/pm/cpuidle.html
474///
/// - nodes: If set, the layer will use the set of NUMA nodes for scheduling
///   decisions. If unset, all available NUMA nodes will be used. If the
///   llcs value is set, the cpuset of NUMA nodes will be or'ed with the LLC
///   config.
///
/// - llcs: If set, the layer will use the set of LLCs (last level caches)
///   for scheduling decisions. If unset, all LLCs will be used. If the
///   nodes value is set, the cpuset of LLCs will be or'ed with the nodes
///   config.
484///
485///
486/// Similar to matches, adding new policies and extending existing ones
487/// should be relatively straightforward.
488///
489/// Configuration example and running scx_layered
490/// =============================================
491///
492/// An scx_layered config is composed of layer configs. A layer config is
493/// composed of a name, a set of matches, and a policy block. Running the
494/// following will write an example configuration into example.json.
495///
496///   $ scx_layered -e example.json
497///
498/// Note that the last layer in the configuration must have an empty match set
499/// as a catch-all for tasks which haven't been matched into previous layers.
500///
501/// The configuration can be specified in multiple json files and
502/// command line arguments, which are concatenated in the specified
503/// order. Each must contain valid layer configurations.
504///
505/// By default, an argument to scx_layered is interpreted as a JSON string. If
/// the argument is a path to a JSON file, it should be prefixed with file:
507/// or f: as follows:
508///
509///   $ scx_layered file:example.json
510///   ...
511///   $ scx_layered f:example.json
512///
513/// Monitoring Statistics
514/// =====================
515///
516/// Run with `--stats INTERVAL` to enable stats monitoring. There is
517/// also an scx_stat server listening on /var/run/scx/root/stat that can
518/// be monitored by running `scx_layered --monitor INTERVAL` separately.
519///
520///   ```bash
521///   $ scx_layered --monitor 1
522///   tot= 117909 local=86.20 open_idle= 0.21 affn_viol= 1.37 proc=6ms
523///   busy= 34.2 util= 1733.6 load=  21744.1 fallback_cpu=  1
524///     batch    : util/frac=   11.8/  0.7 load/frac=     29.7:  0.1 tasks=  2597
525///                tot=   3478 local=67.80 open_idle= 0.00 preempt= 0.00 affn_viol= 0.00
526///                cpus=  2 [  2,  2] 04000001 00000000
527///     immediate: util/frac= 1218.8/ 70.3 load/frac=  21399.9: 98.4 tasks=  1107
528///                tot=  68997 local=90.57 open_idle= 0.26 preempt= 9.36 affn_viol= 0.00
529///                cpus= 50 [ 50, 50] fbfffffe 000fffff
530///     normal   : util/frac=  502.9/ 29.0 load/frac=    314.5:  1.4 tasks=  3512
531///                tot=  45434 local=80.97 open_idle= 0.16 preempt= 0.00 affn_viol= 3.56
532///                cpus= 50 [ 50, 50] fbfffffe 000fffff
533///   ```
534///
535/// Global statistics: see [`SysStats`]
536///
537/// Per-layer statistics: see [`LayerStats`]
538///
539#[derive(Debug, Parser)]
540#[command(verbatim_doc_comment)]
541struct Opts {
542    /// Scheduling slice duration in microseconds.
543    #[clap(short = 's', long, default_value = "20000")]
544    slice_us: u64,
545
546    /// Maximum consecutive execution time in microseconds. A task may be
547    /// allowed to keep executing on a CPU for this long. Note that this is
    /// the upper limit and a task may have to be moved off the CPU earlier. 0
549    /// indicates default - 20 * slice_us.
550    #[clap(short = 'M', long, default_value = "0")]
551    max_exec_us: u64,
552
553    /// Scheduling interval in seconds.
554    #[clap(short = 'i', long, default_value = "0.1")]
555    interval: f64,
556
    /// ***DEPRECATED*** Disable load-fraction based max layer CPU limit.
559    #[clap(short = 'n', long, default_value = "false")]
560    no_load_frac_limit: bool,
561
562    /// Exit debug dump buffer length. 0 indicates default.
563    #[clap(long, default_value = "0")]
564    exit_dump_len: u32,
565
566    /// Enable verbose output, including libbpf details. Specify multiple
567    /// times to increase verbosity.
568    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
569    verbose: u8,
570
571    /// Disable topology awareness. When enabled, the "nodes" and "llcs" settings on
572    /// a layer are ignored. Defaults to false on topologies with multiple NUMA nodes
573    /// or LLCs, and true otherwise.
574    #[arg(short = 't', long, num_args = 0..=1, default_missing_value = "true", require_equals = true)]
575    disable_topology: Option<bool>,
576
577    /// Enable cross NUMA preemption.
578    #[clap(long)]
579    xnuma_preemption: bool,
580
581    /// Disable monitor
582    #[clap(long)]
583    monitor_disable: bool,
584
585    /// Write example layer specifications into the file and exit.
586    #[clap(short = 'e', long)]
587    example: Option<String>,
588
589    /// ***DEPRECATED*** Disables preemption if the weighted load fraction
590    /// of a layer (load_frac_adj) exceeds the threshold. The default is
591    /// disabled (0.0).
592    #[clap(long, default_value = "0.0")]
593    layer_preempt_weight_disable: f64,
594
595    /// ***DEPRECATED*** Disables layer growth if the weighted load fraction
596    /// of a layer (load_frac_adj) exceeds the threshold. The default is
597    /// disabled (0.0).
598    #[clap(long, default_value = "0.0")]
599    layer_growth_weight_disable: f64,
600
601    /// Enable stats monitoring with the specified interval.
602    #[clap(long)]
603    stats: Option<f64>,
604
605    /// Run in stats monitoring mode with the specified interval. Scheduler
606    /// is not launched.
607    #[clap(long)]
608    monitor: Option<f64>,
609
610    /// Run with example layer specifications (useful for e.g. CI pipelines)
611    #[clap(long)]
612    run_example: bool,
613
    /// ***DEPRECATED*** Enables iteration over local LLCs first for
615    /// dispatch.
616    #[clap(long, default_value = "false")]
617    local_llc_iteration: bool,
618
619    /// Low priority fallback DSQs are used to execute tasks with custom CPU
620    /// affinities. These DSQs are immediately executed iff a CPU is
621    /// otherwise idle. However, after the specified wait, they are
    /// guaranteed up to --lo-fb-share fraction of each CPU.
623    #[clap(long, default_value = "10000")]
624    lo_fb_wait_us: u64,
625
626    /// The fraction of CPU time guaranteed to low priority fallback DSQs.
627    /// See --lo-fb-wait-us.
628    #[clap(long, default_value = ".05")]
629    lo_fb_share: f64,
630
631    /// Disable antistall
632    #[clap(long, default_value = "false")]
633    disable_antistall: bool,
634
635    /// Enable numa topology based gpu task affinitization.
636    #[clap(long, default_value = "false")]
637    enable_gpu_affinitize: bool,
638
    /// Interval at which to reaffinitize GPU tasks to NUMA nodes.
    /// Defaults to 900s.
641    #[clap(long, default_value = "900")]
642    gpu_affinitize_secs: u64,
643
    /// Enable match debug. This stores a mapping of task tid to layer id
    /// so that `bpftool map dump` can be used to debug layer matches.
648    #[clap(long, default_value = "false")]
649    enable_match_debug: bool,
650
651    /// Maximum task runnable_at delay (in seconds) before antistall turns on
652    #[clap(long, default_value = "3")]
653    antistall_sec: u64,
654
655    /// Enable gpu support
656    #[clap(long, default_value = "false")]
657    enable_gpu_support: bool,
658
    /// GPU kprobe level. The value set here determines how aggressive the
    /// kprobes enabled on GPU driver functions are. Higher values are more
    /// aggressive, incurring more system overhead while identifying PIDs
    /// that use GPUs more accurately and in a more timely manner. Lower
    /// values incur less system overhead, at the cost of less accurate GPU
    /// PID identification and longer detection times.
666    #[clap(long, default_value = "3")]
667    gpu_kprobe_level: u64,
668
669    /// Enable netdev IRQ balancing. This is experimental and should be used with caution.
670    #[clap(long, default_value = "false")]
671    netdev_irq_balance: bool,
672
673    /// Disable queued wakeup optimization.
674    #[clap(long, default_value = "false")]
675    disable_queued_wakeup: bool,
676
677    /// Per-cpu kthreads are preempting by default. Make it not so.
678    #[clap(long, default_value = "false")]
679    disable_percpu_kthread_preempt: bool,
680
681    /// Only highpri (nice < 0) per-cpu kthreads are preempting by default.
682    /// Make every per-cpu kthread preempting. Meaningful only if
683    /// --disable-percpu-kthread-preempt is not set.
684    #[clap(long, default_value = "false")]
685    percpu_kthread_preempt_all: bool,
686
687    /// Print scheduler version and exit.
688    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
689    version: bool,
690
691    /// Show descriptions for statistics.
692    #[clap(long)]
693    help_stats: bool,
694
695    /// Layer specification. See --help.
696    specs: Vec<String>,
697
    /// Periodically force tasks in layers using the AvgRuntime match rule to
    /// reevaluate which layer they belong to. The default period is 2s
    /// (2000 ms); a value of 0 turns this off.
700    #[clap(long, default_value = "2000")]
701    layer_refresh_ms_avgruntime: u64,
702
703    /// Set the path for pinning the task hint map.
704    #[clap(long, default_value = "")]
705    task_hint_map: String,
706
707    /// Print the config (after template expansion) and exit.
708    #[clap(long, default_value = "false")]
709    print_and_exit: bool,
710}
711
712fn read_total_cpu(reader: &fb_procfs::ProcReader) -> Result<fb_procfs::CpuStat> {
713    reader
714        .read_stat()
715        .context("Failed to read procfs")?
716        .total_cpu
717        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))
718}
719
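// Compute the busy fraction of CPU time between two /proc/stat snapshots:
// busy = deltas of user + system + nice + irq + softirq + stolen, and
// total = busy + idle + iowait deltas.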
720fn calc_util(curr: &fb_procfs::CpuStat, prev: &fb_procfs::CpuStat) -> Result<f64> {
721    match (curr, prev) {
722        (
723            fb_procfs::CpuStat {
724                user_usec: Some(curr_user),
725                nice_usec: Some(curr_nice),
726                system_usec: Some(curr_system),
727                idle_usec: Some(curr_idle),
728                iowait_usec: Some(curr_iowait),
729                irq_usec: Some(curr_irq),
730                softirq_usec: Some(curr_softirq),
731                stolen_usec: Some(curr_stolen),
732                ..
733            },
734            fb_procfs::CpuStat {
735                user_usec: Some(prev_user),
736                nice_usec: Some(prev_nice),
737                system_usec: Some(prev_system),
738                idle_usec: Some(prev_idle),
739                iowait_usec: Some(prev_iowait),
740                irq_usec: Some(prev_irq),
741                softirq_usec: Some(prev_softirq),
742                stolen_usec: Some(prev_stolen),
743                ..
744            },
745        ) => {
746            let idle_usec = curr_idle.saturating_sub(*prev_idle);
747            let iowait_usec = curr_iowait.saturating_sub(*prev_iowait);
748            let user_usec = curr_user.saturating_sub(*prev_user);
749            let system_usec = curr_system.saturating_sub(*prev_system);
750            let nice_usec = curr_nice.saturating_sub(*prev_nice);
751            let irq_usec = curr_irq.saturating_sub(*prev_irq);
752            let softirq_usec = curr_softirq.saturating_sub(*prev_softirq);
753            let stolen_usec = curr_stolen.saturating_sub(*prev_stolen);
754
755            let busy_usec =
756                user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
757            let total_usec = idle_usec + busy_usec + iowait_usec;
758            if total_usec > 0 {
759                Ok(((busy_usec as f64) / (total_usec as f64)).clamp(0.0, 1.0))
760            } else {
761                Ok(1.0)
762            }
763        }
764        _ => bail!("Missing stats in cpustat"),
765    }
766}
767
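// Copy `src`, including the terminating NUL, into a fixed-size i8 buffer
// shared with BPF. Panics if `src` contains an interior NUL or does not fit
// in `dst`.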
768fn copy_into_cstr(dst: &mut [i8], src: &str) {
769    let cstr = CString::new(src).unwrap();
770    let bytes = unsafe { std::mem::transmute::<&[u8], &[i8]>(cstr.as_bytes_with_nul()) };
771    dst[0..bytes.len()].copy_from_slice(bytes);
772}
773
774fn nodemask_from_nodes(nodes: &Vec<usize>) -> usize {
775    let mut mask = 0;
776    for node in nodes {
777        mask |= 1 << node;
778    }
779    mask
780}
781
782fn llcmask_from_llcs(llcs: &BTreeMap<usize, Arc<Llc>>) -> usize {
783    let mut mask = 0;
784    for (_, cache) in llcs {
785        mask |= 1 << cache.id;
786    }
787    mask
788}
789
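// Snapshot the per-CPU cpu_ctx structs. The percpu map lookup returns one
// byte buffer per possible CPU, each of which is reinterpreted as a
// bpf_intf::cpu_ctx.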
790fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
791    let mut cpu_ctxs = vec![];
792    let cpu_ctxs_vec = skel
793        .maps
794        .cpu_ctxs
795        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
796        .context("Failed to lookup cpu_ctx")?
797        .unwrap();
798    for cpu in 0..*NR_CPUS_POSSIBLE {
799        cpu_ctxs.push(*unsafe {
800            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
801        });
802    }
803    Ok(cpu_ctxs)
804}
805
806#[derive(Clone, Debug)]
807struct BpfStats {
808    gstats: Vec<u64>,
809    lstats: Vec<Vec<u64>>,
810    lstats_sums: Vec<u64>,
811    llc_lstats: Vec<Vec<Vec<u64>>>, // [layer][llc][stat]
812}
813
814impl BpfStats {
815    fn read(skel: &BpfSkel, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Self {
816        let nr_layers = skel.maps.rodata_data.nr_layers as usize;
817        let nr_llcs = skel.maps.rodata_data.nr_llcs as usize;
818        let mut gstats = vec![0u64; NR_GSTATS];
819        let mut lstats = vec![vec![0u64; NR_LSTATS]; nr_layers];
820        let mut llc_lstats = vec![vec![vec![0u64; NR_LLC_LSTATS]; nr_llcs]; nr_layers];
821
822        for cpu in 0..*NR_CPUS_POSSIBLE {
823            for stat in 0..NR_GSTATS {
824                gstats[stat] += cpu_ctxs[cpu].gstats[stat];
825            }
826            for layer in 0..nr_layers {
827                for stat in 0..NR_LSTATS {
828                    lstats[layer][stat] += cpu_ctxs[cpu].lstats[layer][stat];
829                }
830            }
831        }
832
833        let mut lstats_sums = vec![0u64; NR_LSTATS];
834        for layer in 0..nr_layers {
835            for stat in 0..NR_LSTATS {
836                lstats_sums[stat] += lstats[layer][stat];
837            }
838        }
839
840        for llc_id in 0..nr_llcs {
841            // XXX - This would be a lot easier if llc_ctx were in
842            // the bss. Unfortunately, kernel < v6.12 crashes and
843            // kernel >= v6.12 fails verification after such
844            // conversion due to seemingly verifier bugs. Convert to
845            // bss maps later.
846            let key = llc_id as u32;
847            let llc_id_slice =
848                unsafe { std::slice::from_raw_parts((&key as *const u32) as *const u8, 4) };
849            let v = skel
850                .maps
851                .llc_data
852                .lookup(llc_id_slice, libbpf_rs::MapFlags::ANY)
853                .unwrap()
854                .unwrap();
855            let llcc = unsafe { *(v.as_slice().as_ptr() as *const bpf_intf::llc_ctx) };
856
857            for layer_id in 0..nr_layers {
858                for stat_id in 0..NR_LLC_LSTATS {
859                    llc_lstats[layer_id][llc_id][stat_id] = llcc.lstats[layer_id][stat_id];
860                }
861            }
862        }
863
864        Self {
865            gstats,
866            lstats,
867            lstats_sums,
868            llc_lstats,
869        }
870    }
871}
872
873impl<'a, 'b> Sub<&'b BpfStats> for &'a BpfStats {
874    type Output = BpfStats;
875
876    fn sub(self, rhs: &'b BpfStats) -> BpfStats {
877        let vec_sub = |l: &[u64], r: &[u64]| l.iter().zip(r.iter()).map(|(l, r)| *l - *r).collect();
878        BpfStats {
879            gstats: vec_sub(&self.gstats, &rhs.gstats),
880            lstats: self
881                .lstats
882                .iter()
883                .zip(rhs.lstats.iter())
884                .map(|(l, r)| vec_sub(l, r))
885                .collect(),
886            lstats_sums: vec_sub(&self.lstats_sums, &rhs.lstats_sums),
887            llc_lstats: self
888                .llc_lstats
889                .iter()
890                .zip(rhs.llc_lstats.iter())
891                .map(|(l_layer, r_layer)| {
892                    l_layer
893                        .iter()
894                        .zip(r_layer.iter())
895                        .map(|(l_llc, r_llc)| {
896                            let (l_llc, mut r_llc) = (l_llc.clone(), r_llc.clone());
897                            // Lat is not subtractable, take L side.
898                            r_llc[bpf_intf::llc_layer_stat_id_LLC_LSTAT_LAT as usize] = 0;
899                            vec_sub(&l_llc, &r_llc)
900                        })
901                        .collect()
902                })
903                .collect(),
904        }
905    }
906}
907
908#[derive(Clone, Debug)]
909struct Stats {
910    at: Instant,
911    elapsed: Duration,
912    nr_layers: usize,
913    nr_layer_tasks: Vec<usize>,
914    nr_nodes: usize,
915
916    total_util: f64, // Running AVG of sum of layer_utils
917    layer_utils: Vec<Vec<f64>>,
918    prev_layer_usages: Vec<Vec<u64>>,
919
920    cpu_busy: f64, // Read from /proc, maybe higher than total_util
921    prev_total_cpu: fb_procfs::CpuStat,
922
923    bpf_stats: BpfStats,
924    prev_bpf_stats: BpfStats,
925
926    processing_dur: Duration,
927    prev_processing_dur: Duration,
928
929    layer_slice_us: Vec<u64>,
930
931    gpu_tasks_affinitized: u64,
932    gpu_task_affinitization_ms: u64,
933}
934
935impl Stats {
936    fn read_layer_usages(cpu_ctxs: &[bpf_intf::cpu_ctx], nr_layers: usize) -> Vec<Vec<u64>> {
937        let mut layer_usages = vec![vec![0u64; NR_LAYER_USAGES]; nr_layers];
938
939        for cpu in 0..*NR_CPUS_POSSIBLE {
940            for layer in 0..nr_layers {
941                for usage in 0..NR_LAYER_USAGES {
942                    layer_usages[layer][usage] += cpu_ctxs[cpu].layer_usages[layer][usage];
943                }
944            }
945        }
946
947        layer_usages
948    }
949
950    fn new(
951        skel: &mut BpfSkel,
952        proc_reader: &fb_procfs::ProcReader,
953        gpu_task_affinitizer: &GpuTaskAffinitizer,
954    ) -> Result<Self> {
955        let nr_layers = skel.maps.rodata_data.nr_layers as usize;
956        let cpu_ctxs = read_cpu_ctxs(skel)?;
957        let bpf_stats = BpfStats::read(skel, &cpu_ctxs);
958        let nr_nodes = skel.maps.rodata_data.nr_nodes as usize;
959
960        Ok(Self {
961            at: Instant::now(),
962            elapsed: Default::default(),
963            nr_layers,
964            nr_layer_tasks: vec![0; nr_layers],
965            nr_nodes,
966
967            total_util: 0.0,
968            layer_utils: vec![vec![0.0; NR_LAYER_USAGES]; nr_layers],
969            prev_layer_usages: Self::read_layer_usages(&cpu_ctxs, nr_layers),
970
971            cpu_busy: 0.0,
972            prev_total_cpu: read_total_cpu(proc_reader)?,
973
974            bpf_stats: bpf_stats.clone(),
975            prev_bpf_stats: bpf_stats,
976
977            processing_dur: Default::default(),
978            prev_processing_dur: Default::default(),
979
980            layer_slice_us: vec![0; nr_layers],
981            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
982            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
983        })
984    }
985
986    fn refresh(
987        &mut self,
988        skel: &mut BpfSkel,
989        proc_reader: &fb_procfs::ProcReader,
990        now: Instant,
991        cur_processing_dur: Duration,
992        gpu_task_affinitizer: &GpuTaskAffinitizer,
993    ) -> Result<()> {
994        let elapsed = now.duration_since(self.at);
995        let elapsed_f64 = elapsed.as_secs_f64();
996        let cpu_ctxs = read_cpu_ctxs(skel)?;
997
998        let nr_layer_tasks: Vec<usize> = skel
999            .maps
1000            .bss_data
1001            .layers
1002            .iter()
1003            .take(self.nr_layers)
1004            .map(|layer| layer.nr_tasks as usize)
1005            .collect();
1006        let layer_slice_us: Vec<u64> = skel
1007            .maps
1008            .bss_data
1009            .layers
1010            .iter()
1011            .take(self.nr_layers)
1012            .map(|layer| layer.slice_ns / 1000_u64)
1013            .collect();
1014
1015        let cur_layer_usages = Self::read_layer_usages(&cpu_ctxs, self.nr_layers);
1016        let cur_layer_utils: Vec<Vec<f64>> = cur_layer_usages
1017            .iter()
1018            .zip(self.prev_layer_usages.iter())
1019            .map(|(cur, prev)| {
1020                cur.iter()
1021                    .zip(prev.iter())
1022                    .map(|(c, p)| (c - p) as f64 / 1_000_000_000.0 / elapsed_f64)
1023                    .collect()
1024            })
1025            .collect();
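        // Blend the instantaneous utilization into a running average that
        // decays with USAGE_HALF_LIFE (see USAGE_DECAY above).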
1026        let layer_utils: Vec<Vec<f64>> = cur_layer_utils
1027            .iter()
1028            .zip(self.layer_utils.iter())
1029            .map(|(cur, prev)| {
1030                cur.iter()
1031                    .zip(prev.iter())
1032                    .map(|(c, p)| {
1033                        let decay = USAGE_DECAY.powf(elapsed_f64);
1034                        p * decay + c * (1.0 - decay)
1035                    })
1036                    .collect()
1037            })
1038            .collect();
1039
1040        let cur_total_cpu = read_total_cpu(proc_reader)?;
1041        let cpu_busy = calc_util(&cur_total_cpu, &self.prev_total_cpu)?;
1042
1043        let cur_bpf_stats = BpfStats::read(skel, &cpu_ctxs);
1044        let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;
1045
1046        let processing_dur = cur_processing_dur
1047            .checked_sub(self.prev_processing_dur)
1048            .unwrap();
1049
1050        *self = Self {
1051            at: now,
1052            elapsed,
1053            nr_layers: self.nr_layers,
1054            nr_layer_tasks,
1055            nr_nodes: self.nr_nodes,
1056
1057            total_util: layer_utils
1058                .iter()
1059                .map(|x| x.iter().take(LAYER_USAGE_SUM_UPTO + 1).sum::<f64>())
1060                .sum(),
1061            layer_utils,
1062            prev_layer_usages: cur_layer_usages,
1063
1064            cpu_busy,
1065            prev_total_cpu: cur_total_cpu,
1066
1067            bpf_stats,
1068            prev_bpf_stats: cur_bpf_stats,
1069
1070            processing_dur,
1071            prev_processing_dur: cur_processing_dur,
1072
1073            layer_slice_us,
1074            gpu_tasks_affinitized: gpu_task_affinitizer.tasks_affinitized,
1075            gpu_task_affinitization_ms: gpu_task_affinitizer.last_task_affinitization_ms,
1076        };
1077        Ok(())
1078    }
1079}
1080
1081#[derive(Debug)]
1082struct Layer {
1083    name: String,
1084    kind: LayerKind,
1085    growth_algo: LayerGrowthAlgo,
1086    core_order: Vec<usize>,
1087
1088    target_llc_cpus: (usize, usize),
1089    assigned_llcs: Vec<usize>,
1090
1091    nr_cpus: usize,
1092    nr_llc_cpus: Vec<usize>,
1093    cpus: Cpumask,
1094    allowed_cpus: Cpumask,
1095}
1096
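// Look up a kernel symbol's address in /proc/kallsyms. Note that this picks
// the first line containing `sym_name` as a substring, not necessarily an
// exact symbol-name match.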
1097fn get_kallsyms_addr(sym_name: &str) -> Result<u64> {
1098    fs::read_to_string("/proc/kallsyms")?
1099        .lines()
1100        .find(|line| line.contains(sym_name))
1101        .and_then(|line| line.split_whitespace().next())
1102        .and_then(|addr| u64::from_str_radix(addr, 16).ok())
1103        .ok_or_else(|| anyhow!("Symbol '{}' not found", sym_name))
1104}
1105
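// Resolve cpus_range / cpus_range_frac into a concrete (min, max) CPU count.
// Specifying both is an error. For example, with 64 CPUs, a cpus_range_frac
// of (0.1, 0.5) resolves to (6, 32).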
1106fn resolve_cpus_pct_range(
1107    cpus_range: &Option<(usize, usize)>,
1108    cpus_range_frac: &Option<(f64, f64)>,
1109    max_cpus: usize,
1110) -> Result<(usize, usize)> {
1111    match (cpus_range, cpus_range_frac) {
1112        (Some(_x), Some(_y)) => {
            bail!("cpus_range cannot be used with cpus_range_frac.");
1114        }
1115        (Some((cpus_range_min, cpus_range_max)), None) => Ok((*cpus_range_min, *cpus_range_max)),
1116        (None, Some((cpus_frac_min, cpus_frac_max))) => {
1117            if *cpus_frac_min < 0_f64
1118                || *cpus_frac_min > 1_f64
1119                || *cpus_frac_max < 0_f64
1120                || *cpus_frac_max > 1_f64
1121            {
1122                bail!("cpus_range_frac values must be between 0.0 and 1.0");
1123            }
1124            let cpus_min_count = ((max_cpus as f64) * cpus_frac_min).round_ties_even() as usize;
1125            let cpus_max_count = ((max_cpus as f64) * cpus_frac_max).round_ties_even() as usize;
1126            Ok((
1127                std::cmp::max(cpus_min_count, 1),
1128                std::cmp::min(cpus_max_count, max_cpus),
1129            ))
1130        }
1131        (None, None) => Ok((0, max_cpus)),
1132    }
1133}
1134
1135impl Layer {
1136    fn new(spec: &LayerSpec, topo: &Topology, core_order: &Vec<usize>) -> Result<Self> {
1137        let name = &spec.name;
1138        let kind = spec.kind.clone();
1139        let mut allowed_cpus = Cpumask::new();
1140        match &kind {
1141            LayerKind::Confined {
1142                cpus_range,
1143                cpus_range_frac,
1144                common: LayerCommon { nodes, llcs, .. },
1145                ..
1146            } => {
1147                let cpus_range =
1148                    resolve_cpus_pct_range(cpus_range, cpus_range_frac, topo.all_cpus.len())?;
1149                if cpus_range.0 > cpus_range.1 || cpus_range.1 == 0 {
1150                    bail!("invalid cpus_range {:?}", cpus_range);
1151                }
1152                if nodes.is_empty() && llcs.is_empty() {
1153                    allowed_cpus.set_all();
1154                } else {
1155                    // build up the cpus bitset
1156                    for (node_id, node) in &topo.nodes {
1157                        // first do the matching for nodes
1158                        if nodes.contains(node_id) {
1159                            for &id in node.all_cpus.keys() {
1160                                allowed_cpus.set_cpu(id)?;
1161                            }
1162                        }
1163                        // next match on any LLCs
1164                        for (llc_id, llc) in &node.llcs {
1165                            if llcs.contains(llc_id) {
1166                                for &id in llc.all_cpus.keys() {
1167                                    allowed_cpus.set_cpu(id)?;
1168                                }
1169                            }
1170                        }
1171                    }
1172                }
1173            }
1174            LayerKind::Grouped {
1175                common: LayerCommon { nodes, llcs, .. },
1176                ..
1177            }
1178            | LayerKind::Open {
1179                common: LayerCommon { nodes, llcs, .. },
1180                ..
1181            } => {
1182                if nodes.is_empty() && llcs.is_empty() {
1183                    allowed_cpus.set_all();
1184                } else {
1185                    // build up the cpus bitset
1186                    for (node_id, node) in &topo.nodes {
1187                        // first do the matching for nodes
1188                        if nodes.contains(node_id) {
1189                            for &id in node.all_cpus.keys() {
1190                                allowed_cpus.set_cpu(id)?;
1191                            }
1192                        }
1193                        // next match on any LLCs
1194                        for (llc_id, llc) in &node.llcs {
1195                            if llcs.contains(llc_id) {
1196                                for &id in llc.all_cpus.keys() {
1197                                    allowed_cpus.set_cpu(id)?;
1198                                }
1199                            }
1200                        }
1201                    }
1202                }
1203            }
1204        }
1205
1206        // Util can be above 1.0 for grouped layers if
1207        // util_includes_open_cputime is set.
1208        if let Some(util_range) = kind.util_range() {
1209            if util_range.0 < 0.0 || util_range.1 < 0.0 || util_range.0 >= util_range.1 {
1210                bail!("invalid util_range {:?}", util_range);
1211            }
1212        }
1213
1214        let layer_growth_algo = kind.common().growth_algo.clone();
1215
1216        debug!(
1217            "layer: {} algo: {:?} core order: {:?}",
1218            name, &layer_growth_algo, core_order
1219        );
1220
1221        Ok(Self {
1222            name: name.into(),
1223            kind,
1224            growth_algo: layer_growth_algo,
1225            core_order: core_order.clone(),
1226
1227            target_llc_cpus: (0, 0),
1228            assigned_llcs: vec![],
1229
1230            nr_cpus: 0,
1231            nr_llc_cpus: vec![0; topo.all_llcs.len()],
1232            cpus: Cpumask::new(),
1233            allowed_cpus,
1234        })
1235    }
1236
1237    fn free_some_cpus(&mut self, cpu_pool: &mut CpuPool, max_to_free: usize) -> Result<usize> {
1238        let cpus_to_free = match cpu_pool.next_to_free(&self.cpus, self.core_order.iter().rev())? {
1239            Some(ret) => ret.clone(),
1240            None => return Ok(0),
1241        };
1242
1243        let nr_to_free = cpus_to_free.weight();
1244
1245        Ok(if nr_to_free <= max_to_free {
1246            trace!("[{}] freeing CPUs: {}", self.name, &cpus_to_free);
1247            self.cpus &= &cpus_to_free.not();
1248            self.nr_cpus -= nr_to_free;
1249            for cpu in cpus_to_free.iter() {
1250                self.nr_llc_cpus[cpu_pool.topo.all_cpus[&cpu].llc_id] -= 1;
1251            }
1252            cpu_pool.free(&cpus_to_free)?;
1253            nr_to_free
1254        } else {
1255            0
1256        })
1257    }
1258
1259    fn alloc_some_cpus(&mut self, cpu_pool: &mut CpuPool) -> Result<usize> {
1260        let new_cpus = match cpu_pool
1261            .alloc_cpus(&self.allowed_cpus, &self.core_order)
1262            .clone()
1263        {
1264            Some(ret) => ret.clone(),
1265            None => {
1266                trace!("layer-{} can't grow, no CPUs", &self.name);
1267                return Ok(0);
1268            }
1269        };
1270
1271        let nr_new_cpus = new_cpus.weight();
1272
1273        trace!("[{}] adding CPUs: {}", &self.name, &new_cpus);
1274        self.cpus |= &new_cpus;
1275        self.nr_cpus += nr_new_cpus;
1276        for cpu in new_cpus.iter() {
1277            self.nr_llc_cpus[cpu_pool.topo.all_cpus[&cpu].llc_id] += 1;
1278        }
1279        Ok(nr_new_cpus)
1280    }
1281}
1282#[derive(Debug, Clone)]
1283struct NodeInfo {
1284    node_mask: nix::sched::CpuSet,
1285    _node_id: usize,
1286}
1287
1288#[derive(Debug)]
1289struct GpuTaskAffinitizer {
    // This struct tracks information necessary to numa affinitize
1291    // gpu tasks periodically when needed.
1292    gpu_devs_to_node_info: HashMap<u32, NodeInfo>,
1293    gpu_pids_to_devs: HashMap<Pid, u32>,
1294    last_process_time: Option<Instant>,
1295    sys: System,
1296    pid_map: HashMap<Pid, Vec<Pid>>,
1297    poll_interval: Duration,
1298    enable: bool,
1299    tasks_affinitized: u64,
1300    last_task_affinitization_ms: u64,
1301}
1302
1303impl GpuTaskAffinitizer {
1304    pub fn new(poll_interval: u64, enable: bool) -> GpuTaskAffinitizer {
1305        GpuTaskAffinitizer {
1306            gpu_devs_to_node_info: HashMap::new(),
1307            gpu_pids_to_devs: HashMap::new(),
1308            last_process_time: None,
1309            sys: System::default(),
1310            pid_map: HashMap::new(),
1311            poll_interval: Duration::from_secs(poll_interval),
1312            enable,
1313            tasks_affinitized: 0,
1314            last_task_affinitization_ms: 0,
1315        }
1316    }
1317
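    // Return the index of the lowest set bit in the NVML CPU affinity
    // bitmask, which NVML reports as a vector of 64-bit words.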
1318    fn find_one_cpu(&self, affinity: Vec<u64>) -> Result<u32> {
1319        for (chunk, &mask) in affinity.iter().enumerate() {
1320            let mut inner_offset: u64 = 1;
1321            for _ in 0..64 {
1322                if (mask & inner_offset) != 0 {
1323                    return Ok((64 * chunk + u64::trailing_zeros(inner_offset) as usize) as u32);
1324                }
1325                inner_offset = inner_offset << 1;
1326            }
1327        }
1328        anyhow::bail!("unable to get CPU from NVML bitmask");
1329    }
1330
1331    fn node_to_cpuset(&self, node: &scx_utils::Node) -> Result<CpuSet> {
1332        let mut cpuset = CpuSet::new();
1333        for (cpu_id, _cpu) in &node.all_cpus {
1334            cpuset.set(*cpu_id)?;
1335        }
1336        Ok(cpuset)
1337    }
1338
1339    fn init_dev_node_map(&mut self, topo: Arc<Topology>) -> Result<()> {
1340        let nvml = nvml()?;
1341        let device_count = nvml.device_count()?;
1342
1343        for idx in 0..device_count {
1344            let dev = nvml.device_by_index(idx)?;
1345            // For machines w/ up to 1024 CPUs.
1346            let cpu = dev.cpu_affinity(16)?;
1347            let ideal_cpu = self.find_one_cpu(cpu)?;
1348            if let Some(cpu) = topo.all_cpus.get(&(ideal_cpu as usize)) {
1349                self.gpu_devs_to_node_info.insert(
1350                    idx,
1351                    NodeInfo {
1352                        node_mask: self.node_to_cpuset(
1353                            topo.nodes.get(&cpu.node_id).expect("topo missing node"),
1354                        )?,
1355                        _node_id: cpu.node_id,
1356                    },
1357                );
1358            }
1359        }
1360        Ok(())
1361    }
1362
1363    fn update_gpu_pids(&mut self) -> Result<()> {
1364        let nvml = nvml()?;
1365        for i in 0..nvml.device_count()? {
1366            let device = nvml.device_by_index(i)?;
1367            for proc in device
1368                .running_compute_processes()?
1369                .into_iter()
1370                .chain(device.running_graphics_processes()?.into_iter())
1371            {
1372                self.gpu_pids_to_devs.insert(Pid::from_u32(proc.pid), i);
1373            }
1374        }
1375        Ok(())
1376    }
1377
1378    fn update_process_info(&mut self) -> Result<()> {
1379        self.sys.refresh_processes_specifics(
1380            ProcessesToUpdate::All,
1381            true,
1382            ProcessRefreshKind::nothing(),
1383        );
1384        self.pid_map.clear();
1385        for (pid, proc_) in self.sys.processes() {
1386            if let Some(ppid) = proc_.parent() {
1387                self.pid_map.entry(ppid).or_default().push(*pid);
1388            }
1389        }
1390        Ok(())
1391    }
1392
1393    fn get_child_pids_and_tids(&self, root_pid: Pid) -> HashSet<Pid> {
1394        let mut work = VecDeque::from([root_pid]);
1395        let mut pids_and_tids: HashSet<Pid> = HashSet::new();
1396
1397        while let Some(pid) = work.pop_front() {
1398            if pids_and_tids.insert(pid) {
1399                if let Some(kids) = self.pid_map.get(&pid) {
1400                    work.extend(kids);
1401                }
1402                if let Some(proc_) = self.sys.process(pid) {
1403                    if let Some(tasks) = proc_.tasks() {
1404                        pids_and_tids.extend(tasks.iter().copied());
1405                    }
1406                }
1407            }
1408        }
1409        pids_and_tids
1410    }
1411
1412    fn affinitize_gpu_pids(&mut self) -> Result<()> {
1413        if !self.enable {
1414            return Ok(());
1415        }
1416        for (pid, dev) in &self.gpu_pids_to_devs {
1417            let node_info = self
1418                .gpu_devs_to_node_info
1419                .get(&dev)
1420                .expect("Unable to get gpu pid node mask");
1421            for child in self.get_child_pids_and_tids(*pid) {
1422                match nix::sched::sched_setaffinity(
1423                    nix::unistd::Pid::from_raw(child.as_u32() as i32),
1424                    &node_info.node_mask,
1425                ) {
1426                    Ok(_) => {
1427                        // Increment the counter of successfully affinitized tasks
1428                        self.tasks_affinitized += 1;
1429                    }
1430                    Err(_) => {
1431                        warn!(
1432                            "Error affinitizing gpu pid {} to node {:#?}",
1433                            child.as_u32(),
1434                            node_info
1435                        );
1436                    }
1437                };
1438            }
1439        }
1440        Ok(())
1441    }
1442
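    /// Periodic entry point: re-affinitize GPU tasks if the poll interval has
    /// elapsed since the last pass.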
1443    pub fn maybe_affinitize(&mut self) {
1444        if !self.enable {
1445            return;
1446        }
1447        let now = Instant::now();
1448
1449        if let Some(last_process_time) = self.last_process_time {
1450            if (now - last_process_time) < self.poll_interval {
1451                return;
1452            }
1453        }
1454
1455        match self.update_gpu_pids() {
1456            Ok(_) => {}
1457            Err(e) => {
1458                error!("Error updating GPU PIDs: {}", e);
1459            }
1460        };
1461        match self.update_process_info() {
1462            Ok(_) => {}
1463            Err(e) => {
1464                error!("Error updating process info to affinitize GPU PIDs: {}", e);
1465            }
1466        };
1467        match self.affinitize_gpu_pids() {
1468            Ok(_) => {}
1469            Err(e) => {
1470                error!("Error updating GPU PIDs: {}", e);
1471            }
1472        };
1473        self.last_process_time = Some(now);
1474        self.last_task_affinitization_ms = (Instant::now() - now).as_millis() as u64;
1475
1476        return;
1477    }
1478
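    /// One-time setup: build the GPU device -> node map and take the initial
    /// process table snapshot. No-op if GPU affinitization is disabled or
    /// already initialized.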
1479    pub fn init(&mut self, topo: Arc<Topology>) {
1480        if !self.enable || self.last_process_time.is_some() {
1481            return;
1482        }
1483
1484        match self.init_dev_node_map(topo) {
1485            Ok(_) => {}
1486            Err(e) => {
1487                error!("Error initializing gpu node dev map: {}", e);
1488            }
1489        };
1490        self.sys = System::new_all();
1491        return;
1492    }
1493}
1494
1495struct Scheduler<'a> {
1496    skel: BpfSkel<'a>,
1497    struct_ops: Option<libbpf_rs::Link>,
1498    layer_specs: Vec<LayerSpec>,
1499
1500    sched_intv: Duration,
1501    layer_refresh_intv: Duration,
1502
1503    cpu_pool: CpuPool,
1504    layers: Vec<Layer>,
1505    idle_qos_enabled: bool,
1506
1507    proc_reader: fb_procfs::ProcReader,
1508    sched_stats: Stats,
1509
1510    nr_layer_cpus_ranges: Vec<(usize, usize)>,
1511    processing_dur: Duration,
1512
1513    topo: Arc<Topology>,
1514    netdevs: BTreeMap<String, NetDev>,
1515    stats_server: StatsServer<StatsReq, StatsRes>,
1516    gpu_task_handler: GpuTaskAffinitizer,
1517}
1518
1519impl<'a> Scheduler<'a> {
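    /// Populate the BPF-side layer array from the layer specs: matching
    /// rules, per-layer tunables, cpusets, and the weight-sorted layer
    /// iteration order.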
1520    fn init_layers(skel: &mut OpenBpfSkel, specs: &[LayerSpec], topo: &Topology) -> Result<()> {
1521        skel.maps.rodata_data.nr_layers = specs.len() as u32;
1522        let mut perf_set = false;
1523
1524        let mut layer_iteration_order = (0..specs.len()).collect::<Vec<_>>();
1525        let mut layer_weights: Vec<usize> = vec![];
1526
1527        for (spec_i, spec) in specs.iter().enumerate() {
1528            let layer = &mut skel.maps.bss_data.layers[spec_i];
1529
1530            for (or_i, or) in spec.matches.iter().enumerate() {
1531                for (and_i, and) in or.iter().enumerate() {
1532                    let mt = &mut layer.matches[or_i].matches[and_i];
1533
1534                    // Rules are allowlist-based by default
1535                    mt.exclude.write(false);
1536
1537                    match and {
1538                        LayerMatch::CgroupPrefix(prefix) => {
1539                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_PREFIX as i32;
1540                            copy_into_cstr(&mut mt.cgroup_prefix, prefix.as_str());
1541                        }
1542                        LayerMatch::CgroupSuffix(suffix) => {
1543                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_SUFFIX as i32;
1544                            copy_into_cstr(&mut mt.cgroup_suffix, suffix.as_str());
1545                        }
1546                        LayerMatch::CgroupContains(substr) => {
1547                            mt.kind = bpf_intf::layer_match_kind_MATCH_CGROUP_CONTAINS as i32;
1548                            copy_into_cstr(&mut mt.cgroup_substr, substr.as_str());
1549                        }
1550                        LayerMatch::CommPrefix(prefix) => {
1551                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
1552                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
1553                        }
1554                        LayerMatch::CommPrefixExclude(prefix) => {
1555                            mt.kind = bpf_intf::layer_match_kind_MATCH_COMM_PREFIX as i32;
1556                            mt.exclude.write(true);
1557                            copy_into_cstr(&mut mt.comm_prefix, prefix.as_str());
1558                        }
1559                        LayerMatch::PcommPrefix(prefix) => {
1560                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
1561                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
1562                        }
1563                        LayerMatch::PcommPrefixExclude(prefix) => {
1564                            mt.kind = bpf_intf::layer_match_kind_MATCH_PCOMM_PREFIX as i32;
1565                            mt.exclude.write(true);
1566                            copy_into_cstr(&mut mt.pcomm_prefix, prefix.as_str());
1567                        }
1568                        LayerMatch::NiceAbove(nice) => {
1569                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_ABOVE as i32;
1570                            mt.nice = *nice;
1571                        }
1572                        LayerMatch::NiceBelow(nice) => {
1573                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_BELOW as i32;
1574                            mt.nice = *nice;
1575                        }
1576                        LayerMatch::NiceEquals(nice) => {
1577                            mt.kind = bpf_intf::layer_match_kind_MATCH_NICE_EQUALS as i32;
1578                            mt.nice = *nice;
1579                        }
1580                        LayerMatch::UIDEquals(user_id) => {
1581                            mt.kind = bpf_intf::layer_match_kind_MATCH_USER_ID_EQUALS as i32;
1582                            mt.user_id = *user_id;
1583                        }
1584                        LayerMatch::GIDEquals(group_id) => {
1585                            mt.kind = bpf_intf::layer_match_kind_MATCH_GROUP_ID_EQUALS as i32;
1586                            mt.group_id = *group_id;
1587                        }
1588                        LayerMatch::PIDEquals(pid) => {
1589                            mt.kind = bpf_intf::layer_match_kind_MATCH_PID_EQUALS as i32;
1590                            mt.pid = *pid;
1591                        }
1592                        LayerMatch::PPIDEquals(ppid) => {
1593                            mt.kind = bpf_intf::layer_match_kind_MATCH_PPID_EQUALS as i32;
1594                            mt.ppid = *ppid;
1595                        }
1596                        LayerMatch::TGIDEquals(tgid) => {
1597                            mt.kind = bpf_intf::layer_match_kind_MATCH_TGID_EQUALS as i32;
1598                            mt.tgid = *tgid;
1599                        }
1600                        LayerMatch::NSPIDEquals(nsid, pid) => {
1601                            mt.kind = bpf_intf::layer_match_kind_MATCH_NSPID_EQUALS as i32;
1602                            mt.nsid = *nsid;
1603                            mt.pid = *pid;
1604                        }
1605                        LayerMatch::NSEquals(nsid) => {
1606                            mt.kind = bpf_intf::layer_match_kind_MATCH_NS_EQUALS as i32;
1607                            mt.nsid = *nsid as u64;
1608                        }
1609                        LayerMatch::CmdJoin(joincmd) => {
1610                            mt.kind = bpf_intf::layer_match_kind_MATCH_SCXCMD_JOIN as i32;
1611                            copy_into_cstr(&mut mt.comm_prefix, joincmd);
1612                        }
1613                        LayerMatch::IsGroupLeader(polarity) => {
1614                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_GROUP_LEADER as i32;
1615                            mt.is_group_leader.write(*polarity);
1616                        }
1617                        LayerMatch::IsKthread(polarity) => {
1618                            mt.kind = bpf_intf::layer_match_kind_MATCH_IS_KTHREAD as i32;
1619                            mt.is_kthread.write(*polarity);
1620                        }
1621                        LayerMatch::UsedGpuTid(polarity) => {
1622                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_TID as i32;
1623                            mt.used_gpu_tid.write(*polarity);
1624                        }
1625                        LayerMatch::UsedGpuPid(polarity) => {
1626                            mt.kind = bpf_intf::layer_match_kind_MATCH_USED_GPU_PID as i32;
1627                            mt.used_gpu_pid.write(*polarity);
1628                        }
1629                        LayerMatch::AvgRuntime(min, max) => {
1630                            mt.kind = bpf_intf::layer_match_kind_MATCH_AVG_RUNTIME as i32;
1631                            mt.min_avg_runtime_us = *min;
1632                            mt.max_avg_runtime_us = *max;
1633                        }
1634                    }
1635                }
1636                layer.matches[or_i].nr_match_ands = or.len() as i32;
1637            }
1638
1639            layer.nr_match_ors = spec.matches.len() as u32;
1640            layer.kind = spec.kind.as_bpf_enum();
1641
1642            {
1643                let LayerCommon {
1644                    min_exec_us,
1645                    yield_ignore,
1646                    perf,
1647                    preempt,
1648                    preempt_first,
1649                    exclusive,
1650                    allow_node_aligned,
1651                    skip_remote_node,
1652                    prev_over_idle_core,
1653                    growth_algo,
1654                    nodes,
1655                    slice_us,
1656                    fifo,
1657                    weight,
1658                    disallow_open_after_us,
1659                    disallow_preempt_after_us,
1660                    xllc_mig_min_us,
1661                    placement,
1662                    ..
1663                } = spec.kind.common();
1664
1665                layer.slice_ns = *slice_us * 1000;
1666                layer.fifo.write(*fifo);
1667                layer.min_exec_ns = min_exec_us * 1000;
1668                layer.yield_step_ns = if *yield_ignore > 0.999 {
1669                    0
1670                } else if *yield_ignore < 0.001 {
1671                    layer.slice_ns
1672                } else {
1673                    (layer.slice_ns as f64 * (1.0 - *yield_ignore)) as u64
1674                };
1675                let mut layer_name: String = spec.name.clone();
1676                layer_name.truncate(MAX_LAYER_NAME);
1677                copy_into_cstr(&mut layer.name, layer_name.as_str());
1678                layer.preempt.write(*preempt);
1679                layer.preempt_first.write(*preempt_first);
1680                layer.excl.write(*exclusive);
1681                layer.allow_node_aligned.write(*allow_node_aligned);
1682                layer.skip_remote_node.write(*skip_remote_node);
1683                layer.prev_over_idle_core.write(*prev_over_idle_core);
1684                layer.growth_algo = growth_algo.as_bpf_enum();
1685                layer.weight = *weight;
1686                layer.disallow_open_after_ns = match disallow_open_after_us.unwrap() {
1687                    v if v == u64::MAX => v,
1688                    v => v * 1000,
1689                };
1690                layer.disallow_preempt_after_ns = match disallow_preempt_after_us.unwrap() {
1691                    v if v == u64::MAX => v,
1692                    v => v * 1000,
1693                };
1694                layer.xllc_mig_min_ns = (xllc_mig_min_us * 1000.0) as u64;
1695                layer_weights.push(layer.weight.try_into().unwrap());
1696                layer.perf = u32::try_from(*perf)?;
1697                layer.node_mask = nodemask_from_nodes(nodes) as u64;
1698                for (topo_node_id, topo_node) in &topo.nodes {
1699                    if !nodes.is_empty() && !nodes.contains(topo_node_id) {
1700                        continue;
1701                    }
1702                    layer.llc_mask |= llcmask_from_llcs(&topo_node.llcs) as u64;
1703                }
1704
1705                let task_place = |place: u32| crate::types::layer_task_place(place);
1706                layer.task_place = match placement {
1707                    LayerPlacement::Standard => {
1708                        task_place(bpf_intf::layer_task_place_PLACEMENT_STD as u32)
1709                    }
1710                    LayerPlacement::Sticky => {
1711                        task_place(bpf_intf::layer_task_place_PLACEMENT_STICK as u32)
1712                    }
1713                    LayerPlacement::Floating => {
1714                        task_place(bpf_intf::layer_task_place_PLACEMENT_FLOAT as u32)
1715                    }
1716                };
1717            }
1718
1719            layer.is_protected.write(match spec.kind {
1720                LayerKind::Open { .. } => false,
1721                LayerKind::Confined { protected, .. } | LayerKind::Grouped { protected, .. } => {
1722                    protected
1723                }
1724            });
1725
1726            match &spec.cpuset {
1727                Some(mask) => {
1728                    Self::update_cpumask(&mask, &mut layer.cpuset);
1729                }
1730                None => {
1731                    for i in 0..layer.cpuset.len() {
1732                        layer.cpuset[i] = u8::MAX;
1733                    }
1734                }
1735            };
1736
1737            perf_set |= layer.perf > 0;
1738        }
1739
1740        layer_iteration_order.sort_by(|i, j| layer_weights[*i].cmp(&layer_weights[*j]));
1741        for (idx, layer_idx) in layer_iteration_order.iter().enumerate() {
1742            skel.maps.rodata_data.layer_iteration_order[idx] = *layer_idx as u32;
1743        }
1744
1745        if perf_set && !compat::ksym_exists("scx_bpf_cpuperf_set")? {
1746            warn!("cpufreq support not available, ignoring perf configurations");
1747        }
1748
1749        Ok(())
1750    }
1751
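    /// Propagate NUMA node and LLC topology (counts, node cpumasks and the
    /// LLC-to-node mapping) into the BPF rodata.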
1752    fn init_nodes(skel: &mut OpenBpfSkel, _opts: &Opts, topo: &Topology) {
1753        skel.maps.rodata_data.nr_nodes = topo.nodes.len() as u32;
1754        skel.maps.rodata_data.nr_llcs = 0;
1755
1756        for (&node_id, node) in &topo.nodes {
1757            debug!("configuring node {}, LLCs {:?}", node_id, node.llcs.len());
1758            skel.maps.rodata_data.nr_llcs += node.llcs.len() as u32;
1759            let raw_numa_slice = node.span.as_raw_slice();
1760            let node_cpumask_slice = &mut skel.maps.rodata_data.numa_cpumasks[node_id];
1761            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
1762            left.clone_from_slice(raw_numa_slice);
1763            debug!(
1764                "node {} mask: {:?}",
1765                node_id, skel.maps.rodata_data.numa_cpumasks[node_id]
1766            );
1767
1768            for llc in node.llcs.values() {
1769                debug!("configuring llc {:?} for node {:?}", llc.id, node_id);
1770                skel.maps.rodata_data.llc_numa_id_map[llc.id] = node_id as u32;
1771            }
1772        }
1773
1774        for cpu in topo.all_cpus.values() {
1775            skel.maps.rodata_data.cpu_llc_id_map[cpu.id] = cpu.llc_id as u32;
1776        }
1777    }
1778
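    /// Build each CPU's proximity map: an ordering of all CPUs from closest
    /// (same core) to farthest (other nodes), recording the boundary index of
    /// each topology level.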
1779    fn init_cpu_prox_map(topo: &Topology, cpu_ctxs: &mut [bpf_intf::cpu_ctx]) {
1780        let radiate = |mut vec: Vec<usize>, center_id: usize| -> Vec<usize> {
1781            vec.sort_by_key(|&id| (center_id as i32 - id as i32).abs());
1782            vec
1783        };
1784        let radiate_cpu =
1785            |mut vec: Vec<usize>, center_cpu: usize, center_core: usize| -> Vec<usize> {
1786                vec.sort_by_key(|&id| {
1787                    (
1788                        (center_core as i32 - topo.all_cpus.get(&id).unwrap().core_id as i32).abs(),
1789                        (center_cpu as i32 - id as i32).abs(),
1790                    )
1791                });
1792                vec
1793            };
1794
1795        for (&cpu_id, cpu) in &topo.all_cpus {
1796            // Collect the spans.
1797            let mut core_span = topo.all_cores[&cpu.core_id].span.clone();
1798            let llc_span = &topo.all_llcs[&cpu.llc_id].span;
1799            let node_span = &topo.nodes[&cpu.node_id].span;
1800            let sys_span = &topo.span;
1801
1802            // Make the spans exclusive.
1803            let sys_span = sys_span.and(&node_span.not());
1804            let node_span = node_span.and(&llc_span.not());
1805            let llc_span = llc_span.and(&core_span.not());
1806            core_span.clear_cpu(cpu_id).unwrap();
1807
1808            // Convert them into arrays.
1809            let mut sys_order: Vec<usize> = sys_span.iter().collect();
1810            let mut node_order: Vec<usize> = node_span.iter().collect();
1811            let mut llc_order: Vec<usize> = llc_span.iter().collect();
1812            let mut core_order: Vec<usize> = core_span.iter().collect();
1813
1814            // Shuffle them so that different CPUs follow different orders.
1815            // Each CPU radiates in both directions based on the cpu id and
1816            // radiates out to the closest cores based on core ids.
1817
1818            sys_order = radiate_cpu(sys_order, cpu_id, cpu.core_id);
1819            node_order = radiate(node_order, cpu.node_id);
1820            llc_order = radiate_cpu(llc_order, cpu_id, cpu.core_id);
1821            core_order = radiate_cpu(core_order, cpu_id, cpu.core_id);
1822
1823            // Concatenate and record the topology boundaries.
1824            let mut order: Vec<usize> = vec![];
1825            let mut idx: usize = 0;
1826
1827            idx += 1;
1828            order.push(cpu_id);
1829
1830            idx += core_order.len();
1831            order.append(&mut core_order);
1832            let core_end = idx;
1833
1834            idx += llc_order.len();
1835            order.append(&mut llc_order);
1836            let llc_end = idx;
1837
1838            idx += node_order.len();
1839            order.append(&mut node_order);
1840            let node_end = idx;
1841
1842            idx += sys_order.len();
1843            order.append(&mut sys_order);
1844            let sys_end = idx;
1845
1846            debug!(
1847                "CPU[{}] proximity map[{}/{}/{}/{}]: {:?}",
1848                cpu_id, core_end, llc_end, node_end, sys_end, &order
1849            );
1850
1851            // Record in cpu_ctx.
1852            let pmap = &mut cpu_ctxs[cpu_id].prox_map;
1853            for (i, &cpu) in order.iter().enumerate() {
1854                pmap.cpus[i] = cpu as u16;
1855            }
1856            pmap.core_end = core_end as u32;
1857            pmap.llc_end = llc_end as u32;
1858            pmap.node_end = node_end as u32;
1859            pmap.sys_end = sys_end as u32;
1860        }
1861    }
1862
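    /// Serialize cpu_ctx structs into raw byte vectors for a per-CPU map
    /// update.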
1863    fn convert_cpu_ctxs(cpu_ctxs: Vec<bpf_intf::cpu_ctx>) -> Vec<Vec<u8>> {
1864        cpu_ctxs
1865            .into_iter()
1866            .map(|cpu_ctx| {
1867                let bytes = unsafe {
1868                    std::slice::from_raw_parts(
1869                        &cpu_ctx as *const bpf_intf::cpu_ctx as *const u8,
1870                        std::mem::size_of::<bpf_intf::cpu_ctx>(),
1871                    )
1872                };
1873                bytes.to_vec()
1874            })
1875            .collect()
1876    }
1877
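    /// Initialize the per-CPU contexts: basic identifiers, per-CPU randomized
    /// layer iteration orders, and the CPU proximity maps.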
1878    fn init_cpus(skel: &BpfSkel, layer_specs: &[LayerSpec], topo: &Topology) -> Result<()> {
1879        let key = (0_u32).to_ne_bytes();
1880        let mut cpu_ctxs: Vec<bpf_intf::cpu_ctx> = vec![];
1881        let cpu_ctxs_vec = skel
1882            .maps
1883            .cpu_ctxs
1884            .lookup_percpu(&key, libbpf_rs::MapFlags::ANY)
1885            .context("Failed to lookup cpu_ctx")?
1886            .unwrap();
1887
1888        let op_layers: Vec<u32> = layer_specs
1889            .iter()
1890            .enumerate()
1891            .filter(|(_idx, spec)| match &spec.kind {
1892                LayerKind::Open { .. } => spec.kind.common().preempt,
1893                _ => false,
1894            })
1895            .map(|(idx, _)| idx as u32)
1896            .collect();
1897        let on_layers: Vec<u32> = layer_specs
1898            .iter()
1899            .enumerate()
1900            .filter(|(_idx, spec)| match &spec.kind {
1901                LayerKind::Open { .. } => !spec.kind.common().preempt,
1902                _ => false,
1903            })
1904            .map(|(idx, _)| idx as u32)
1905            .collect();
1906        let gp_layers: Vec<u32> = layer_specs
1907            .iter()
1908            .enumerate()
1909            .filter(|(_idx, spec)| match &spec.kind {
1910                LayerKind::Grouped { .. } => spec.kind.common().preempt,
1911                _ => false,
1912            })
1913            .map(|(idx, _)| idx as u32)
1914            .collect();
1915        let gn_layers: Vec<u32> = layer_specs
1916            .iter()
1917            .enumerate()
1918            .filter(|(_idx, spec)| match &spec.kind {
1919                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
1920                _ => false,
1921            })
1922            .map(|(idx, _)| idx as u32)
1923            .collect();
1924
1925        // FIXME - this incorrectly assumes all possible CPUs are consecutive.
1926        for cpu in 0..*NR_CPUS_POSSIBLE {
1927            cpu_ctxs.push(*unsafe {
1928                &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
1929            });
1930
1931            let topo_cpu = topo.all_cpus.get(&cpu).unwrap();
1932            let is_big = topo_cpu.core_type == CoreType::Big { turbo: true };
1933            cpu_ctxs[cpu].cpu = cpu as i32;
1934            cpu_ctxs[cpu].layer_id = MAX_LAYERS as u32;
1935            cpu_ctxs[cpu].task_layer_id = MAX_LAYERS as u32;
1936            cpu_ctxs[cpu].is_big = is_big;
1937
1938            fastrand::seed(cpu as u64);
1939
1940            let mut ogp_order = op_layers.clone();
1941            ogp_order.append(&mut gp_layers.clone());
1942            fastrand::shuffle(&mut ogp_order);
1943
1944            let mut ogn_order = on_layers.clone();
1945            ogn_order.append(&mut gn_layers.clone());
1946            fastrand::shuffle(&mut ogn_order);
1947
1948            let mut op_order = op_layers.clone();
1949            fastrand::shuffle(&mut op_order);
1950
1951            let mut on_order = on_layers.clone();
1952            fastrand::shuffle(&mut on_order);
1953
1954            let mut gp_order = gp_layers.clone();
1955            fastrand::shuffle(&mut gp_order);
1956
1957            let mut gn_order = gn_layers.clone();
1958            fastrand::shuffle(&mut gn_order);
1959
1960            for i in 0..MAX_LAYERS {
1961                cpu_ctxs[cpu].ogp_layer_order[i] =
1962                    ogp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
1963                cpu_ctxs[cpu].ogn_layer_order[i] =
1964                    ogn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
1965
1966                cpu_ctxs[cpu].op_layer_order[i] =
1967                    op_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
1968                cpu_ctxs[cpu].on_layer_order[i] =
1969                    on_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
1970                cpu_ctxs[cpu].gp_layer_order[i] =
1971                    gp_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
1972                cpu_ctxs[cpu].gn_layer_order[i] =
1973                    gn_order.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
1974            }
1975        }
1976
1977        Self::init_cpu_prox_map(topo, &mut cpu_ctxs);
1978
1979        skel.maps
1980            .cpu_ctxs
1981            .update_percpu(
1982                &key,
1983                &Self::convert_cpu_ctxs(cpu_ctxs),
1984                libbpf_rs::MapFlags::ANY,
1985            )
1986            .context("Failed to update cpu_ctx")?;
1987
1988        Ok(())
1989    }
1990
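    /// Build each LLC's proximity map (own LLC first, then the rest of the
    /// node, then the rest of the system) and write it into llc_data.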
1991    fn init_llc_prox_map(skel: &mut BpfSkel, topo: &Topology) -> Result<()> {
1992        for (&llc_id, llc) in &topo.all_llcs {
1993            // Collect the orders.
1994            let mut node_order: Vec<usize> =
1995                topo.nodes[&llc.node_id].llcs.keys().cloned().collect();
1996            let mut sys_order: Vec<usize> = topo.all_llcs.keys().cloned().collect();
1997
1998            // Make the orders exclusive.
1999            sys_order.retain(|id| !node_order.contains(id));
2000            node_order.retain(|&id| id != llc_id);
2001
2002            // Shuffle so that different LLCs follow different orders. See
2003            // init_cpu_prox_map().
2004            fastrand::seed(llc_id as u64);
2005            fastrand::shuffle(&mut sys_order);
2006            fastrand::shuffle(&mut node_order);
2007
2008            // Concatenate and record the node boundary.
2009            let mut order: Vec<usize> = vec![];
2010            let mut idx: usize = 0;
2011
2012            idx += 1;
2013            order.push(llc_id);
2014
2015            idx += node_order.len();
2016            order.append(&mut node_order);
2017            let node_end = idx;
2018
2019            idx += sys_order.len();
2020            order.append(&mut sys_order);
2021            let sys_end = idx;
2022
2023            debug!(
2024                "LLC[{}] proximity map[{}/{}]: {:?}",
2025                llc_id, node_end, sys_end, &order
2026            );
2027
2028            // Record in llc_ctx.
2029            //
2030            // XXX - This would be a lot easier if llc_ctx were in the bss.
2031            // See BpfStats::read().
2032            let key = llc_id as u32;
2033            let llc_id_slice =
2034                unsafe { std::slice::from_raw_parts((&key as *const u32) as *const u8, 4) };
2035            let v = skel
2036                .maps
2037                .llc_data
2038                .lookup(llc_id_slice, libbpf_rs::MapFlags::ANY)
2039                .unwrap()
2040                .unwrap();
2041            let mut llcc = unsafe { *(v.as_slice().as_ptr() as *const bpf_intf::llc_ctx) };
2042
2043            let pmap = &mut llcc.prox_map;
2044            for (i, &llc_id) in order.iter().enumerate() {
2045                pmap.llcs[i] = llc_id as u16;
2046            }
2047            pmap.node_end = node_end as u32;
2048            pmap.sys_end = sys_end as u32;
2049
2050            let v = unsafe {
2051                std::slice::from_raw_parts(
2052                    &llcc as *const bpf_intf::llc_ctx as *const u8,
2053                    std::mem::size_of::<bpf_intf::llc_ctx>(),
2054                )
2055            };
2056
2057            skel.maps
2058                .llc_data
2059                .update(llc_id_slice, v, libbpf_rs::MapFlags::ANY)?
2060        }
2061
2062        Ok(())
2063    }
2064
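    /// Construct the scheduler: probe the topology, open and configure the
    /// BPF skeleton, load and attach it, and set up the userspace state
    /// (layers, stats server, GPU task affinitizer).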
2065    fn init(
2066        opts: &'a Opts,
2067        layer_specs: &[LayerSpec],
2068        open_object: &'a mut MaybeUninit<OpenObject>,
2069    ) -> Result<Self> {
2070        let nr_layers = layer_specs.len();
2071        let mut disable_topology = opts.disable_topology.unwrap_or(false);
2072
2073        let topo = Arc::new(if disable_topology {
2074            Topology::with_flattened_llc_node()?
2075        } else {
2076            Topology::new()?
2077        });
2078
2079        /*
2080         * FIXME: scx_layered incorrectly assumes that node, LLC and CPU IDs
2081         * are consecutive. Verify that they are on this system and bail if
2082         * not. It's lucky that core ID is not used anywhere as core IDs are
2083         * not consecutive on some Ryzen CPUs.
2084         */
2085        if topo.nodes.keys().enumerate().any(|(i, &k)| i != k) {
2086            bail!("Holes in node IDs detected: {:?}", topo.nodes.keys());
2087        }
2088        if topo.all_llcs.keys().enumerate().any(|(i, &k)| i != k) {
2089            bail!("Holes in LLC IDs detected: {:?}", topo.all_llcs.keys());
2090        }
2091        if topo.all_cpus.keys().enumerate().any(|(i, &k)| i != k) {
2092            bail!("Holes in CPU IDs detected: {:?}", topo.all_cpus.keys());
2093        }
2094
2095        let netdevs = if opts.netdev_irq_balance {
2096            warn!(
2097                "Experimental netdev IRQ balancing enabled. Reset IRQ masks of network devices after use!!!"
2098            );
2099            read_netdevs()?
2100        } else {
2101            BTreeMap::new()
2102        };
2103
2104        if !disable_topology {
2105            if topo.nodes.len() == 1 && topo.nodes[&0].llcs.len() == 1 {
2106                disable_topology = true;
2107            };
2108            info!(
2109                "Topology awareness not specified, selecting {} based on hardware",
2110                if disable_topology {
2111                    "disabled"
2112                } else {
2113                    "enabled"
2114                }
2115            );
2116        };
2117
2118        let cpu_pool = CpuPool::new(topo.clone())?;
2119
2120        // If topology awareness is disabled, clear out any NUMA/LLC configs so
2121        // that the layers fall back to using all cores.
2122        let layer_specs: Vec<_> = if disable_topology {
2123            info!("Disabling topology awareness");
2124            layer_specs
2125                .iter()
2126                .cloned()
2127                .map(|mut s| {
2128                    s.kind.common_mut().nodes.clear();
2129                    s.kind.common_mut().llcs.clear();
2130                    s
2131                })
2132                .collect()
2133        } else {
2134            layer_specs.to_vec()
2135        };
2136
2137        // Open the BPF prog first for verification.
2138        let mut skel_builder = BpfSkelBuilder::default();
2139        skel_builder.obj_builder.debug(opts.verbose > 1);
2140        init_libbpf_logging(None);
2141        info!(
2142            "Running scx_layered (build ID: {})",
2143            build_id::full_version(env!("CARGO_PKG_VERSION"))
2144        );
2145        let mut skel = scx_ops_open!(skel_builder, open_object, layered)?;
2146
2147        // Enable autoload for conditionally loaded programs immediately after
2148        // creating the skeleton (this always happens before loading).
2149        if opts.enable_gpu_support {
2150            // By default, enable the nvidia_open kprobe whenever GPU support is
2151            // enabled; it has been observed to be relatively cheap to probe.
2152            if opts.gpu_kprobe_level >= 1 {
2153                compat::cond_kprobe_enable("nvidia_open", &skel.progs.kprobe_nvidia_open)?;
2154            }
2155            // Enable the remaining kprobes progressively, based on how often
2156            // they are hit in observed workloads.
2157            if opts.gpu_kprobe_level >= 2 {
2158                compat::cond_kprobe_enable("nvidia_mmap", &skel.progs.kprobe_nvidia_mmap)?;
2159            }
2160            if opts.gpu_kprobe_level >= 3 {
2161                compat::cond_kprobe_enable("nvidia_poll", &skel.progs.kprobe_nvidia_poll)?;
2162            }
2163        }
2164
2165        let ext_sched_class_addr = get_kallsyms_addr("ext_sched_class");
2166        let idle_sched_class_addr = get_kallsyms_addr("idle_sched_class");
2167
2168        if ext_sched_class_addr.is_ok() && idle_sched_class_addr.is_ok() {
2169            skel.maps.rodata_data.ext_sched_class_addr = ext_sched_class_addr.unwrap();
2170            skel.maps.rodata_data.idle_sched_class_addr = idle_sched_class_addr.unwrap();
2171        } else {
2172            warn!(
2173                "Unable to get sched_class addresses from /proc/kallsyms, disabling skip_preempt."
2174            );
2175        }
2176
2177        skel.maps.rodata_data.slice_ns = scx_enums.SCX_SLICE_DFL;
2178        skel.maps.rodata_data.max_exec_ns = 20 * scx_enums.SCX_SLICE_DFL;
2179
2180        // Initialize skel according to @opts.
2181        skel.struct_ops.layered_mut().exit_dump_len = opts.exit_dump_len;
2182
2183        if !opts.disable_queued_wakeup {
2184            match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
2185                0 => info!("Kernel does not support queued wakeup optimization"),
2186                v => skel.struct_ops.layered_mut().flags |= v,
2187            }
2188        }
2189
2190        skel.maps.rodata_data.percpu_kthread_preempt = !opts.disable_percpu_kthread_preempt;
2191        skel.maps.rodata_data.percpu_kthread_preempt_all =
2192            !opts.disable_percpu_kthread_preempt && opts.percpu_kthread_preempt_all;
2193        skel.maps.rodata_data.debug = opts.verbose as u32;
2194        skel.maps.rodata_data.slice_ns = opts.slice_us * 1000;
2195        skel.maps.rodata_data.max_exec_ns = if opts.max_exec_us > 0 {
2196            opts.max_exec_us * 1000
2197        } else {
2198            opts.slice_us * 1000 * 20
2199        };
2200        skel.maps.rodata_data.nr_cpu_ids = *NR_CPU_IDS as u32;
2201        skel.maps.rodata_data.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
2202        skel.maps.rodata_data.smt_enabled = topo.smt_enabled;
2203        skel.maps.rodata_data.has_little_cores = topo.has_little_cores();
2204        skel.maps.rodata_data.xnuma_preemption = opts.xnuma_preemption;
2205        skel.maps.rodata_data.antistall_sec = opts.antistall_sec;
2206        skel.maps.rodata_data.monitor_disable = opts.monitor_disable;
2207        skel.maps.rodata_data.lo_fb_wait_ns = opts.lo_fb_wait_us * 1000;
2208        skel.maps.rodata_data.lo_fb_share_ppk = ((opts.lo_fb_share * 1024.0) as u32).clamp(1, 1024);
2209        skel.maps.rodata_data.enable_antistall = !opts.disable_antistall;
2210        skel.maps.rodata_data.enable_match_debug = opts.enable_match_debug;
2211        skel.maps.rodata_data.enable_gpu_support = opts.enable_gpu_support;
2212
2213        for (cpu, sib) in topo.sibling_cpus().iter().enumerate() {
2214            skel.maps.rodata_data.__sibling_cpu[cpu] = *sib;
2215        }
2216        for cpu in topo.all_cpus.keys() {
2217            skel.maps.rodata_data.all_cpus[cpu / 8] |= 1 << (cpu % 8);
2218        }
2219
2220        skel.maps.rodata_data.nr_op_layers = layer_specs
2221            .iter()
2222            .filter(|spec| match &spec.kind {
2223                LayerKind::Open { .. } => spec.kind.common().preempt,
2224                _ => false,
2225            })
2226            .count() as u32;
2227        skel.maps.rodata_data.nr_on_layers = layer_specs
2228            .iter()
2229            .filter(|spec| match &spec.kind {
2230                LayerKind::Open { .. } => !spec.kind.common().preempt,
2231                _ => false,
2232            })
2233            .count() as u32;
2234        skel.maps.rodata_data.nr_gp_layers = layer_specs
2235            .iter()
2236            .filter(|spec| match &spec.kind {
2237                LayerKind::Grouped { .. } => spec.kind.common().preempt,
2238                _ => false,
2239            })
2240            .count() as u32;
2241        skel.maps.rodata_data.nr_gn_layers = layer_specs
2242            .iter()
2243            .filter(|spec| match &spec.kind {
2244                LayerKind::Grouped { .. } => !spec.kind.common().preempt,
2245                _ => false,
2246            })
2247            .count() as u32;
2248        skel.maps.rodata_data.nr_excl_layers = layer_specs
2249            .iter()
2250            .filter(|spec| spec.kind.common().exclusive)
2251            .count() as u32;
2252
2253        let mut min_open = u64::MAX;
2254        let mut min_preempt = u64::MAX;
2255
2256        for spec in layer_specs.iter() {
2257            if let LayerKind::Open { common, .. } = &spec.kind {
2258                min_open = min_open.min(common.disallow_open_after_us.unwrap());
2259                min_preempt = min_preempt.min(common.disallow_preempt_after_us.unwrap());
2260            }
2261        }
2262
2263        skel.maps.rodata_data.min_open_layer_disallow_open_after_ns = match min_open {
2264            u64::MAX => *DFL_DISALLOW_OPEN_AFTER_US,
2265            v => v,
2266        };
2267        skel.maps
2268            .rodata_data
2269            .min_open_layer_disallow_preempt_after_ns = match min_preempt {
2270            u64::MAX => *DFL_DISALLOW_PREEMPT_AFTER_US,
2271            v => v,
2272        };
2273
2274        // Consider all layers empty at the beginning.
2275        for i in 0..layer_specs.len() {
2276            skel.maps.bss_data.empty_layer_ids[i] = i as u32;
2277        }
2278        skel.maps.bss_data.nr_empty_layer_ids = nr_layers as u32;
2279
2280        Self::init_layers(&mut skel, &layer_specs, &topo)?;
2281        Self::init_nodes(&mut skel, opts, &topo);
2282
2283        // Set the pin path before loading the skeleton. This ensures libbpf
2284        // creates and pins the map, or reuses the already-pinned map fd for us,
2285        // so that the map pinned by a previous scheduler instance keeps being
2286        // reused across restarts.
2287        let layered_task_hint_map_path = &opts.task_hint_map;
2288        let hint_map = &mut skel.maps.scx_layered_task_hint_map;
2289        // Only set pin path if a path is provided.
2290        if !layered_task_hint_map_path.is_empty() {
2291            hint_map.set_pin_path(layered_task_hint_map_path).unwrap();
2292        }
2293
2294        let mut skel = scx_ops_load!(skel, layered, uei)?;
2295
2296        let mut layers = vec![];
2297        let layer_growth_orders =
2298            LayerGrowthAlgo::layer_core_orders(&cpu_pool, &layer_specs, &topo)?;
2299        for (idx, spec) in layer_specs.iter().enumerate() {
2300            let growth_order = layer_growth_orders
2301                .get(&idx)
2302                .with_context(|| "layer has no growth order".to_string())?;
2303            layers.push(Layer::new(spec, &topo, growth_order)?);
2304        }
2305
2306        let mut idle_qos_enabled = layers
2307            .iter()
2308            .any(|layer| layer.kind.common().idle_resume_us.unwrap_or(0) > 0);
2309        if idle_qos_enabled && !cpu_idle_resume_latency_supported() {
2310            warn!("idle_resume_us not supported, ignoring");
2311            idle_qos_enabled = false;
2312        }
2313
2314        Self::init_cpus(&skel, &layer_specs, &topo)?;
2315        Self::init_llc_prox_map(&mut skel, &topo)?;
2316
2317        // Other stuff.
2318        let proc_reader = fb_procfs::ProcReader::new();
2319
2320        // Handle setup if layered is running in a pid namespace.
2321        let input = ProgramInput {
2322            ..Default::default()
2323        };
2324        let prog = &mut skel.progs.initialize_pid_namespace;
2325
2326        let _ = prog.test_run(input);
2327
2328        // XXX If we try to refresh the cpumasks here before attaching, we
2329        // sometimes (non-deterministically) don't see the updated values in
2330        // BPF. It would be better to update the cpumasks before we attach,
2331        // but the values converge quickly anyway, so it's not a huge problem
2332        // in the interim until we figure it out.
2333
2334        // Allow all tasks to open and write to the BPF task hint map, now
2335        // that it should be pinned at the desired location.
2336        if !layered_task_hint_map_path.is_empty() {
2337            let path = CString::new(layered_task_hint_map_path.as_bytes()).unwrap();
2338            let mode: libc::mode_t = 0o666;
2339            unsafe {
2340                if libc::chmod(path.as_ptr(), mode) != 0 {
2341                    trace!("'chmod' to 666 of task hint map failed, continuing...");
2342                }
2343            }
2344        }
2345
2346        // Attach.
2347        let struct_ops = scx_ops_attach!(skel, layered)?;
2348        let stats_server = StatsServer::new(stats::server_data()).launch()?;
2349        let mut gpu_task_handler =
2350            GpuTaskAffinitizer::new(opts.gpu_affinitize_secs, opts.enable_gpu_affinitize);
2351        gpu_task_handler.init(topo.clone());
2352        let sched = Self {
2353            struct_ops: Some(struct_ops),
2354            layer_specs,
2355
2356            sched_intv: Duration::from_secs_f64(opts.interval),
2357            layer_refresh_intv: Duration::from_millis(opts.layer_refresh_ms_avgruntime),
2358
2359            cpu_pool,
2360            layers,
2361            idle_qos_enabled,
2362
2363            sched_stats: Stats::new(&mut skel, &proc_reader, &gpu_task_handler)?,
2364
2365            nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
2366            processing_dur: Default::default(),
2367
2368            proc_reader,
2369            skel,
2370
2371            topo,
2372            netdevs,
2373            stats_server,
2374            gpu_task_handler,
2375        };
2376
2377        info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");
2378
2379        Ok(sched)
2380    }
2381
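    /// Copy a Cpumask into the packed byte array format used by the BPF side.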
2382    fn update_cpumask(mask: &Cpumask, bpfmask: &mut [u8]) {
2383        for cpu in 0..mask.len() {
2384            if mask.test_cpu(cpu) {
2385                bpfmask[cpu / 8] |= 1 << (cpu % 8);
2386            } else {
2387                bpfmask[cpu / 8] &= !(1 << (cpu % 8));
2388            }
2389        }
2390    }
2391
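    /// Push a layer's current CPU assignment (cpumask, CPU count, per-LLC CPU
    /// counts) into its BPF counterpart and mark it for refresh.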
2392    fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut types::layer) {
2393        trace!("[{}] Updating BPF CPUs: {}", layer.name, &layer.cpus);
2394        Self::update_cpumask(&layer.cpus, &mut bpf_layer.cpus);
2395
2396        bpf_layer.nr_cpus = layer.nr_cpus as u32;
2397        for (llc_id, &nr_llc_cpus) in layer.nr_llc_cpus.iter().enumerate() {
2398            bpf_layer.nr_llc_cpus[llc_id] = nr_llc_cpus as u32;
2399        }
2400
2401        bpf_layer.refresh_cpus = 1;
2402    }
2403
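    /// Restrict each network device's IRQs to the available CPUs of its home
    /// node, falling back to the whole node when none are available.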
2404    fn update_netdev_cpumasks(&mut self) -> Result<()> {
2405        let available_cpus = self.cpu_pool.available_cpus();
2406        if available_cpus.is_empty() {
2407            return Ok(());
2408        }
2409
2410        for (iface, netdev) in self.netdevs.iter_mut() {
2411            let node = self
2412                .topo
2413                .nodes
2414                .values()
2415                .find(|n| n.id == netdev.node())
2417                .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
2418            let node_cpus = node.span.clone();
2419            for (irq, irqmask) in netdev.irqs.iter_mut() {
2420                irqmask.clear_all();
2421                for cpu in available_cpus.iter() {
2422                    if !node_cpus.test_cpu(cpu) {
2423                        continue;
2424                    }
2425                    let _ = irqmask.set_cpu(cpu);
2426                }
2427                // If none of the available CPUs fall in this node, spread the load across the whole node
2428                if irqmask.weight() == 0 {
2429                    for cpu in node_cpus.iter() {
2430                        let _ = irqmask.set_cpu(cpu);
2431                    }
2432                }
2433                trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
2434            }
2435            netdev.apply_cpumasks()?;
2436        }
2437
2438        Ok(())
2439    }
2440
2441    /// Calculate how many CPUs each layer would like to have if there were
2442    /// no competition. The CPU range is determined by applying the inverse
2443    /// of util_range and then capping by cpus_range. If the current
2444    /// allocation is within the acceptable range, no change is made.
2445    /// Returns (target, min) pair for each layer.
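    /// For example, a layer with utilization 2.5 and util_range (0.8, 0.9)
    /// gets low = ceil(2.5 / 0.9) = 3 and high = floor(2.5 / 0.8) = 3.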
2446    fn calc_target_nr_cpus(&self) -> Vec<(usize, usize)> {
2447        let nr_cpus = self.cpu_pool.topo.all_cpus.len();
2448        let utils = &self.sched_stats.layer_utils;
2449
2450        let mut records: Vec<(u64, u64, u64, usize, usize, usize)> = vec![];
2451        let mut targets: Vec<(usize, usize)> = vec![];
2452
2453        for (idx, layer) in self.layers.iter().enumerate() {
2454            targets.push(match &layer.kind {
2455                LayerKind::Confined {
2456                    util_range,
2457                    cpus_range,
2458                    cpus_range_frac,
2459                    ..
2460                }
2461                | LayerKind::Grouped {
2462                    util_range,
2463                    cpus_range,
2464                    cpus_range_frac,
2465                    ..
2466                } => {
2467                    // A grouped layer can choose to include open cputime
2468                    // for sizing. Also, as an empty layer can only get CPU
2469                    // time through fallback (counted as owned) or open
2470                    // execution, add open cputime for empty layers.
2471                    let owned = utils[idx][LAYER_USAGE_OWNED];
2472                    let open = utils[idx][LAYER_USAGE_OPEN];
2473
2474                    let mut util = owned;
2475                    if layer.kind.util_includes_open_cputime() || layer.nr_cpus == 0 {
2476                        util += open;
2477                    }
2478
2479                    let util = if util < 0.01 { 0.0 } else { util };
2480                    let low = (util / util_range.1).ceil() as usize;
2481                    let high = ((util / util_range.0).floor() as usize).max(low);
2482                    let target = layer.cpus.weight().clamp(low, high);
2483                    let cpus_range =
2484                        resolve_cpus_pct_range(cpus_range, cpus_range_frac, nr_cpus).unwrap();
2485
2486                    records.push((
2487                        (owned * 100.0) as u64,
2488                        (open * 100.0) as u64,
2489                        (util * 100.0) as u64,
2490                        low,
2491                        high,
2492                        target,
2493                    ));
2494
2495                    (target.clamp(cpus_range.0, cpus_range.1), cpus_range.0)
2496                }
2497                LayerKind::Open { .. } => (0, 0),
2498            });
2499        }
2500
2501        trace!("initial targets: {:?}", &targets);
2502        trace!("(owned, open, util, low, high, target): {:?}", &records);
2503        targets
2504    }
2505
2506    /// Given (target, min) pair for each layer which was determined
2507    /// assuming infinite number of CPUs, distribute the actual CPUs
2508    /// according to their weights.
2509    fn weighted_target_nr_cpus(&self, targets: &[(usize, usize)]) -> Vec<usize> {
2510        let mut nr_left = self.cpu_pool.topo.all_cpus.len();
2511        let weights: Vec<usize> = self
2512            .layers
2513            .iter()
2514            .map(|layer| layer.kind.common().weight as usize)
2515            .collect();
2516        let mut cands: BTreeMap<usize, (usize, usize, usize)> = targets
2517            .iter()
2518            .zip(&weights)
2519            .enumerate()
2520            .map(|(i, ((target, min), weight))| (i, (*target, *min, *weight)))
2521            .collect();
2522        let mut weight_sum: usize = weights.iter().sum();
2523        let mut weighted: Vec<usize> = vec![0; self.layers.len()];
2524
2525        trace!("cands: {:?}", &cands);
2526
2527        // First, accept all layers that are <= min.
2528        cands.retain(|&i, &mut (target, min, weight)| {
2529            if target <= min {
2530                let target = target.min(nr_left);
2531                weighted[i] = target;
2532                weight_sum -= weight;
2533                nr_left -= target;
2534                false
2535            } else {
2536                true
2537            }
2538        });
2539
2540        trace!("cands after accepting mins: {:?}", &cands);
2541
2542        // Keep accepting ones under their allotted share.
2543        let calc_share = |nr_left, weight, weight_sum| {
2544            (((nr_left * weight) as f64 / weight_sum as f64).ceil() as usize).min(nr_left)
2545        };
2546
2547        while !cands.is_empty() {
2548            let mut progress = false;
2549
2550            cands.retain(|&i, &mut (target, _min, weight)| {
2551                let share = calc_share(nr_left, weight, weight_sum);
2552                if target <= share {
2553                    weighted[i] = target;
2554                    weight_sum -= weight;
2555                    nr_left -= target;
2556                    progress = true;
2557                    false
2558                } else {
2559                    true
2560                }
2561            });
2562
2563            if !progress {
2564                break;
2565            }
2566        }
2567
2568        trace!("cands after accepting under allotted: {:?}", &cands);
2569
2570        // The remaining candidates are in contention with each other,
2571        // distribute according to the shares.
2572        let nr_to_share = nr_left;
2573        for (i, (_target, _min, weight)) in cands.into_iter() {
2574            let share = calc_share(nr_to_share, weight, weight_sum).min(nr_left);
2575            weighted[i] = share;
2576            nr_left -= share;
2577        }
2578
2579        trace!("weighted: {:?}", &weighted);
2580
2581        weighted
2582    }
2583
2584    // Figure out a (full_llcs, extra_cores) tuple from the target CPU count
2585    // computed by weighted_target_nr_cpus: the number of full LLCs a layer
2586    // occupies, and the extra cores needed for CPUs that don't fill a full LLC.
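    // For example, with 8 CPUs per LLC and 2 CPUs per core, a target of 21
    // CPUs yields (2 full LLCs, 3 extra cores).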
2587    fn compute_target_llcs(target: usize, topo: &Topology) -> (usize, usize) {
2588        // TODO(kkd): We assume each LLC has equal number of cores.
2589        let cores_per_llc = topo.all_cores.len() / topo.all_llcs.len();
2590        // TODO(kkd): We assume each core has fixed number of threads.
2591        let cpus_per_core = topo.all_cores.first_key_value().unwrap().1.cpus.len();
2592        let cpus_per_llc = cores_per_llc * cpus_per_core;
2593
2594        let full = target / cpus_per_llc;
2595        let extra = target % cpus_per_llc;
2596
2597        (full, extra.div_ceil(cpus_per_core))
2598    }
2599
2600    // Recalculate the core order for layers using the StickyDynamic growth
2601    // algorithm. Tuples from compute_target_llcs decide how many LLCs and
2602    // cores should be assigned to each layer; the alloc/free logic for CPUs
2603    // operates on that core order. This happens in three logical steps: first
2604    // free LLCs from layers that shrank since the last recomputation, then
2605    // distribute the freed LLCs to growing layers, and finally spill over the
2606    // remaining cores into free LLCs.
2607    fn recompute_layer_core_order(&mut self, layer_targets: &Vec<(usize, usize)>) {
2608        // Collect freed LLCs from shrinking layers.
2609        debug!(
2610            " free: before pass: free_llcs={:?}",
2611            self.cpu_pool.free_llcs
2612        );
2613        for &(idx, target) in layer_targets.iter().rev() {
2614            let layer = &mut self.layers[idx];
2615            let old_tlc = layer.target_llc_cpus;
2616            let new_tlc = Self::compute_target_llcs(target, &self.topo);
2617
2618            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
2619                continue;
2620            }
2621
2622            let mut to_free = (old_tlc.0 as i32 - new_tlc.0 as i32).max(0) as usize;
2623
2624            debug!(
2625                " free: layer={} old_tlc={:?} new_tlc={:?} to_free={} assigned={} free={}",
2626                layer.name,
2627                old_tlc,
2628                new_tlc,
2629                to_free,
2630                layer.assigned_llcs.len(),
2631                self.cpu_pool.free_llcs.len()
2632            );
2633
2634            while to_free > 0 && layer.assigned_llcs.len() > 0 {
2635                let llc = layer.assigned_llcs.pop().unwrap();
2636                self.cpu_pool.free_llcs.push((llc, 0));
2637                to_free -= 1;
2638
2639                debug!(" layer={} freed_llc={}", layer.name, llc);
2640            }
2641        }
2642        debug!(" free: after pass: free_llcs={:?}", self.cpu_pool.free_llcs);
2643
2644        // Redistribute the freed LLCs to growing layers.
2645        for &(idx, target) in layer_targets.iter().rev() {
2646            let layer = &mut self.layers[idx];
2647            let old_tlc = layer.target_llc_cpus;
2648            let new_tlc = Self::compute_target_llcs(target, &self.topo);
2649
2650            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
2651                continue;
2652            }
2653
2654            let mut to_alloc = (new_tlc.0 as i32 - old_tlc.0 as i32).max(0) as usize;
2655
2656            debug!(
2657                " alloc: layer={} old_tlc={:?} new_tlc={:?} to_alloc={} assigned={} free={}",
2658                layer.name,
2659                old_tlc,
2660                new_tlc,
2661                to_alloc,
2662                layer.assigned_llcs.len(),
2663                self.cpu_pool.free_llcs.len()
2664            );
2665
2666            while to_alloc > 0
2667                && self.cpu_pool.free_llcs.len() > 0
2668                && to_alloc <= self.cpu_pool.free_llcs.len()
2669            {
2670                let llc = self.cpu_pool.free_llcs.pop().unwrap().0;
2671                layer.assigned_llcs.push(llc);
2672                to_alloc -= 1;
2673
2674                debug!(" layer={} alloc_llc={}", layer.name, llc);
2675            }
2676
2677            debug!(
2678                " alloc: layer={} assigned_llcs={:?}",
2679                layer.name, layer.assigned_llcs
2680            );
2681
2682            // Update for next iteration.
2683            layer.target_llc_cpus = new_tlc;
2684        }
2685
2686        // Spill over the overflowing cores into free LLCs. Bigger layers get
2687        // to take a chunk before smaller layers.
2688        for &(idx, _) in layer_targets.iter() {
2689            let mut core_order = vec![];
2690            let layer = &mut self.layers[idx];
2691
2692            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
2693                continue;
2694            }
2695
2696            let tlc = layer.target_llc_cpus;
2697            let mut extra = tlc.1;
2698            // TODO(kkd): Move this logic into cpu_pool? What's the best place?
2699            let cores_per_llc = self.topo.all_cores.len() / self.topo.all_llcs.len();
2700            let cpus_per_core = self.topo.all_cores.first_key_value().unwrap().1.cpus.len();
2701            let cpus_per_llc = cores_per_llc * cpus_per_core;
2702
2703            // Consume from front since we pop from the back.
2704            for i in 0..self.cpu_pool.free_llcs.len() {
2705                let free_vec = &mut self.cpu_pool.free_llcs;
2706                // Available CPUs in LLC.
2707                let avail = cpus_per_llc - free_vec[i].1;
2708                // The amount we'll use.
2709                let mut used = extra.min(avail);
2710
2711                let shift = free_vec[i].1;
2712                free_vec[i].1 += used;
2713
2714                let llc_id = free_vec[i].0;
2715                let llc = self.topo.all_llcs.get(&llc_id).unwrap();
2716
2717                for core in llc.cores.iter().skip(shift) {
2718                    core_order.push(core.1.id);
2719                    if used == 0 {
2720                        break;
2721                    }
2722                    used -= 1;
2723                }
2724
2725                extra -= used;
2726                if extra == 0 {
2727                    break;
2728                }
2729            }
2730
2731            core_order.reverse();
2732            layer.core_order = core_order;
2733        }
2734
2735        // Reset consumed entries in free LLCs.
2736        for i in 0..self.cpu_pool.free_llcs.len() {
2737            self.cpu_pool.free_llcs[i].1 = 0;
2738        }
2739
2740        for &(idx, _) in layer_targets.iter() {
2741            let layer = &mut self.layers[idx];
2742
2743            if layer.growth_algo != LayerGrowthAlgo::StickyDynamic {
2744                continue;
2745            }
2746
2747            for core in self.topo.all_cores.iter() {
2748                let llc_id = core.1.llc_id;
2749                if layer.assigned_llcs.contains(&llc_id) {
2750                    layer.core_order.push(core.1.id);
2751                }
2752            }
2753            // Update core_order for the layer, but reverse to keep the start stable.
2754            layer.core_order.reverse();
2755
2756            debug!(
2757                " alloc: layer={} core_order={:?}",
2758                layer.name, layer.core_order
2759            );
2760        }
2761    }
2762
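    // Recompute per-layer CPU assignments and push the resulting cpumasks to
    // the BPF side. Targets come from calc_target_nr_cpus() weighted by
    // weighted_target_nr_cpus(). Confined/grouped layers are shrunk first
    // (largest target first), then grown (smallest target first), and whatever
    // remains in the pool is handed to the open layers before the BPF side is
    // nudged via the refresh_layer_cpumasks prog.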
2763    fn refresh_cpumasks(&mut self) -> Result<()> {
2764        let layer_is_open = |layer: &Layer| matches!(layer.kind, LayerKind::Open { .. });
2765
2766        let mut updated = false;
2767        let targets = self.calc_target_nr_cpus();
2768        let targets = self.weighted_target_nr_cpus(&targets);
2769
2770        let mut ascending: Vec<(usize, usize)> = targets.iter().copied().enumerate().collect();
2771        ascending.sort_by(|a, b| a.1.cmp(&b.1));
2772
2773        self.recompute_layer_core_order(&ascending);
2774
2775        // If any layer is growing, guarantee that the largest layer that is
2776        // freeing CPUs frees at least one CPU.
2777        let mut force_free = self
2778            .layers
2779            .iter()
2780            .zip(targets.iter())
2781            .any(|(layer, &target)| layer.nr_cpus < target);
2782
2783        // Shrink all layers first so that CPUs are available for
2784        // redistribution. Do so in descending order of target number of
2785        // CPUs.
2786        for &(idx, target) in ascending.iter().rev() {
2787            let layer = &mut self.layers[idx];
2788            if layer_is_open(layer) {
2789                continue;
2790            }
2791
2792            let nr_cur = layer.cpus.weight();
2793            if nr_cur <= target {
2794                continue;
2795            }
2796            let mut nr_to_free = nr_cur - target;
2797
2798            // There's some dampening built into the util metrics, but slow down
2799            // freeing further to avoid unnecessary changes. This is based solely
2800            // on intuition. Drop or update according to real-world
2801            // behavior.
2802            let nr_to_break_at = nr_to_free / 2;
2803
2804            let mut freed = false;
2805
2806            while nr_to_free > 0 {
2807                let max_to_free = if force_free {
2808                    force_free = false;
2809                    layer.nr_cpus
2810                } else {
2811                    nr_to_free
2812                };
2813
2814                let nr_freed = layer.free_some_cpus(&mut self.cpu_pool, max_to_free)?;
2815                if nr_freed == 0 {
2816                    break;
2817                }
2818
2819                nr_to_free = nr_to_free.saturating_sub(nr_freed);
2820                freed = true;
2821
2822                if nr_to_free <= nr_to_break_at {
2823                    break;
2824                }
2825            }
2826
2827            if freed {
2828                Self::update_bpf_layer_cpumask(layer, &mut self.skel.maps.bss_data.layers[idx]);
2829                updated = true;
2830            }
2831        }
2832
2833        // Grow layers. Do so in ascending order of target number of CPUs
2834        // so that we're always more generous to smaller layers. This avoids
2835        // starving small layers and shouldn't make a noticeable difference for
2836        // bigger layers as work conservation should still be achieved
2837        // through open execution.
2838        for &(idx, target) in &ascending {
2839            let layer = &mut self.layers[idx];
2840
2841            if layer_is_open(layer) {
2842                continue;
2843            }
2844
2845            let nr_cur = layer.cpus.weight();
2846            if nr_cur >= target {
2847                continue;
2848            }
2849
2850            let mut nr_to_alloc = target - nr_cur;
2851            let mut alloced = false;
2852
2853            while nr_to_alloc > 0 {
2854                let nr_alloced = layer.alloc_some_cpus(&mut self.cpu_pool)?;
2855                if nr_alloced == 0 {
2856                    break;
2857                }
2858                alloced = true;
2859                nr_to_alloc -= nr_alloced.min(nr_to_alloc);
2860            }
2861
2862            if alloced {
2863                Self::update_bpf_layer_cpumask(layer, &mut self.skel.maps.bss_data.layers[idx]);
2864                updated = true;
2865            }
2866        }
2867
2868        // Give the rest to the open layers.
2869        if updated {
2870            for (idx, layer) in self.layers.iter_mut().enumerate() {
2871                if !layer_is_open(layer) {
2872                    continue;
2873                }
2874
2875                let bpf_layer = &mut self.skel.maps.bss_data.layers[idx];
2876                let available_cpus = self.cpu_pool.available_cpus().and(&layer.allowed_cpus);
2877                let nr_available_cpus = available_cpus.weight();
2878
2879                // Open layers need the intersection of allowed cpus and
2880                // available cpus.
2881                layer.cpus = available_cpus;
2882                layer.nr_cpus = nr_available_cpus;
2883                Self::update_bpf_layer_cpumask(layer, bpf_layer);
2884            }
2885
2886            self.skel.maps.bss_data.fallback_cpu = self.cpu_pool.fallback_cpu as u32;
2887
2888            for (lidx, layer) in self.layers.iter().enumerate() {
2889                self.nr_layer_cpus_ranges[lidx] = (
2890                    self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
2891                    self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
2892                );
2893            }
2894
2895            // Trigger updates on the BPF side.
2896            let input = ProgramInput {
2897                ..Default::default()
2898            };
2899            let prog = &mut self.skel.progs.refresh_layer_cpumasks;
2900            let _ = prog.test_run(input);
2901
2902            // Update empty_layers.
2903            let empty_layer_ids: Vec<u32> = self
2904                .layers
2905                .iter()
2906                .enumerate()
2907                .filter(|(_idx, layer)| layer.nr_cpus == 0)
2908                .map(|(idx, _layer)| idx as u32)
2909                .collect();
2910            for i in 0..self.layers.len() {
2911                self.skel.maps.bss_data.empty_layer_ids[i] =
2912                    empty_layer_ids.get(i).cloned().unwrap_or(MAX_LAYERS as u32);
2913            }
2914            self.skel.maps.bss_data.nr_empty_layer_ids = empty_layer_ids.len() as u32;
2915        }
2916
2917        let _ = self.update_netdev_cpumasks();
2918        Ok(())
2919    }
2920
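    // Apply per-layer idle resume latency QoS: build a per-CPU table from each
    // layer's idle_resume_us (0 where unset) and write it out via
    // update_cpu_idle_resume_latency().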
2921    fn refresh_idle_qos(&mut self) -> Result<()> {
2922        if !self.idle_qos_enabled {
2923            return Ok(());
2924        }
2925
2926        let mut cpu_idle_qos = vec![0; *NR_CPU_IDS];
2927        for layer in self.layers.iter() {
2928            let idle_resume_us = layer.kind.common().idle_resume_us.unwrap_or(0) as i32;
2929            for cpu in layer.cpus.iter() {
2930                cpu_idle_qos[cpu] = idle_resume_us;
2931            }
2932        }
2933
2934        for (cpu, idle_resume_usec) in cpu_idle_qos.iter().enumerate() {
2935            update_cpu_idle_resume_latency(cpu, *idle_resume_usec)?;
2936        }
2937
2938        Ok(())
2939    }
2940
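    // One scheduling tick: refresh the userspace-side stats, recompute layer
    // cpumasks and idle QoS, and re-affinitize GPU tasks if needed. The time
    // spent here is accumulated into processing_dur.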
2941    fn step(&mut self) -> Result<()> {
2942        let started_at = Instant::now();
2943        self.sched_stats.refresh(
2944            &mut self.skel,
2945            &self.proc_reader,
2946            started_at,
2947            self.processing_dur,
2948            &self.gpu_task_handler,
2949        )?;
2950        self.refresh_cpumasks()?;
2951        self.refresh_idle_qos()?;
2952        self.gpu_task_handler.maybe_affinitize();
2953        self.processing_dur += Instant::now().duration_since(started_at);
2954        Ok(())
2955    }
2956
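    // Build a SysStats snapshot with one LayerStats per layer, resetting the
    // caller's tracked (min, max) CPU ranges to the current counts.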
2957    fn generate_sys_stats(
2958        &mut self,
2959        stats: &Stats,
2960        cpus_ranges: &mut [(usize, usize)],
2961    ) -> Result<SysStats> {
2962        let bstats = &stats.bpf_stats;
2963        let mut sys_stats = SysStats::new(stats, bstats, self.cpu_pool.fallback_cpu)?;
2964
2965        for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
2966            let layer_stats = LayerStats::new(lidx, layer, stats, bstats, cpus_ranges[lidx]);
2967            sys_stats.layers.insert(spec.name.to_string(), layer_stats);
2968            cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
2969        }
2970
2971        Ok(sys_stats)
2972    }
2973
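    // Main loop: run step() every sched_intv, bump the BPF-side
    // layer_refresh_seq_avgruntime counter on layer_refresh_intv, and service
    // stats requests in between, until shutdown is requested or the BPF
    // scheduler exits.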
2974    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
2975        let (res_ch, req_ch) = self.stats_server.channels();
2976        let mut next_sched_at = Instant::now() + self.sched_intv;
2977        let enable_layer_refresh = !self.layer_refresh_intv.is_zero();
2978        let mut next_layer_refresh_at = Instant::now() + self.layer_refresh_intv;
2979        let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();
2980
2981        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
2982            let now = Instant::now();
2983
2984            if now >= next_sched_at {
2985                self.step()?;
2986                while next_sched_at < now {
2987                    next_sched_at += self.sched_intv;
2988                }
2989            }
2990
2991            if enable_layer_refresh && now >= next_layer_refresh_at {
2992                self.skel.maps.bss_data.layer_refresh_seq_avgruntime += 1;
2993                while next_layer_refresh_at < now {
2994                    next_layer_refresh_at += self.layer_refresh_intv;
2995                }
2996            }
2997
2998            match req_ch.recv_deadline(next_sched_at) {
2999                Ok(StatsReq::Hello(tid)) => {
3000                    cpus_ranges.insert(
3001                        tid,
3002                        self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
3003                    );
3004                    let stats =
3005                        Stats::new(&mut self.skel, &self.proc_reader, &self.gpu_task_handler)?;
3006                    res_ch.send(StatsRes::Hello(stats))?;
3007                }
3008                Ok(StatsReq::Refresh(tid, mut stats)) => {
3009                    // Fold our per-layer cpu ranges into each stats client's tracked ranges.
3010                    for i in 0..self.nr_layer_cpus_ranges.len() {
3011                        for (_, ranges) in cpus_ranges.iter_mut() {
3012                            ranges[i] = (
3013                                ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
3014                                ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
3015                            );
3016                        }
3017                        self.nr_layer_cpus_ranges[i] =
3018                            (self.layers[i].nr_cpus, self.layers[i].nr_cpus);
3019                    }
3020
3021                    stats.refresh(
3022                        &mut self.skel,
3023                        &self.proc_reader,
3024                        now,
3025                        self.processing_dur,
3026                        &self.gpu_task_handler,
3027                    )?;
3028                    let sys_stats =
3029                        self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
3030                    res_ch.send(StatsRes::Refreshed((stats, sys_stats)))?;
3031                }
3032                Ok(StatsReq::Bye(tid)) => {
3033                    cpus_ranges.remove(&tid);
3034                    res_ch.send(StatsRes::Bye)?;
3035                }
3036                Err(RecvTimeoutError::Timeout) => {}
3037                Err(e) => Err(e)?,
3038            }
3039        }
3040
3041        let _ = self.struct_ops.take();
3042        uei_report!(&self.skel, uei)
3043    }
3044}
3045
3046impl Drop for Scheduler<'_> {
3047    fn drop(&mut self) {
3048        if let Some(struct_ops) = self.struct_ops.take() {
3049            drop(struct_ops);
3050        }
3051    }
3052}
3053
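// Write EXAMPLE_CONFIG out as pretty-printed JSON. create_new() makes this fail
// rather than clobber an existing file.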
3054fn write_example_file(path: &str) -> Result<()> {
3055    let mut f = fs::OpenOptions::new()
3056        .create_new(true)
3057        .write(true)
3058        .open(path)?;
3059    Ok(f.write_all(serde_json::to_string_pretty(&*EXAMPLE_CONFIG)?.as_bytes())?)
3060}
3061
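// Sanity-check parsed layer specs: at least one and at most MAX_LAYERS specs,
// non-terminal specs must have matches, the terminal spec must have exactly one
// empty match, match strings must fit the BPF-side size limits, and cpus_range
// and util_range must be well-formed.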
3062fn verify_layer_specs(specs: &[LayerSpec]) -> Result<()> {
3063    let nr_specs = specs.len();
3064    if nr_specs == 0 {
3065        bail!("No layer spec");
3066    }
3067    if nr_specs > MAX_LAYERS {
3068        bail!("Too many layer specs");
3069    }
3070
3071    for (idx, spec) in specs.iter().enumerate() {
3072        if idx < nr_specs - 1 {
3073            if spec.matches.is_empty() {
3074                bail!("Non-terminal spec {:?} has NULL matches", spec.name);
3075            }
3076        } else {
3077            if spec.matches.len() != 1 || !spec.matches[0].is_empty() {
3078                bail!("Terminal spec {:?} must have an empty match", spec.name);
3079            }
3080        }
3081
3082        if spec.matches.len() > MAX_LAYER_MATCH_ORS {
3083            bail!(
3084                "Spec {:?} has too many ({}) OR match blocks",
3085                spec.name,
3086                spec.matches.len()
3087            );
3088        }
3089
3090        for (ands_idx, ands) in spec.matches.iter().enumerate() {
3091            if ands.len() > NR_LAYER_MATCH_KINDS {
3092                bail!(
3093                    "Spec {:?}'s {}th OR block has too many ({}) match conditions",
3094                    spec.name,
3095                    ands_idx,
3096                    ands.len()
3097                );
3098            }
3099            for one in ands.iter() {
3100                match one {
3101                    LayerMatch::CgroupPrefix(prefix) => {
3102                        if prefix.len() > MAX_PATH {
3103                            bail!("Spec {:?} has too long a cgroup prefix", spec.name);
3104                        }
3105                    }
3106                    LayerMatch::CgroupSuffix(suffix) => {
3107                        if suffix.len() > MAX_PATH {
3108                            bail!("Spec {:?} has too long a cgroup suffix", spec.name);
3109                        }
3110                    }
3111                    LayerMatch::CgroupContains(substr) => {
3112                        if substr.len() > MAX_PATH {
3113                            bail!("Spec {:?} has too long a cgroup substr", spec.name);
3114                        }
3115                    }
3116                    LayerMatch::CommPrefix(prefix) => {
3117                        if prefix.len() > MAX_COMM {
3118                            bail!("Spec {:?} has too long a comm prefix", spec.name);
3119                        }
3120                    }
3121                    LayerMatch::PcommPrefix(prefix) => {
3122                        if prefix.len() > MAX_COMM {
3123                            bail!("Spec {:?} has too long a process name prefix", spec.name);
3124                        }
3125                    }
3126                    _ => {}
3127                }
3128            }
3129        }
3130
3131        match spec.kind {
3132            LayerKind::Confined {
3133                cpus_range,
3134                util_range,
3135                ..
3136            }
3137            | LayerKind::Grouped {
3138                cpus_range,
3139                util_range,
3140                ..
3141            } => {
3142                if let Some((cpus_min, cpus_max)) = cpus_range {
3143                    if cpus_min > cpus_max {
3144                        bail!(
3145                            "Spec {:?} has invalid cpus_range({}, {})",
3146                            spec.name,
3147                            cpus_min,
3148                            cpus_max
3149                        );
3150                    }
3151                }
3152                if util_range.0 >= util_range.1 {
3153                    bail!(
3154                        "Spec {:?} has invalid util_range ({}, {})",
3155                        spec.name,
3156                        util_range.0,
3157                        util_range.1
3158                    );
3159                }
3160            }
3161            _ => {}
3162        }
3163    }
3164
3165    Ok(())
3166}
3167
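// Return the last `len` characters of `cgroup`, e.g. name_suffix("a/b/c/", 3)
// yields "/c/".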
3168fn name_suffix(cgroup: &str, len: usize) -> String {
3169    let suffixlen = std::cmp::min(len, cgroup.len());
3170    let suffixrev: String = cgroup.chars().rev().take(suffixlen).collect();
3171
3172    suffixrev.chars().rev().collect()
3173}
3174
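// Recursively collect every directory under `dir` (used here to walk
// /sys/fs/cgroup).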
3175fn traverse_sysfs(dir: &Path) -> Result<Vec<PathBuf>> {
3176    let mut paths = vec![];
3177
3178    if !dir.is_dir() {
3179        bail!("path {:?} is not a directory", dir);
3180    }
3181
3182    let direntries = fs::read_dir(dir)?;
3183
3184    for entry in direntries {
3185        let path = entry?.path();
3186        if path.is_dir() {
3187            paths.append(&mut traverse_sysfs(&path)?);
3188            paths.push(path);
3189        }
3190    }
3191
3192    Ok(paths)
3193}
3194
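// Read a cgroup's cpuset.cpus.effective and parse it into a Cpumask. Panics if
// the file can't be read or parsed.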
3195fn find_cpumask(cgroup: &str) -> Cpumask {
3196    let mut path = String::from(cgroup);
3197    path.push_str("/cpuset.cpus.effective");
3198
3199    let description = fs::read_to_string(&path).unwrap();
3200
3201    Cpumask::from_cpulist(&description).unwrap()
3202}
3203
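// Expand a template match rule into concrete (match, cpumask) pairs, one per
// cgroup whose path ends with the given suffix. Only CgroupSuffix templates are
// supported; the generated match uses the last 64 characters of the
// slash-terminated cgroup path.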
3204fn expand_template(rule: &LayerMatch) -> Result<Vec<(LayerMatch, Cpumask)>> {
3205    match rule {
3206        LayerMatch::CgroupSuffix(suffix) => Ok(traverse_sysfs(Path::new("/sys/fs/cgroup"))?
3207            .into_iter()
3208            .map(|cgroup| String::from(cgroup.to_str().expect("could not parse cgroup path")))
3209            .filter(|cgroup| cgroup.ends_with(suffix))
3210            .map(|cgroup| {
3211                (
3212                    {
3213                        let mut slashterminated = cgroup.clone();
3214                        slashterminated.push('/');
3215                        LayerMatch::CgroupSuffix(name_suffix(&slashterminated, 64))
3216                    },
3217                    find_cpumask(&cgroup),
3218                )
3219            })
3220            .collect()),
3221        _ => bail!("Unimplemented template match kind {:?}", rule),
3222    }
3223}
3224
3225fn main() -> Result<()> {
3226    let opts = Opts::parse();
3227
3228    if opts.version {
3229        println!(
3230            "scx_layered {}",
3231            build_id::full_version(env!("CARGO_PKG_VERSION"))
3232        );
3233        return Ok(());
3234    }
3235
3236    if opts.help_stats {
3237        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
3238        return Ok(());
3239    }
3240
3241    if opts.no_load_frac_limit {
3242        warn!("--no-load-frac-limit is deprecated and a no-op");
3243    }
3244    if opts.layer_preempt_weight_disable != 0.0 {
3245        warn!("--layer-preempt-weight-disable is deprecated and a no-op");
3246    }
3247    if opts.layer_growth_weight_disable != 0.0 {
3248        warn!("--layer-growth-weight-disable is deprecated and a no-op");
3249    }
3250    if opts.local_llc_iteration {
3251        warn!("--local-llc-iteration is deprecated and a no-op");
3252    }
3253
3254    let llv = match opts.verbose {
3255        0 => simplelog::LevelFilter::Info,
3256        1 => simplelog::LevelFilter::Debug,
3257        _ => simplelog::LevelFilter::Trace,
3258    };
3259    let mut lcfg = simplelog::ConfigBuilder::new();
3260    lcfg.set_time_offset_to_local()
3261        .expect("Failed to set local time offset")
3262        .set_time_level(simplelog::LevelFilter::Error)
3263        .set_location_level(simplelog::LevelFilter::Off)
3264        .set_target_level(simplelog::LevelFilter::Off)
3265        .set_thread_level(simplelog::LevelFilter::Off);
3266    simplelog::TermLogger::init(
3267        llv,
3268        lcfg.build(),
3269        simplelog::TerminalMode::Stderr,
3270        simplelog::ColorChoice::Auto,
3271    )?;
3272
3273    debug!("opts={:?}", &opts);
3274
3275    let shutdown = Arc::new(AtomicBool::new(false));
3276    let shutdown_clone = shutdown.clone();
3277    ctrlc::set_handler(move || {
3278        shutdown_clone.store(true, Ordering::Relaxed);
3279    })
3280    .context("Error setting Ctrl-C handler")?;
3281
3282    if let Some(intv) = opts.monitor.or(opts.stats) {
3283        let shutdown_copy = shutdown.clone();
3284        let jh = std::thread::spawn(move || {
3285            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
3286                Ok(_) => {
3287                    debug!("stats monitor thread finished successfully")
3288                }
3289                Err(error_object) => {
3290                    warn!(
3291                        "stats monitor thread exited with error: {}",
3292                        error_object
3293                    )
3294                }
3295            }
3296        });
3297        if opts.monitor.is_some() {
3298            let _ = jh.join();
3299            return Ok(());
3300        }
3301    }
3302
3303    if let Some(path) = &opts.example {
3304        write_example_file(path)?;
3305        return Ok(());
3306    }
3307
3308    let mut layer_config = match opts.run_example {
3309        true => EXAMPLE_CONFIG.clone(),
3310        false => LayerConfig { specs: vec![] },
3311    };
3312
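    // Parse each spec argument; template specs are expanded into one generated
    // layer per matching cgroup, falling back to the spec as-is when nothing
    // matches.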
3313    for (idx, input) in opts.specs.iter().enumerate() {
3314        let specs = LayerSpec::parse(input)
3315            .context(format!("Failed to parse specs[{}] ({:?})", idx, input))?;
3316
3317        for spec in specs {
3318            match spec.template {
3319                Some(ref rule) => {
3320                    let matches = expand_template(&rule)?;
3321                    // In the absence of matching cgroups, have template layers
3322                    // behave as non-template layers do.
3323                    if matches.is_empty() {
3324                        layer_config.specs.push(spec);
3325                    } else {
3326                        for (mt, mask) in matches {
3327                            let mut genspec = spec.clone();
3328
3329                            genspec.cpuset = Some(mask);
3330
3331                            // Push the new "and" rule into each "or" term.
3332                            for orterm in &mut genspec.matches {
3333                                orterm.push(mt.clone());
3334                            }
3335
3336                            match &mt {
3337                                LayerMatch::CgroupSuffix(cgroup) => genspec.name.push_str(cgroup),
3338                                _ => bail!("Template match has unexpected type"),
3339                            }
3340
3341                            // Push the generated layer into the config
3342                            layer_config.specs.push(genspec);
3343                        }
3344                    }
3345                }
3346
3347                None => {
3348                    layer_config.specs.push(spec);
3349                }
3350            }
3351        }
3352    }
3353
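    // Fill in per-layer defaults: slice_us and weight (clamped to
    // [MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT]), and disallow_open/preempt_after_us
    // depending on whether the layer preempts.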
3354    for spec in layer_config.specs.iter_mut() {
3355        let common = spec.kind.common_mut();
3356
3357        if common.slice_us == 0 {
3358            common.slice_us = opts.slice_us;
3359        }
3360
3361        if common.weight == 0 {
3362            common.weight = DEFAULT_LAYER_WEIGHT;
3363        }
3364        common.weight = common.weight.clamp(MIN_LAYER_WEIGHT, MAX_LAYER_WEIGHT);
3365
3366        if common.preempt {
3367            if common.disallow_open_after_us.is_some() {
3368                warn!(
3369                    "Preempt layer {} has non-null disallow_open_after_us, ignored",
3370                    &spec.name
3371                );
3372            }
3373            if common.disallow_preempt_after_us.is_some() {
3374                warn!(
3375                    "Preempt layer {} has non-null disallow_preempt_after_us, ignored",
3376                    &spec.name
3377                );
3378            }
3379            common.disallow_open_after_us = Some(u64::MAX);
3380            common.disallow_preempt_after_us = Some(u64::MAX);
3381        } else {
3382            if common.disallow_open_after_us.is_none() {
3383                common.disallow_open_after_us = Some(*DFL_DISALLOW_OPEN_AFTER_US);
3384            }
3385
3386            if common.disallow_preempt_after_us.is_none() {
3387                common.disallow_preempt_after_us = Some(*DFL_DISALLOW_PREEMPT_AFTER_US);
3388            }
3389        }
3390
3391        if common.idle_smt.is_some() {
3392            warn!("Layer {} has deprecated flag \"idle_smt\"", &spec.name);
3393        }
3394    }
3395
3396    if opts.print_and_exit {
3397        println!("specs={}", serde_json::to_string_pretty(&layer_config)?);
3398        return Ok(());
3399    }
3400
3401    debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
3402    verify_layer_specs(&layer_config.specs)?;
3403
3404    let mut open_object = MaybeUninit::uninit();
3405    loop {
3406        let mut sched = Scheduler::init(&opts, &layer_config.specs, &mut open_object)?;
3407        if !sched.run(shutdown.clone())?.should_restart() {
3408            break;
3409        }
3410    }
3411
3412    Ok(())
3413}