scx_wd40/
main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

mod domain;
use domain::DomainGroup;

pub mod tuner;
use tuner::Tuner;

pub mod load_balance;
use load_balance::LoadBalancer;

mod stats;
use std::collections::BTreeMap;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use stats::ClusterStats;
use stats::NodeStats;

#[macro_use]
extern crate static_assertions;

use ::fb_procfs as procfs;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::skel::Skel;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::info;
use scx_arena::ArenaLib;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;

use crate::types::dom_ctx;

const SCHEDULER_NAME: &str = "scx_wd40";
const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;

// Number of u64 words in a BPF CPU mask.
static mut MASK_LEN: usize = 0;

/// scx_wd40: A fork of the scx_rusty multi-domain scheduler.
///
/// The message below is from the original scx_rusty codebase:
///
/// The BPF part does simple vtime or round robin scheduling in each domain
/// while tracking average load of each domain and duty cycle of each task.
///
/// The userspace part performs two roles. First, it makes higher frequency
/// (100ms) tuning decisions. It identifies CPUs which are not too heavily
/// loaded and marks them so that they can pull tasks from other overloaded
/// domains on the fly.
///
/// Second, it drives lower frequency (2s) load balancing. It determines
/// whether load balancing is necessary by comparing domain load averages.
/// If there are large enough load differences, it examines up to 1024
/// recently active tasks on the domain to determine which should be
/// migrated.
///
/// The overhead of userspace operations is low. Load balancing is not
/// performed frequently, but work-conservation is still maintained through
/// tuning and greedy execution. Load balancing itself is not that expensive
/// either. It only accesses per-domain load metrics to determine the domains
/// that need load balancing, as well as a limited number of per-task metrics
/// for each pushing domain.
///
/// An earlier variant of this scheduler was used to balance across six
/// domains, each representing a chiplet in a six-chiplet AMD processor, and
/// could match the performance of a production setup using CFS.
///
/// WARNING: scx_rusty currently assumes that all domains have equal
/// processing power and are at similar distances from each other. This
/// limitation will be removed in the future.
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration for under-utilized hosts, in microseconds.
    #[clap(short = 'u', long, default_value = "20000")]
    slice_us_underutil: u64,

    /// Scheduling slice duration for over-utilized hosts, in microseconds.
    #[clap(short = 'o', long, default_value = "1000")]
    slice_us_overutil: u64,

    /// Load balance interval in seconds.
    #[clap(short = 'i', long, default_value = "2.0")]
    interval: f64,

    /// Tuning interval in seconds. The tuner runs at a higher frequency than
    /// the load balancer to dynamically tune scheduling behavior.
    #[clap(short = 'I', long, default_value = "0.1")]
    tune_interval: f64,

    /// The half-life of task and domain load running averages in seconds.
    #[clap(short = 'l', long, default_value = "1.0")]
    load_half_life: f64,

    /// Build domains according to how CPUs are grouped at this cache level
    /// as determined by /sys/devices/system/cpu/cpuX/cache/indexI/id.
    #[clap(short = 'c', long, default_value = "3")]
    cache_level: u32,

    /// When non-zero, enable greedy task stealing. When a domain is idle, a CPU
    /// will attempt to steal tasks from another domain as follows:
    ///
    /// 1. Try to consume a task from the current domain
    /// 2. Try to consume a task from another domain in the current NUMA node
    ///    (or globally, if running on a single-socket system), if the domain
    ///    has at least this specified number of tasks enqueued.
    ///
    /// See greedy_threshold_x_numa to enable task stealing across NUMA nodes.
    /// Tasks stolen in this manner are not permanently stolen from their
    /// domain.
    #[clap(short = 'g', long, default_value = "1")]
    greedy_threshold: u32,

    /// When non-zero, enable greedy task stealing across NUMA nodes. The order
    /// of greedy task stealing follows greedy-threshold as described above, and
    /// greedy-threshold must be nonzero to enable task stealing across NUMA
    /// nodes.
    #[clap(long, default_value = "0")]
    greedy_threshold_x_numa: u32,

    /// Disable load balancing. Unless disabled, userspace will periodically calculate
    /// the load factor of each domain and instruct BPF which processes to move.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_load_balance: bool,

    /// Put per-CPU kthreads directly into local DSQs.
    #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
    kthreads_local: bool,

    /// In recent kernels (>=v6.6), the kernel is responsible for balancing
    /// kworkers across L3 cache domains. Exclude them from load-balancing
    /// to avoid conflicting operations. Greedy executions still apply.
    #[clap(short = 'b', long, action = clap::ArgAction::SetTrue)]
    balanced_kworkers: bool,

    /// Use FIFO scheduling instead of weighted vtime scheduling.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    fifo_sched: bool,

    /// Idle CPUs with utilization lower than this will get remote tasks
    /// directly pushed onto them. 0 disables, 100 always enables.
    #[clap(short = 'D', long, default_value = "90.0")]
    direct_greedy_under: f64,

    /// Idle CPUs with utilization lower than this may get kicked to
    /// accelerate stealing when a task is queued on a saturated remote
    /// domain. 0 disables, 100 always enables.
    #[clap(short = 'K', long, default_value = "100.0")]
    kick_greedy_under: f64,

    /// Whether tasks can be pushed directly to idle CPUs on NUMA nodes
    /// different from their domain's node. If direct-greedy-under is disabled,
    /// this option is a no-op. Otherwise, if this option is set to false
    /// (default), tasks will only be directly pushed to idle CPUs if they
    /// reside on the same NUMA node as the task's domain.
    #[clap(short = 'r', long, action = clap::ArgAction::SetTrue)]
    direct_greedy_numa: bool,

    /// If specified, only tasks which have their scheduling policy set to
    /// SCHED_EXT using sched_setscheduler(2) are switched. Otherwise, all
    /// tasks are switched.
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Enables soft NUMA affinity for tasks that use set_mempolicy. This
    /// may improve performance in some scenarios when using mempolicies.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    mempolicy_affinity: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. The scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Print version and exit.
    #[clap(long)]
    version: bool,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    /// Tunable for prioritizing CPU performance by configuring the CPU frequency governor.
    /// Valid values are [0, 1024]. Higher values prioritize performance, lower values
    /// prioritize energy efficiency. When in doubt, use 0 or 1024.
    #[clap(long, default_value = "0")]
    perf: u32,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

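// Sum the busy and total CPU microseconds reported by /proc/stat. Deltas
// between two readings give the system-wide utilization reported in stats.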
fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
    let cs = reader
        .read_stat()
        .context("Failed to read procfs")?
        .total_cpu
        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;

    Ok(match cs {
        procfs::CpuStat {
            user_usec: Some(user),
            nice_usec: Some(nice),
            system_usec: Some(system),
            idle_usec: Some(idle),
            iowait_usec: Some(iowait),
            irq_usec: Some(irq),
            softirq_usec: Some(softirq),
            stolen_usec: Some(stolen),
            guest_usec: _,
            guest_nice_usec: _,
        } => {
            let busy = user + system + nice + irq + softirq + stolen;
            let total = busy + idle + iowait;
            (busy, total)
        }
        _ => bail!("Some procfs stats are not populated!"),
    })
}

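/// Saturating subtraction: returns 0 instead of underflowing when `curr` is
/// smaller than `prev` (e.g. when a cumulative counter appears to go backwards).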
pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
    curr.checked_sub(*prev).unwrap_or(0u64)
}

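/// Copy a userspace Cpumask into a BPF-side scx_bitmap. MASK_LEN must already
/// hold the mask length (in u64 words) reported by the BPF program.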
pub fn update_bpf_mask(bpfptr: *mut types::scx_bitmap, cpumask: &Cpumask) -> Result<()> {
    let bpfmask = unsafe { &mut *bpfptr };

    unsafe { cpumask.write_to_ptr(&raw mut bpfmask.bits as *mut u64, MASK_LEN)? };

    Ok(())
}

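// Snapshot of cumulative counters (procfs CPU time, BPF stats, scheduler CPU
// time used). Stats are reported as deltas between two snapshots.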
#[derive(Clone, Debug)]
struct StatsCtx {
    cpu_busy: u64,
    cpu_total: u64,
    bpf_stats: Vec<u64>,
    time_used: Duration,
}

impl StatsCtx {
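    // Read the per-CPU stats map and sum each counter across all CPUs.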
    fn read_bpf_stats(skel: &BpfSkel) -> Result<Vec<u64>> {
        let stats_map = &skel.maps.stats;
        let mut stats: Vec<u64> = Vec::new();

        for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
            let cpu_stat_vec = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .with_context(|| format!("Failed to lookup stat {}", stat))?
                .expect("per-cpu stat should exist");
            let sum = cpu_stat_vec
                .iter()
                .map(|val| {
                    u64::from_ne_bytes(
                        val.as_slice()
                            .try_into()
                            .expect("Invalid value length in stat map"),
                    )
                })
                .sum();
            stats.push(sum);
        }
        Ok(stats)
    }

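    // All-zero snapshot, useful as an initial baseline for delta().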
    fn blank() -> Self {
        Self {
            cpu_busy: 0,
            cpu_total: 0,
            bpf_stats: vec![0u64; bpf_intf::stat_idx_RUSTY_NR_STATS as usize],
            time_used: Duration::default(),
        }
    }

    fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, time_used: Duration) -> Result<Self> {
        let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;

        Ok(Self {
            cpu_busy,
            cpu_total,
            bpf_stats: Self::read_bpf_stats(skel)?,
            time_used,
        })
    }

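    // Element-wise difference between this snapshot and an earlier one.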
    fn delta(&self, rhs: &Self) -> Self {
        Self {
            cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
            cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
            bpf_stats: self
                .bpf_stats
                .iter()
                .zip(rhs.bpf_stats.iter())
                .map(|(lhs, rhs)| sub_or_zero(&lhs, &rhs))
                .collect(),
            time_used: self.time_used - rhs.time_used,
        }
    }
}

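// Userspace side of the scheduler: owns the loaded BPF skeleton, the attached
// struct_ops link, the tuner, and the load-balancing state.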
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,

    sched_interval: Duration,
    tune_interval: Duration,
    balance_load: bool,
    balanced_kworkers: bool,

    dom_group: Arc<DomainGroup>,

    proc_reader: procfs::ProcReader,

    lb_at: SystemTime,
    lb_stats: BTreeMap<usize, NodeStats>,
    time_used: Duration,

    tuner: Tuner,
    stats_server: StatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
}

impl<'a> Scheduler<'a> {
    fn setup_wd40(skel: &mut BpfSkel<'a>) -> Result<()> {
        // Allocate the arena memory from the BPF side so userspace initializes it before starting
        // the scheduler. Despite the function call's name this is neither a test nor a test run;
        // it's the recommended way of executing SEC("syscall") programs.
        let input = ProgramInput {
            ..Default::default()
        };
        let output = skel.progs.wd40_setup.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize WD40 arenas, wd40_arena_setup returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

    fn setup_arenas(skel: &mut BpfSkel<'a>) -> Result<()> {
        let task_size = std::mem::size_of::<types::task_ctx>();
        let arenalib = ArenaLib::init(skel.object_mut(), task_size, *NR_CPU_IDS)?;
        arenalib.setup()?;

        Self::setup_wd40(skel)?;

        Ok(())
    }

    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        // Open the BPF prog first for verification.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 0);
        init_libbpf_logging(None);
        info!(
            "Running scx_wd40 (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, wd40, open_opts).unwrap();

        // Initialize skel according to @opts.
        let domains = Arc::new(DomainGroup::new(&Topology::new()?)?);

        if *NR_CPU_IDS > MAX_CPUS {
            bail!(
                "Num possible CPU IDs ({}) exceeds maximum of ({})",
                *NR_CPU_IDS,
                MAX_CPUS
            );
        }

        if domains.nr_doms() > MAX_DOMS {
            bail!(
                "nr_doms ({}) is greater than MAX_DOMS ({})",
                domains.nr_doms(),
                MAX_DOMS
            );
        }

        skel.maps.bss_data.as_mut().unwrap().slice_ns = scx_enums.SCX_SLICE_DFL;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.nr_nodes = domains.nr_nodes() as u32;
        rodata.nr_doms = domains.nr_doms() as u32;
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;

        if opts.partial {
            skel.struct_ops.wd40_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.wd40_mut().exit_dump_len = opts.exit_dump_len;

        rodata.load_half_life = (opts.load_half_life * 1000000000.0) as u32;
        rodata.kthreads_local = opts.kthreads_local;
        rodata.fifo_sched = opts.fifo_sched;
        rodata.greedy_threshold = opts.greedy_threshold;
        rodata.greedy_threshold_x_numa = opts.greedy_threshold_x_numa;
        rodata.direct_greedy_numa = opts.direct_greedy_numa;
        rodata.mempolicy_affinity = opts.mempolicy_affinity;
        rodata.debug = opts.verbose as u32;
        rodata.wd40_perf_mode = opts.perf;

        let mut skel = scx_ops_load!(skel, wd40, uei)?;

        Self::setup_arenas(&mut skel)?;

        let bss_data = skel.maps.bss_data.as_mut().unwrap();
        info!(
            "Mask length {}, number of possible CPUs {}",
            bss_data.mask_size,
            skel.maps.rodata_data.as_mut().unwrap().nr_cpu_ids
        );
        // Read the mask length chosen by BPF. We count elements in the u64 array, like the BPF
        // program does.
        //
        // This write is safe because there is no concurrency in the program during initialization.
        unsafe { MASK_LEN = bss_data.mask_size as usize };

        let types::topo_level(index) = types::topo_level::TOPO_LLC;

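        // Push the userspace-computed per-node and per-domain cpumasks into the
        // BPF-side data structures.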
        for numa in 0..domains.nr_nodes() {
            let mut numa_mask = Cpumask::new();
            let node_domains = domains.numa_doms(&numa);
            for dom in node_domains.iter() {
                let dom_mask = dom.mask();
                numa_mask = numa_mask.or(&dom_mask);
            }

            update_bpf_mask(bss_data.node_data[numa], &numa_mask)?;
            info!("NODE[{:02}] mask= {}", numa, numa_mask);

            for dom in node_domains.iter() {
                // XXX Remove this by using the topo node's cpumask.
                let ptr = bss_data.topo_nodes[index as usize][dom.id()];
                let domc = unsafe {
                    &mut *std::ptr::with_exposed_provenance_mut::<dom_ctx>(ptr.try_into().unwrap())
                };
                update_bpf_mask(domc.cpumask, &dom.mask())?;

                bss_data.dom_numa_id_map[dom.id()] =
                    numa.try_into().expect("NUMA ID could not fit into 32 bits");

                info!(" DOM[{:02}] mask= {}", dom.id(), dom.mask());
            }
        }

        // Actually get the scheduler starting.
        let struct_ops = Some(scx_ops_attach!(skel, wd40)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

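        // Stash a pointer to each domain's BPF-side dom_ctx in its DomainGroup
        // entry so userspace can dereference it directly later.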
        for (id, dom) in domains.doms().iter() {
            let mut ctx = dom.ctx.lock().unwrap();

            let ptr = skel.maps.bss_data.as_mut().unwrap().topo_nodes[index as usize][*id];
            let domc = unsafe {
                &mut *std::ptr::with_exposed_provenance_mut::<dom_ctx>(ptr.try_into().unwrap())
            };
            *ctx = Some(domc);
        }

        info!("WD40 scheduler started! Run `scx_wd40 --monitor` for metrics.");

        // Other stuff.
        let proc_reader = procfs::ProcReader::new();

        Ok(Self {
            skel,
            struct_ops, // should be held to keep it attached

            sched_interval: Duration::from_secs_f64(opts.interval),
            tune_interval: Duration::from_secs_f64(opts.tune_interval),
            balance_load: !opts.no_load_balance,
            balanced_kworkers: opts.balanced_kworkers,

            dom_group: domains.clone(),
            proc_reader,

            lb_at: SystemTime::now(),
            lb_stats: BTreeMap::new(),
            time_used: Duration::default(),

            tuner: Tuner::new(
                domains,
                opts.direct_greedy_under,
                opts.kick_greedy_under,
                opts.slice_us_underutil * 1000,
                opts.slice_us_overutil * 1000,
            )?,
            stats_server,
        })
    }

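    // Fold the per-interval counter deltas and the latest per-node
    // load-balancer stats into a ClusterStats record for the stats server.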
    fn cluster_stats(&self, sc: &StatsCtx, node_stats: BTreeMap<usize, NodeStats>) -> ClusterStats {
        let stat = |idx| sc.bpf_stats[idx as usize];
        let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PINNED)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
        let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0;

        let cpu_busy = if sc.cpu_total != 0 {
            (sc.cpu_busy as f64 / sc.cpu_total as f64) * 100.0
        } else {
            0.0
        };

        ClusterStats {
            at_us: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            lb_at_us: self
                .lb_at
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            total,
            slice_us: self.tuner.slice_ns / 1000,

            cpu_busy,
            load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
            nr_migrations: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],

            task_get_err: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
            time_used: sc.time_used.as_secs_f64(),

            sync_prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE),
            wake_sync: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC),
            prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE),
            greedy_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE),
            pinned: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PINNED),
            direct: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH),
            greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY),
            greedy_far: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR),
            dsq_dispatch: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH),
            greedy_local: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL),
            greedy_xnuma: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA),
            kick_greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_KICK_GREEDY),
            repatriate: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_REPATRIATE),
            dl_clamp: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_CLAMP),
            dl_preset: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_PRESET),

            direct_greedy_cpus: self.tuner.direct_greedy_mask.as_raw_slice().to_owned(),
            kick_greedy_cpus: self.tuner.kick_greedy_mask.as_raw_slice().to_owned(),

            nodes: node_stats,
        }
    }

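    // Run a single load-balancing pass and record when it ran and the
    // per-node stats it produced.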
    fn lb_step(&mut self) -> Result<()> {
        let mut lb = LoadBalancer::new(
            &mut self.skel,
            self.dom_group.clone(),
            self.balanced_kworkers,
            self.tuner.fully_utilized,
            self.balance_load,
        );

        lb.load_balance()?;

        self.lb_at = SystemTime::now();
        self.lb_stats = lb.get_stats();
        Ok(())
    }

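    // Main loop: step the tuner and the load balancer at their configured
    // intervals, servicing stats requests while waiting in between.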
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let now = Instant::now();
        let mut next_tune_at = now + self.tune_interval;
        let mut next_sched_at = now + self.sched_interval;

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

            if now >= next_tune_at {
                self.tuner.step(&mut self.skel)?;
                next_tune_at += self.tune_interval;
                if next_tune_at < now {
                    next_tune_at = now + self.tune_interval;
                }
            }

            if now >= next_sched_at {
                self.lb_step()?;
                next_sched_at += self.sched_interval;
                if next_sched_at < now {
                    next_sched_at = now + self.sched_interval;
                }
            }

            self.time_used += Instant::now().duration_since(now);

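            // recv_deadline() doubles as the loop's sleep: block until the next
            // tuner or load-balancer deadline, or until a stats client sends its
            // previous snapshot, in which case reply with fresh deltas.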
            match req_ch.recv_deadline(next_sched_at.min(next_tune_at)) {
                Ok(prev_sc) => {
                    let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.time_used)?;
                    let delta_sc = cur_sc.delta(&prev_sc);
                    let cstats = self.cluster_stats(&delta_sc, self.lb_stats.clone());
                    res_ch.send((cur_sc, cstats))?;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");

        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_wd40: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

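    // Keep re-initializing and re-running the scheduler until run() exits
    // without requesting a restart.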
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }
    Ok(())
741}