scx_rusty/
main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

mod domain;
use domain::DomainGroup;

pub mod tuner;
use tuner::Tuner;

pub mod load_balance;
use load_balance::LoadBalancer;

mod stats;
use std::collections::BTreeMap;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use stats::ClusterStats;
use stats::NodeStats;

#[macro_use]
extern crate static_assertions;

use ::fb_procfs as procfs;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::info;
use scx_stats::prelude::*;
use scx_utils::Cpumask;
use scx_utils::NR_CPU_IDS;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;

const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;

/// scx_rusty: A multi-domain BPF / userspace hybrid scheduler
///
/// The BPF part does simple vtime or round robin scheduling in each domain
/// while tracking average load of each domain and duty cycle of each task.
///
/// The userspace part performs two roles. First, it makes higher frequency
/// (100ms) tuning decisions. It identifies CPUs which are not too heavily
/// loaded and marks them so that they can pull tasks from other overloaded
/// domains on the fly.
///
/// Second, it drives lower frequency (2s) load balancing. It determines
/// whether load balancing is necessary by comparing domain load averages.
/// If there are large enough load differences, it examines up to 1024
/// recently active tasks on the domain to determine which should be
/// migrated.
///
/// The overhead of userspace operations is low. Load balancing is not
/// performed frequently, but work-conservation is still maintained through
/// tuning and greedy execution. Load balancing itself is not that expensive
/// either. It only accesses per-domain load metrics to determine the domains
/// that need load balancing, as well as a limited number of per-task metrics
/// for each pushing domain.
///
/// An earlier variant of this scheduler was used to balance across six
/// domains, each representing a chiplet in a six-chiplet AMD processor, and
/// could match the performance of a production setup using CFS.
///
/// WARNING: scx_rusty currently assumes that all domains have equal
/// processing power and are at similar distances from each other. This
/// limitation will be removed in the future.
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration for under-utilized hosts, in microseconds.
    #[clap(short = 'u', long, default_value = "20000")]
    slice_us_underutil: u64,

    /// Scheduling slice duration for over-utilized hosts, in microseconds.
    #[clap(short = 'o', long, default_value = "1000")]
    slice_us_overutil: u64,

    /// Load balance interval in seconds.
    #[clap(short = 'i', long, default_value = "2.0")]
    interval: f64,

    /// Tuning interval in seconds. The tuner runs at a higher frequency than
    /// the load balancer to dynamically tune scheduling behavior.
    #[clap(short = 'I', long, default_value = "0.1")]
    tune_interval: f64,

    /// The half-life of task and domain load running averages in seconds.
    #[clap(short = 'l', long, default_value = "1.0")]
    load_half_life: f64,

    /// Build domains according to how CPUs are grouped at this cache level
    /// as determined by /sys/devices/system/cpu/cpuX/cache/indexI/id.
    #[clap(short = 'c', long, default_value = "3")]
    cache_level: u32,

    /// Instead of using cache locality, set the cpumask for each domain
    /// manually. Provide multiple --cpumasks, one for each domain. E.g.
    /// --cpumasks 0xff_00ff --cpumasks 0xff00 will create two domains, with
    /// the corresponding CPUs belonging to each domain. Each CPU must
    /// belong to precisely one domain.
    #[clap(short = 'C', long, num_args = 1.., conflicts_with = "cache_level")]
    cpumasks: Vec<String>,

    /// When non-zero, enable greedy task stealing. When a domain is idle, a CPU
    /// will attempt to steal tasks from another domain as follows:
    ///
    /// 1. Try to consume a task from the current domain
    /// 2. Try to consume a task from another domain in the current NUMA node
    ///    (or globally, if running on a single-socket system), if the domain
    ///    has at least this specified number of tasks enqueued.
    ///
    /// See greedy_threshold_x_numa to enable task stealing across NUMA nodes.
    /// Tasks stolen in this manner are not permanently stolen from their
    /// domain.
    #[clap(short = 'g', long, default_value = "1")]
    greedy_threshold: u32,

    /// When non-zero, enable greedy task stealing across NUMA nodes. The order
    /// of greedy task stealing follows greedy-threshold as described above, and
    /// greedy-threshold must be nonzero to enable task stealing across NUMA
    /// nodes.
    #[clap(long, default_value = "0")]
    greedy_threshold_x_numa: u32,

    /// Disable load balancing. Unless disabled, userspace will periodically calculate
    /// the load factor of each domain and instruct BPF which processes to move.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_load_balance: bool,

    /// Put per-cpu kthreads directly into local DSQs.
    #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
    kthreads_local: bool,

    /// In recent kernels (>=v6.6), the kernel is responsible for balancing
    /// kworkers across L3 cache domains. Exclude them from load-balancing
    /// to avoid conflicting operations. Greedy execution still applies.
    #[clap(short = 'b', long, action = clap::ArgAction::SetTrue)]
    balanced_kworkers: bool,

    /// Use FIFO scheduling instead of weighted vtime scheduling.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    fifo_sched: bool,

    /// Idle CPUs with utilization lower than this will get remote tasks
    /// directly pushed onto them. 0 disables, 100 always enables.
    #[clap(short = 'D', long, default_value = "90.0")]
    direct_greedy_under: f64,

    /// Idle CPUs with utilization lower than this may get kicked to
    /// accelerate stealing when a task is queued on a saturated remote
    /// domain. 0 disables, 100 always enables.
    #[clap(short = 'K', long, default_value = "100.0")]
    kick_greedy_under: f64,

    /// Whether tasks can be pushed directly to idle CPUs on NUMA nodes
    /// different than their domain's node. If direct-greedy-under is disabled,
    /// this option is a no-op. Otherwise, if this option is set to false
    /// (default), tasks will only be directly pushed to idle CPUs if they
    /// reside on the same NUMA node as the task's domain.
    #[clap(short = 'r', long, action = clap::ArgAction::SetTrue)]
    direct_greedy_numa: bool,

    /// If specified, only tasks which have their scheduling policy set to
    /// SCHED_EXT using sched_setscheduler(2) are switched. Otherwise, all
    /// tasks are switched.
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Enable soft NUMA affinity for tasks that use set_mempolicy. This
    /// may improve performance in some scenarios when using mempolicies.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    mempolicy_affinity: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. The scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Print version and exit.
    #[clap(long)]
    version: bool,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    /// Tunable for prioritizing CPU performance by configuring the CPU frequency governor.
    /// Valid values are [0, 1024]. Higher values prioritize performance, lower values
    /// prioritize energy efficiency. When in doubt, use 0 or 1024.
    #[clap(long, default_value = "0")]
    perf: u32,
}

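// Aggregate CPU busy and total time is read from /proc/stat; busy excludes
// idle and iowait. Utilization is then derived from the delta between two
// snapshots (see cluster_stats()), roughly:
//   util_pct = (busy1 - busy0) as f64 / (total1 - total0) as f64 * 100.0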
fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
    let cs = reader
        .read_stat()
        .context("Failed to read procfs")?
        .total_cpu
        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;

    Ok(match cs {
        procfs::CpuStat {
            user_usec: Some(user),
            nice_usec: Some(nice),
            system_usec: Some(system),
            idle_usec: Some(idle),
            iowait_usec: Some(iowait),
            irq_usec: Some(irq),
            softirq_usec: Some(softirq),
            stolen_usec: Some(stolen),
            guest_usec: _,
            guest_nice_usec: _,
        } => {
            let busy = user + system + nice + irq + softirq + stolen;
            let total = busy + idle + iowait;
            (busy, total)
        }
        _ => bail!("Some procfs stats are not populated!"),
    })
}

pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
    curr.checked_sub(*prev).unwrap_or(0u64)
}

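// A snapshot of the cumulative counters needed for reporting: /proc/stat CPU
// times, BPF-side stat counters, and the userspace time consumed so far.
// Stats are always reported as the delta between two snapshots.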
#[derive(Clone, Debug)]
struct StatsCtx {
    cpu_busy: u64,
    cpu_total: u64,
    bpf_stats: Vec<u64>,
    time_used: Duration,
}

impl StatsCtx {
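    // The BPF `stats` map is a per-CPU array indexed by stat index; each
    // counter is summed across all CPUs to produce one host-wide value.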
    fn read_bpf_stats(skel: &BpfSkel) -> Result<Vec<u64>> {
        let stats_map = &skel.maps.stats;
        let mut stats: Vec<u64> = Vec::new();

        for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
            let cpu_stat_vec = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .with_context(|| format!("Failed to lookup stat {}", stat))?
                .expect("per-cpu stat should exist");
            let sum = cpu_stat_vec
                .iter()
                .map(|val| {
                    u64::from_ne_bytes(
                        val.as_slice()
                            .try_into()
                            .expect("Invalid value length in stat map"),
                    )
                })
                .sum();
            stats.push(sum);
        }
        Ok(stats)
    }

    fn blank() -> Self {
        Self {
            cpu_busy: 0,
            cpu_total: 0,
            bpf_stats: vec![0u64; bpf_intf::stat_idx_RUSTY_NR_STATS as usize],
            time_used: Duration::default(),
        }
    }

    fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, time_used: Duration) -> Result<Self> {
        let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;

        Ok(Self {
            cpu_busy,
            cpu_total,
            bpf_stats: Self::read_bpf_stats(skel)?,
            time_used,
        })
    }

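    // Field-wise difference from an earlier snapshot. The counters are
    // expected to be monotonic, so sub_or_zero() guards against any
    // regression; time_used is a plain Duration subtraction.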
    fn delta(&self, rhs: &Self) -> Self {
        Self {
            cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
            cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
            bpf_stats: self
                .bpf_stats
                .iter()
                .zip(rhs.bpf_stats.iter())
                .map(|(lhs, rhs)| sub_or_zero(&lhs, &rhs))
                .collect(),
            time_used: self.time_used - rhs.time_used,
        }
    }
}

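// Userspace half of the scheduler: owns the loaded BPF skeleton and the
// struct_ops attachment link, and drives the periodic tuner, the load
// balancer, and the stats server.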
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,

    sched_interval: Duration,
    tune_interval: Duration,
    balance_load: bool,
    balanced_kworkers: bool,

    dom_group: Arc<DomainGroup>,

    proc_reader: procfs::ProcReader,

    lb_at: SystemTime,
    lb_stats: BTreeMap<usize, NodeStats>,
    time_used: Duration,

    tuner: Tuner,
    stats_server: StatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        // Open the BPF prog first for verification.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 0);
        init_libbpf_logging(None);
        info!(
            "Running scx_rusty (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let mut skel = scx_ops_open!(skel_builder, open_object, rusty).unwrap();

        // Initialize skel according to @opts.
        let domains = Arc::new(DomainGroup::new(&Topology::new()?, &opts.cpumasks)?);

        if *NR_CPU_IDS > MAX_CPUS {
            bail!(
                "Num possible CPU IDs ({}) exceeds maximum of ({})",
                *NR_CPU_IDS,
                MAX_CPUS
            );
        }

        if domains.nr_doms() > MAX_DOMS {
            bail!(
                "nr_doms ({}) is greater than MAX_DOMS ({})",
                domains.nr_doms(),
                MAX_DOMS
            );
        }

        skel.maps.bss_data.slice_ns = scx_enums.SCX_SLICE_DFL;

        skel.maps.rodata_data.nr_nodes = domains.nr_nodes() as u32;
        skel.maps.rodata_data.nr_doms = domains.nr_doms() as u32;
        skel.maps.rodata_data.nr_cpu_ids = *NR_CPU_IDS as u32;

        // Any CPU with dom > MAX_DOMS is considered offline by default. There
        // are a few places in the BPF code where we skip over offlined CPUs
        // (e.g. when initializing or refreshing tune params), and elsewhere the
        // scheduler will error if we try to schedule from them.
        for cpu in 0..*NR_CPU_IDS {
            skel.maps.rodata_data.cpu_dom_id_map[cpu] = u32::MAX;
        }

        for (id, dom) in domains.doms().iter() {
            for cpu in dom.mask().iter() {
                skel.maps.rodata_data.cpu_dom_id_map[cpu] = *id as u32;
            }
        }

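        // Mirror the per-NUMA-node and per-domain cpumasks into BPF rodata.
        // Only the first raw-slice words of each fixed-size array are written;
        // the remaining words are left untouched.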
        for numa in 0..domains.nr_nodes() {
            let mut numa_mask = Cpumask::new();
            let node_domains = domains.numa_doms(&numa);
            for dom in node_domains.iter() {
                let dom_mask = dom.mask();
                numa_mask = numa_mask.or(&dom_mask);
            }

            let raw_numa_slice = numa_mask.as_raw_slice();
            let node_cpumask_slice = &mut skel.maps.rodata_data.numa_cpumasks[numa];
            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
            left.clone_from_slice(raw_numa_slice);
            info!("NODE[{:02}] mask= {}", numa, numa_mask);

            for dom in node_domains.iter() {
                let raw_dom_slice = dom.mask_slice();
                let dom_cpumask_slice = &mut skel.maps.rodata_data.dom_cpumasks[dom.id()];
                let (left, _) = dom_cpumask_slice.split_at_mut(raw_dom_slice.len());
                left.clone_from_slice(raw_dom_slice);
                skel.maps.rodata_data.dom_numa_id_map[dom.id()] =
                    numa.try_into().expect("NUMA ID could not fit into 32 bits");

                info!(" DOM[{:02}] mask= {}", dom.id(), dom.mask());
            }
        }

        if opts.partial {
            skel.struct_ops.rusty_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.rusty_mut().exit_dump_len = opts.exit_dump_len;

        skel.maps.rodata_data.load_half_life = (opts.load_half_life * 1000000000.0) as u32;
        skel.maps.rodata_data.kthreads_local = opts.kthreads_local;
        skel.maps.rodata_data.fifo_sched = opts.fifo_sched;
        skel.maps.rodata_data.greedy_threshold = opts.greedy_threshold;
        skel.maps.rodata_data.greedy_threshold_x_numa = opts.greedy_threshold_x_numa;
        skel.maps.rodata_data.direct_greedy_numa = opts.direct_greedy_numa;
        skel.maps.rodata_data.mempolicy_affinity = opts.mempolicy_affinity;
        skel.maps.rodata_data.debug = opts.verbose as u32;
        skel.maps.rodata_data.rusty_perf_mode = opts.perf;

        // Load and attach the scheduler, then launch the stats server.
        let mut skel = scx_ops_load!(skel, rusty, uei)?;
        let struct_ops = Some(scx_ops_attach!(skel, rusty)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        for (id, dom) in domains.doms().iter() {
            let mut ctx = dom.ctx.lock().unwrap();

            *ctx = Some(skel.maps.bss_data.dom_ctxs[*id]);
        }

        info!("Rusty scheduler started! Run `scx_rusty --monitor` for metrics.");

        // Remaining userspace-side state.
        let proc_reader = procfs::ProcReader::new();

        Ok(Self {
            skel,
            struct_ops, // should be held to keep it attached

            sched_interval: Duration::from_secs_f64(opts.interval),
            tune_interval: Duration::from_secs_f64(opts.tune_interval),
            balance_load: !opts.no_load_balance,
            balanced_kworkers: opts.balanced_kworkers,

            dom_group: domains.clone(),
            proc_reader,

            lb_at: SystemTime::now(),
            lb_stats: BTreeMap::new(),
            time_used: Duration::default(),

            tuner: Tuner::new(
                domains,
                opts.direct_greedy_under,
                opts.kick_greedy_under,
                opts.slice_us_underutil * 1000,
                opts.slice_us_overutil * 1000,
            )?,
            stats_server,
        })
    }

    fn cluster_stats(&self, sc: &StatsCtx, node_stats: BTreeMap<usize, NodeStats>) -> ClusterStats {
        let stat = |idx| sc.bpf_stats[idx as usize];
        // `total` sums the counters of every dispatch path; stat_pct() then
        // reports each path's share as a percentage of all dispatches.
        let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PINNED)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
        let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0;

        let cpu_busy = if sc.cpu_total != 0 {
            (sc.cpu_busy as f64 / sc.cpu_total as f64) * 100.0
        } else {
            0.0
        };

        ClusterStats {
            at_us: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            lb_at_us: self
                .lb_at
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            total,
            slice_us: self.tuner.slice_ns / 1000,

            cpu_busy,
            load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
            nr_migrations: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],

            task_get_err: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
            time_used: sc.time_used.as_secs_f64(),

            sync_prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE),
            wake_sync: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC),
            prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE),
            greedy_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE),
            pinned: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PINNED),
            direct: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH),
            greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY),
            greedy_far: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR),
            dsq_dispatch: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH),
            greedy_local: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL),
            greedy_xnuma: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA),
            kick_greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_KICK_GREEDY),
            repatriate: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_REPATRIATE),
            dl_clamp: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_CLAMP),
            dl_preset: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_PRESET),

            direct_greedy_cpus: self.tuner.direct_greedy_mask.as_raw_slice().to_owned(),
            kick_greedy_cpus: self.tuner.kick_greedy_mask.as_raw_slice().to_owned(),

            nodes: node_stats,
        }
    }

    fn lb_step(&mut self) -> Result<()> {
        let mut lb = LoadBalancer::new(
            &mut self.skel,
            self.dom_group.clone(),
            self.balanced_kworkers,
            self.tuner.fully_utilized,
            self.balance_load,
        );

        lb.load_balance()?;

        self.lb_at = SystemTime::now();
        self.lb_stats = lb.get_stats();
        Ok(())
    }

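    // Main loop: step the tuner every tune_interval, run the load balancer
    // every sched_interval, and in between service stats requests arriving on
    // the stats server channel (recv_deadline doubles as the sleep until the
    // next deadline).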
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let now = Instant::now();
        let mut next_tune_at = now + self.tune_interval;
        let mut next_sched_at = now + self.sched_interval;

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

            if now >= next_tune_at {
                self.tuner.step(&mut self.skel)?;
                next_tune_at += self.tune_interval;
                // If we've fallen more than a full period behind, resync to
                // now instead of catching up one period at a time.
                if next_tune_at < now {
                    next_tune_at = now + self.tune_interval;
                }
            }

            if now >= next_sched_at {
                self.lb_step()?;
                next_sched_at += self.sched_interval;
                if next_sched_at < now {
                    next_sched_at = now + self.sched_interval;
                }
            }

            self.time_used += Instant::now().duration_since(now);

            match req_ch.recv_deadline(next_sched_at.min(next_tune_at)) {
                Ok(prev_sc) => {
                    let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.time_used)?;
                    let delta_sc = cur_sc.delta(&prev_sc);
                    let cstats = self.cluster_stats(&delta_sc, self.lb_stats.clone());
                    res_ch.send((cur_sc, cstats))?;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

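// Drop the struct_ops link explicitly so the BPF scheduler is detached when
// the Scheduler goes away, even if run() did not complete.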
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_rusty: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

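    // Init-and-run loop: if the user exit info reports should_restart()
    // (e.g. presumably after a CPU hotplug-style restart request), a fresh
    // Scheduler is built and re-attached; any other exit breaks the loop.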
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }
    Ok(())
}