scx_wd40/main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

mod domain;
use domain::DomainGroup;

pub mod tuner;
use tuner::Tuner;

pub mod load_balance;
use load_balance::LoadBalancer;

mod stats;
use std::collections::BTreeMap;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use stats::ClusterStats;
use stats::NodeStats;

use std::ffi::c_ulong;

#[macro_use]
extern crate static_assertions;

use ::fb_procfs as procfs;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::info;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Core;
use scx_utils::Cpumask;
use scx_utils::Llc;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;

const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;

// Number of u64 words in a BPF CPU mask.
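// For example, a host with 256 possible CPU IDs needs 256 / 64 = 4 words; the
// exact value is read back from the BPF-reported mask_size in Scheduler::init().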
static mut MASK_LEN: usize = 0;

/// scx_wd40: A fork of the scx_rusty multi-domain scheduler.
///
/// The message below is from the original scx_rusty codebase:
///
/// The BPF part does simple vtime or round robin scheduling in each domain
/// while tracking average load of each domain and duty cycle of each task.
///
/// The userspace part performs two roles. First, it makes higher frequency
/// (100ms) tuning decisions. It identifies CPUs which are not too heavily
/// loaded and marks them so that they can pull tasks from other overloaded
/// domains on the fly.
///
/// Second, it drives lower frequency (2s) load balancing. It determines
/// whether load balancing is necessary by comparing domain load averages.
/// If there are large enough load differences, it examines up to 1024
/// recently active tasks on the domain to determine which should be
/// migrated.
///
/// The overhead of userspace operations is low. Load balancing is not
/// performed frequently, but work-conservation is still maintained through
/// tuning and greedy execution. Load balancing itself is not that expensive
/// either. It only accesses per-domain load metrics to determine the domains
/// that need load balancing, as well as a limited number of per-task metrics
/// for each pushing domain.
///
/// An earlier variant of this scheduler was used to balance across six
/// domains, each representing a chiplet in a six-chiplet AMD processor, and
/// could match the performance of the production setup using CFS.
///
/// WARNING: scx_rusty currently assumes that all domains have equal
/// processing power and are at similar distances from each other. This
/// limitation will be removed in the future.
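///
/// Example invocation (hypothetical, using the default values of the flags
/// defined below):
///
///   scx_wd40 --interval 2.0 --tune-interval 0.1 --greedy-threshold 1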
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration for under-utilized hosts, in microseconds.
    #[clap(short = 'u', long, default_value = "20000")]
    slice_us_underutil: u64,

    /// Scheduling slice duration for over-utilized hosts, in microseconds.
    #[clap(short = 'o', long, default_value = "1000")]
    slice_us_overutil: u64,

    /// Load balance interval in seconds.
    #[clap(short = 'i', long, default_value = "2.0")]
    interval: f64,

    /// Tuning interval in seconds. The tuner runs at a higher frequency than
    /// the load balancer to dynamically tune scheduling behavior.
    #[clap(short = 'I', long, default_value = "0.1")]
    tune_interval: f64,

    /// The half-life of task and domain load running averages in seconds.
    #[clap(short = 'l', long, default_value = "1.0")]
    load_half_life: f64,

    /// Build domains according to how CPUs are grouped at this cache level
    /// as determined by /sys/devices/system/cpu/cpuX/cache/indexI/id.
    #[clap(short = 'c', long, default_value = "3")]
    cache_level: u32,

    /// When non-zero, enable greedy task stealing. When a domain is idle, a CPU
    /// will attempt to steal tasks from another domain as follows:
    ///
    /// 1. Try to consume a task from the current domain
    /// 2. Try to consume a task from another domain in the current NUMA node
    ///    (or globally, if running on a single-socket system), if the domain
    ///    has at least this specified number of tasks enqueued.
    ///
    /// See greedy_threshold_x_numa to enable task stealing across NUMA nodes.
    /// Tasks stolen in this manner are not permanently stolen from their
    /// domain.
    #[clap(short = 'g', long, default_value = "1")]
    greedy_threshold: u32,

    /// When non-zero, enable greedy task stealing across NUMA nodes. The order
    /// of greedy task stealing follows greedy-threshold as described above, and
    /// greedy-threshold must be nonzero to enable task stealing across NUMA
    /// nodes.
    #[clap(long, default_value = "0")]
    greedy_threshold_x_numa: u32,

    /// Disable load balancing. Unless disabled, userspace will periodically calculate
    /// the load factor of each domain and instruct BPF which processes to move.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_load_balance: bool,

    /// Put per-CPU kthreads directly into local DSQs.
    #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
    kthreads_local: bool,

    /// In recent kernels (>=v6.6), the kernel is responsible for balancing
    /// kworkers across L3 cache domains. Exclude them from load-balancing
    /// to avoid conflicting operations. Greedy execution still applies.
    #[clap(short = 'b', long, action = clap::ArgAction::SetTrue)]
    balanced_kworkers: bool,

    /// Use FIFO scheduling instead of weighted vtime scheduling.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    fifo_sched: bool,

    /// Idle CPUs with utilization lower than this will get remote tasks
    /// directly pushed onto them. 0 disables, 100 always enables.
    #[clap(short = 'D', long, default_value = "90.0")]
    direct_greedy_under: f64,

    /// Idle CPUs with utilization lower than this may get kicked to
    /// accelerate stealing when a task is queued on a saturated remote
    /// domain. 0 disables, 100 always enables.
    #[clap(short = 'K', long, default_value = "100.0")]
    kick_greedy_under: f64,

    /// Whether tasks can be pushed directly to idle CPUs on NUMA nodes
    /// different from their domain's node. If direct-greedy-under is disabled,
    /// this option is a no-op. Otherwise, if this option is set to false
    /// (default), tasks will only be directly pushed to idle CPUs if they
    /// reside on the same NUMA node as the task's domain.
    #[clap(short = 'r', long, action = clap::ArgAction::SetTrue)]
    direct_greedy_numa: bool,

    /// If specified, only tasks which have their scheduling policy set to
    /// SCHED_EXT using sched_setscheduler(2) are switched. Otherwise, all
    /// tasks are switched.
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Enables soft NUMA affinity for tasks that use set_mempolicy. This
    /// may improve performance in some scenarios when using mempolicies.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    mempolicy_affinity: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. The scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Print version and exit.
    #[clap(long)]
    version: bool,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    /// Tunable for prioritizing CPU performance by configuring the CPU frequency governor.
    /// Valid values are [0, 1024]. Higher values prioritize performance, lower values
    /// prioritize energy efficiency. When in doubt, use 0 or 1024.
    #[clap(long, default_value = "0")]
    perf: u32,
}

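/// Read aggregate CPU time from /proc/stat and return it as (busy, total)
/// microseconds accumulated since boot. Utilization over an interval is the
/// ratio between the deltas of the two values.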
fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
    let cs = reader
        .read_stat()
        .context("Failed to read procfs")?
        .total_cpu
        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;

    Ok(match cs {
        procfs::CpuStat {
            user_usec: Some(user),
            nice_usec: Some(nice),
            system_usec: Some(system),
            idle_usec: Some(idle),
            iowait_usec: Some(iowait),
            irq_usec: Some(irq),
            softirq_usec: Some(softirq),
            stolen_usec: Some(stolen),
            guest_usec: _,
            guest_nice_usec: _,
        } => {
            let busy = user + system + nice + irq + softirq + stolen;
            let total = busy + idle + iowait;
            (busy, total)
        }
        _ => bail!("Some procfs stats are not populated!"),
    })
}

pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
    curr.checked_sub(*prev).unwrap_or(0u64)
}

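/// Copy a userspace Cpumask into a BPF-side scx_bitmap living in the shared
/// arena. MASK_LEN, the number of u64 words reported by the BPF program,
/// bounds the write.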
pub fn update_bpf_mask(bpfptr: *mut types::scx_bitmap, cpumask: &Cpumask) -> Result<()> {
    let bpfmask = unsafe { &mut *bpfptr };

    unsafe { cpumask.write_to_ptr(&raw mut bpfmask.bits as *mut u64, MASK_LEN)? };

    Ok(())
}

#[derive(Clone, Debug)]
struct StatsCtx {
    cpu_busy: u64,
    cpu_total: u64,
    bpf_stats: Vec<u64>,
    time_used: Duration,
}

impl StatsCtx {
    fn read_bpf_stats(skel: &BpfSkel) -> Result<Vec<u64>> {
        let stats_map = &skel.maps.stats;
        let mut stats: Vec<u64> = Vec::new();

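        // Each stat index is a per-CPU counter in the BPF stats map; sum the
        // per-CPU values into a single global count per index.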
        for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
            let cpu_stat_vec = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .with_context(|| format!("Failed to lookup stat {}", stat))?
                .expect("per-cpu stat should exist");
            let sum = cpu_stat_vec
                .iter()
                .map(|val| {
                    u64::from_ne_bytes(
                        val.as_slice()
                            .try_into()
                            .expect("Invalid value length in stat map"),
                    )
                })
                .sum();
            stats.push(sum);
        }
        Ok(stats)
    }

    fn blank() -> Self {
        Self {
            cpu_busy: 0,
            cpu_total: 0,
            bpf_stats: vec![0u64; bpf_intf::stat_idx_RUSTY_NR_STATS as usize],
            time_used: Duration::default(),
        }
    }

    fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, time_used: Duration) -> Result<Self> {
        let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;

        Ok(Self {
            cpu_busy,
            cpu_total,
            bpf_stats: Self::read_bpf_stats(skel)?,
            time_used,
        })
    }

    fn delta(&self, rhs: &Self) -> Self {
        Self {
            cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
            cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
            bpf_stats: self
                .bpf_stats
                .iter()
                .zip(rhs.bpf_stats.iter())
                .map(|(lhs, rhs)| sub_or_zero(&lhs, &rhs))
                .collect(),
            time_used: self.time_used - rhs.time_used,
        }
    }
}

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,

    sched_interval: Duration,
    tune_interval: Duration,
    balance_load: bool,
    balanced_kworkers: bool,

    dom_group: Arc<DomainGroup>,

    proc_reader: procfs::ProcReader,

    lb_at: SystemTime,
    lb_stats: BTreeMap<usize, NodeStats>,
    time_used: Duration,

    tuner: Tuner,
    stats_server: StatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
}

impl<'a> Scheduler<'a> {
    fn setup_allocators(skel: &mut BpfSkel<'a>) -> Result<()> {
        // Allocate the arena memory from the BPF side so userspace initializes it before starting
        // the scheduler. Despite the function call's name this is neither a test nor a test run,
        // it's the recommended way of executing SEC("syscall") probes.
        let mut args = types::arena_init_args {
            static_pages: bpf_intf::consts_STATIC_ALLOC_PAGES_GRANULARITY as c_ulong,
            task_ctx_size: std::mem::size_of::<types::task_ctx>() as c_ulong,
        };

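        // context_in hands the argument struct to the SEC("syscall") program
        // as its context, so the BPF side sees the sizes chosen above.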
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };

        let output = skel.progs.arena_init.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize arenas, arena_init returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

    fn setup_topology_node(
        skel: &mut BpfSkel<'a>,
        mask: &[u64],
        data_size: usize,
        id: usize,
    ) -> Result<()> {
        let mut args = types::arena_alloc_mask_args {
            bitmap: 0 as c_ulong,
        };

        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };

        let output = skel.progs.arena_alloc_mask.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize arenas, setup_topology_node returned {}",
                output.return_value as i32
            );
        }

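        // The BPF program returns an arena address for the freshly allocated
        // bitmap. Arenas are mapped into userspace, so the address can be
        // dereferenced directly here; the fixed [u64; 10] length is assumed to
        // stay within the BPF-side scx_bitmap allocation.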
        let ptr = unsafe { std::mem::transmute::<u64, &mut [u64; 10]>(args.bitmap) };

        let (valid_mask, _) = ptr.split_at_mut(mask.len());
        valid_mask.clone_from_slice(mask);

        let mut args = types::arena_topology_node_init_args {
            bitmap: args.bitmap as c_ulong,
            data_size: data_size as c_ulong,
            id: id as c_ulong,
        };

        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let output = skel.progs.arena_topology_node_init.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "arena_topology_node_init returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

    fn setup_topology(skel: &mut BpfSkel<'a>) -> Result<()> {
        let topo = Topology::new().expect("Failed to build host topology");

        // We never use the topology-provided IDs, because we do not need them anymore now that
        // we have a proper topology struct. We instead only use sequential IDs we create
        // ourselves to attach to the topology nodes. This is because the Topology-provided IDs
        // are used to determine relations between topology nodes, e.g., LLCs belonging to a
        // node, while sequential IDs are useful for linearly scanning the topology, e.g.,
        // iterating over domains. This is why LLC IDs and domain IDs in scx_wd40 differ.
        // For now we only need the sequential IDs, so use those. Eventually we will be able
        // to remove those, too, once we remove the hardcoded arrays from the code.
        Self::setup_topology_node(skel, topo.span.as_raw_slice(), 0, 0)?;

        for (id, (_, node)) in topo.nodes.into_iter().enumerate() {
            Self::setup_topology_node(skel, node.span.as_raw_slice(), 0, id)?;
        }

        for (id, (_, llc)) in topo.all_llcs.into_iter().enumerate() {
            Self::setup_topology_node(
                skel,
                Arc::<Llc>::into_inner(llc)
                    .expect("missing llc")
                    .span
                    .as_raw_slice(),
                0,
                id,
            )?;
        }
        for (id, (_, core)) in topo.all_cores.into_iter().enumerate() {
            Self::setup_topology_node(
                skel,
                Arc::<Core>::into_inner(core)
                    .expect("missing core")
                    .span
                    .as_raw_slice(),
                0,
                id,
            )?;
        }
        for (id, (_, cpu)) in topo.all_cpus.into_iter().enumerate() {
            let mut mask = [0; 9];
            // Set the bit for this CPU: word index is cpu.id / 64, bit within
            // the word is cpu.id % 64.
            mask[cpu.id / 64] |= 1 << (cpu.id % 64);
            Self::setup_topology_node(skel, &mask, 0, id)?;
        }

        Ok(())
    }

    fn setup_wd40(skel: &mut BpfSkel<'a>) -> Result<()> {
        // Allocate the arena memory from the BPF side so userspace initializes it before starting
        // the scheduler. Despite the function call's name this is neither a test nor a test run,
        // it's the recommended way of executing SEC("syscall") probes.
        let input = ProgramInput {
            ..Default::default()
        };
        let output = skel.progs.wd40_setup.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize WD40 arenas, wd40_setup returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

    fn setup_arenas(skel: &mut BpfSkel<'a>) -> Result<()> {
        Self::setup_allocators(skel)?;
        Self::setup_topology(skel)?;
        Self::setup_wd40(skel)?;

        Ok(())
    }

    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        // Open the BPF prog first for verification.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 0);
        init_libbpf_logging(None);
        info!(
            "Running scx_wd40 (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let mut skel = scx_ops_open!(skel_builder, open_object, wd40).unwrap();

        // Initialize skel according to @opts.
        let domains = Arc::new(DomainGroup::new(&Topology::new()?)?);

        if *NR_CPU_IDS > MAX_CPUS {
            bail!(
                "Num possible CPU IDs ({}) exceeds maximum of ({})",
                *NR_CPU_IDS,
                MAX_CPUS
            );
        }

        if domains.nr_doms() > MAX_DOMS {
            bail!(
                "nr_doms ({}) is greater than MAX_DOMS ({})",
                domains.nr_doms(),
                MAX_DOMS
            );
        }

        skel.maps.bss_data.as_mut().unwrap().slice_ns = scx_enums.SCX_SLICE_DFL;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.nr_nodes = domains.nr_nodes() as u32;
        rodata.nr_doms = domains.nr_doms() as u32;
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;

        if opts.partial {
            skel.struct_ops.wd40_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.wd40_mut().exit_dump_len = opts.exit_dump_len;

        rodata.load_half_life = (opts.load_half_life * 1000000000.0) as u32;
        rodata.kthreads_local = opts.kthreads_local;
        rodata.fifo_sched = opts.fifo_sched;
        rodata.greedy_threshold = opts.greedy_threshold;
        rodata.greedy_threshold_x_numa = opts.greedy_threshold_x_numa;
        rodata.direct_greedy_numa = opts.direct_greedy_numa;
        rodata.mempolicy_affinity = opts.mempolicy_affinity;
        rodata.debug = opts.verbose as u32;
        rodata.wd40_perf_mode = opts.perf;

        let mut skel = scx_ops_load!(skel, wd40, uei)?;

        Self::setup_arenas(&mut skel)?;

        let bss_data = skel.maps.bss_data.as_mut().unwrap();
        info!(
            "Mask length {}, number of possible CPUs {}",
            bss_data.mask_size,
            skel.maps.rodata_data.as_mut().unwrap().nr_cpu_ids
        );
        // Read the mask length chosen by BPF. We count elements in the u64 array, like the BPF
        // program does.
        //
        // This invocation is safe because there is no concurrency in the program during initialization.
        unsafe { MASK_LEN = bss_data.mask_size as usize };

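        // Destructure the generated newtype wrapper to get the numeric index
        // of the LLC topology level; it is used below to index topo_nodes.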
        let types::topo_level(index) = types::topo_level::TOPO_LLC;

        for numa in 0..domains.nr_nodes() {
            let mut numa_mask = Cpumask::new();
            let node_domains = domains.numa_doms(&numa);
            for dom in node_domains.iter() {
                let dom_mask = dom.mask();
                numa_mask = numa_mask.or(&dom_mask);
            }

            update_bpf_mask(bss_data.node_data[numa], &numa_mask)?;
            info!("NODE[{:02}] mask= {}", numa, numa_mask);

            for dom in node_domains.iter() {
                // XXX Remove this by using the topo node's cpumask.
                let ptr = bss_data.topo_nodes[index as usize][dom.id()];
                let domc = unsafe { std::mem::transmute::<u64, &mut types::dom_ctx>(ptr) };
                update_bpf_mask(domc.cpumask, &dom.mask())?;

                bss_data.dom_numa_id_map[dom.id()] =
                    numa.try_into().expect("NUMA ID could not fit into 32 bits");

                info!(" DOM[{:02}] mask= {}", dom.id(), dom.mask());
            }
        }

        // Actually get the scheduler starting.
        let struct_ops = Some(scx_ops_attach!(skel, wd40)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        for (id, dom) in domains.doms().iter() {
            let mut ctx = dom.ctx.lock().unwrap();

            let ptr = skel.maps.bss_data.as_mut().unwrap().topo_nodes[index as usize][*id];
            let domc = unsafe { std::mem::transmute::<u64, &mut types::dom_ctx>(ptr) };
            *ctx = Some(domc);
        }

        info!("WD40 scheduler started! Run `scx_wd40 --monitor` for metrics.");

        // Other stuff.
        let proc_reader = procfs::ProcReader::new();

        Ok(Self {
            skel,
            struct_ops, // should be held to keep it attached

            sched_interval: Duration::from_secs_f64(opts.interval),
            tune_interval: Duration::from_secs_f64(opts.tune_interval),
            balance_load: !opts.no_load_balance,
            balanced_kworkers: opts.balanced_kworkers,

            dom_group: domains.clone(),
            proc_reader,

            lb_at: SystemTime::now(),
            lb_stats: BTreeMap::new(),
            time_used: Duration::default(),

            tuner: Tuner::new(
                domains,
                opts.direct_greedy_under,
                opts.kick_greedy_under,
                opts.slice_us_underutil * 1000,
                opts.slice_us_overutil * 1000,
            )?,
            stats_server,
        })
    }

    fn cluster_stats(&self, sc: &StatsCtx, node_stats: BTreeMap<usize, NodeStats>) -> ClusterStats {
        let stat = |idx| sc.bpf_stats[idx as usize];
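        // Sum every dispatch-path counter for the interval; it serves as the
        // denominator for the per-path percentages computed by stat_pct below.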
        let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PINNED)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
        let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0;

        let cpu_busy = if sc.cpu_total != 0 {
            (sc.cpu_busy as f64 / sc.cpu_total as f64) * 100.0
        } else {
            0.0
        };

        ClusterStats {
            at_us: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            lb_at_us: self
                .lb_at
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            total,
            slice_us: self.tuner.slice_ns / 1000,

            cpu_busy,
            load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
            nr_migrations: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],

            task_get_err: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
            time_used: sc.time_used.as_secs_f64(),

            sync_prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE),
            wake_sync: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC),
            prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE),
            greedy_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE),
            pinned: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PINNED),
            direct: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH),
            greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY),
            greedy_far: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR),
            dsq_dispatch: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH),
            greedy_local: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL),
            greedy_xnuma: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA),
            kick_greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_KICK_GREEDY),
            repatriate: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_REPATRIATE),
            dl_clamp: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_CLAMP),
            dl_preset: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_PRESET),

            direct_greedy_cpus: self.tuner.direct_greedy_mask.as_raw_slice().to_owned(),
            kick_greedy_cpus: self.tuner.kick_greedy_mask.as_raw_slice().to_owned(),

            nodes: node_stats,
        }
    }

    fn lb_step(&mut self) -> Result<()> {
        let mut lb = LoadBalancer::new(
            &mut self.skel,
            self.dom_group.clone(),
            self.balanced_kworkers,
            self.tuner.fully_utilized,
            self.balance_load,
        );

        lb.load_balance()?;

        self.lb_at = SystemTime::now();
        self.lb_stats = lb.get_stats();
        Ok(())
    }

    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let now = Instant::now();
        let mut next_tune_at = now + self.tune_interval;
        let mut next_sched_at = now + self.sched_interval;

        self.skel.maps.stats.value_size() as usize;

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

            if now >= next_tune_at {
                self.tuner.step(&mut self.skel)?;
                next_tune_at += self.tune_interval;
                if next_tune_at < now {
                    next_tune_at = now + self.tune_interval;
                }
            }

            if now >= next_sched_at {
                self.lb_step()?;
                next_sched_at += self.sched_interval;
                if next_sched_at < now {
                    next_sched_at = now + self.sched_interval;
                }
            }

            self.time_used += Instant::now().duration_since(now);

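            // Sleep until the next tuning/balancing deadline or until the
            // stats server forwards a request. The request carries the
            // previous snapshot; reply with a fresh snapshot plus cluster
            // stats computed from the delta between the two.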
            match req_ch.recv_deadline(next_sched_at.min(next_tune_at)) {
                Ok(prev_sc) => {
                    let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.time_used)?;
                    let delta_sc = cur_sc.delta(&prev_sc);
                    let cstats = self.cluster_stats(&delta_sc, self.lb_stats.clone());
                    res_ch.send((cur_sc, cstats))?;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_wd40: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

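    // With --monitor only the stats client below runs and the scheduler is
    // never loaded; with --stats the monitor thread runs alongside the
    // scheduler started further down.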
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    let mut open_object = MaybeUninit::uninit();
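    // Keep re-initializing and re-running the scheduler for as long as the
    // exit info reported by the BPF side requests a restart.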
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }
    Ok(())
}