scx_mitosis/
main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
mod stats;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fmt::Display;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;
use tracing::{debug, info, trace, warn};
use tracing_subscriber::filter::EnvFilter;

use stats::CellMetrics;
use stats::Metrics;

const SCHEDULER_NAME: &str = "scx_mitosis";
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;

/// scx_mitosis: A dynamic affinity scheduler
///
/// Cgroups are assigned to a dynamic number of Cells which are assigned to a
/// dynamic set of CPUs. The BPF part does simple vtime scheduling for each cell.
///
/// Userspace makes the dynamic decisions of which Cells should be merged or
/// split and which CPUs they should be assigned to.
#[derive(Debug, Parser)]
struct Opts {
    /// Deprecated, no-op; use RUST_LOG or --log-level instead.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Specify the logging level. Accepts Rust's EnvFilter syntax for modular
    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
    #[clap(long, default_value = "info")]
    log_level: String,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval to consider reconfiguring the Cells (e.g. merge or split)
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval to consider rebalancing CPUs to Cells
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval to report monitoring information
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Optional run ID for tracking scheduler instances.
    #[clap(long)]
    run_id: Option<u64>,

    /// Enable debug event tracking for cgroup_init, init_task, and cgroup_exit.
    /// Events are recorded in a ring buffer and output in dump().
    #[clap(long, action = clap::ArgAction::SetTrue)]
    debug_events: bool,

    /// Enable workaround for exiting tasks with offline cgroups during scheduler load.
    /// This works around a kernel bug where tasks can be initialized with cgroups that
    /// were never initialized. Disable this once the kernel bug is fixed.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    exiting_task_workaround: bool,

    /// Split vtime updates between running() and stopping() instead of unifying them in stopping().
    /// Enabling this flag restores the legacy behavior of vtime updates, which we've observed to
    /// cause "vtime too far ahead" errors.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    split_vtime_updates: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

// The subset of cstats we care about.
// Local + CPU DSQ + Cell DSQ = Total Decisions.
// Affinity violations are not queue decisions; they are
// calculated separately and reported as a percentage of the total.
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 3] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
];

// Per cell book-keeping
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    monitor_interval: Duration,
    cells: HashMap<u32, Cell>,
    // These are the per-cell cstats.
    // Note these are accumulated across all CPUs.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    metrics: Metrics,
    stats_server: StatsServer<(), Metrics>,
    last_configuration_seq: Option<u32>,
}

struct DistributionStats {
    total_decisions: u64,
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    affn_viol_pct: f64,

    // for formatting
    global_queue_decisions: u64,
}

impl Display for DistributionStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // This makes the output easier to read by improving column alignment. First, it guarantees that within a
        // given logging interval, the global and cell queueing decision counts print at the same width.
        // Second, it reduces variance in column width between logging intervals. 5 is simply a heuristic.
        const MIN_DECISIONS_WIDTH: usize = 5;
        let decisions_width = if self.global_queue_decisions > 0 {
            max(
                MIN_DECISIONS_WIDTH,
                (self.global_queue_decisions as f64).log10().ceil() as usize,
            )
        } else {
            MIN_DECISIONS_WIDTH
        };
        write!(
            f,
            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% | V:{:4.1}%",
            self.total_decisions,
            self.share_of_decisions_pct,
            self.local_q_pct,
            self.cpu_q_pct,
            self.cell_q_pct,
            self.affn_viol_pct,
            width = decisions_width,
        )
    }
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let topology = Topology::new()?;

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder
            .obj_builder
            .debug(opts.log_level.contains("trace"));
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        skel.maps.rodata_data.as_mut().unwrap().slice_ns = scx_enums.SCX_SLICE_DFL;
        skel.maps.rodata_data.as_mut().unwrap().debug_events_enabled = opts.debug_events;
        skel.maps
            .rodata_data
            .as_mut()
            .unwrap()
            .exiting_task_workaround_enabled = opts.exiting_task_workaround;
        skel.maps.rodata_data.as_mut().unwrap().split_vtime_updates = opts.split_vtime_updates;

        skel.maps.rodata_data.as_mut().unwrap().nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
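        // all_cpus is a byte array used as a CPU bitmask: CPU n is recorded by
        // setting bit (n % 8) of byte (n / 8).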
        for cpu in topology.all_cpus.keys() {
            skel.maps.rodata_data.as_mut().unwrap().all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server,
            last_configuration_seq: None,
        })
    }

    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        let (res_ch, req_ch) = self.stats_server.channels();

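        // Main loop: refresh the CPU-to-cell assignments and metrics once per interval.
        // The stats server's request channel doubles as the timer: recv_timeout() either
        // serves a metrics request or times out after monitor_interval.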
        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            self.refresh_bpf_cells()?;
            self.collect_metrics()?;

            match req_ch.recv_timeout(self.monitor_interval) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }
        drop(struct_ops);
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }

    fn get_metrics(&self) -> Metrics {
        self.metrics.clone()
    }

    fn calculate_distribution_stats(
        &self,
        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
        global_queue_decisions: u64,
        scope_queue_decisions: u64,
        scope_affn_viols: u64,
    ) -> Result<DistributionStats> {
        // First % on the line: share of global work
        // We know global_queue_decisions is non-zero.
        let share_of_global =
            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);

        // Each queue's % of the scope total
        let queue_pct = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out queue distribution");
            [0.0; QUEUE_STATS_IDX.len()]
        } else {
            core::array::from_fn(|i| {
                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
            })
        };

        // These are summed differently for the global and per-cell totals.
        let affinity_violations_percent = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out affinity violations");
            0.0
        } else {
            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
        };

        const EXPECTED_QUEUES: usize = 3;
        if queue_pct.len() != EXPECTED_QUEUES {
            bail!(
                "Expected {} queues, got {}",
                EXPECTED_QUEUES,
                queue_pct.len()
            );
        }

        Ok(DistributionStats {
            total_decisions: scope_queue_decisions,
            share_of_decisions_pct: share_of_global,
            local_q_pct: queue_pct[0],
            cpu_q_pct: queue_pct[1],
            cell_q_pct: queue_pct[2],
            affn_viol_pct: affinity_violations_percent,
            global_queue_decisions,
        })
    }

    // Queue stats for the whole node
    fn update_and_log_global_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Get the total of each queue, summed over all cells.
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cell in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cell][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        // Here we want to sum the affinity violations over all cells.
        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum::<u64>();

        // Special case: for the global scope, the number of scope decisions equals
        // the number of global decisions.
        let stats = self.calculate_distribution_stats(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
        )?;

        self.metrics.update(&stats);

        trace!("{} {}", prefix, stats);

        Ok(())
    }

    // Print out the per-cell stats
    fn update_and_log_cell_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        for cell in 0..MAX_CELLS {
            let cell_queue_decisions = QUEUE_STATS_IDX
                .iter()
                .map(|&stat| cell_stats_delta[cell][stat as usize])
                .sum::<u64>();

            // FIXME: This should really query if the cell is enabled or not.
            if cell_queue_decisions == 0 {
                continue;
            }

            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] = cell_stats_delta[cell][stat as usize];
            }

            const MIN_CELL_WIDTH: usize = 2;
            let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);

            let prefix = format!("        Cell {:width$}:", cell, width = cell_width);

            // Sum affinity violations for this cell
            let scope_affn_viols: u64 =
                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];

            let stats = self.calculate_distribution_stats(
                &queue_counts,
                global_queue_decisions,
                cell_queue_decisions,
                scope_affn_viols,
            )?;

            self.metrics
                .cells
                .entry(cell as u32)
                .or_default()
                .update(&stats);

            trace!("{} {}", prefix, stats);
        }
        Ok(())
    }

    fn log_all_queue_stats(
        &mut self,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Get total decisions
        let global_queue_decisions: u64 = cell_stats_delta
            .iter()
            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
            .sum();

        // We don't want to divide by zero later, but this is never expected.
        if global_queue_decisions == 0 {
            bail!("Error: No queueing decisions made globally");
        }

        self.update_and_log_global_queue_stats(global_queue_decisions, cell_stats_delta)?;

        self.update_and_log_cell_queue_stats(global_queue_decisions, cell_stats_delta)?;

        Ok(())
    }

    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
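        // The BPF side accumulates cstats per CPU; sum each counter across CPUs,
        // then report the change since the previous sample (prev_cell_stats).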
        let mut cell_stats_delta = [[0u64; NR_CSTATS]; MAX_CELLS];

        // Read CPU contexts
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        // Loop over cells and stats first, then CPU contexts
        // TODO: We should loop over the in_use cells only.
        for cell in 0..MAX_CELLS {
            for stat in 0..NR_CSTATS {
                let mut cur_cell_stat = 0;

                // Accumulate stats from all CPUs
                for cpu_ctx in cpu_ctxs.iter() {
                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
                }

                // Calculate delta and update previous stat
                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
                self.prev_cell_stats[cell][stat] = cur_cell_stat;
            }
        }
        Ok(cell_stats_delta)
    }

    /// Collect metrics and log various debugging data like per-cell stats, per-CPU stats, etc.
    fn collect_metrics(&mut self) -> Result<()> {
        let cell_stats_delta = self.calculate_cell_stat_delta()?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);
        }

        for (cell_id, cell) in self.cells.iter() {
            // Assume we have a CellMetrics entry if we have a known cell
            self.metrics
                .cells
                .entry(*cell_id)
                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
        }
        self.metrics.num_cells = self.cells.len() as u32;

        Ok(())
    }

    fn refresh_bpf_cells(&mut self) -> Result<()> {
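        // Load the BPF-side configuration sequence number with Acquire ordering so the
        // per-CPU cell assignments read below are at least as recent as this sequence.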
        let applied_configuration = unsafe {
            let ptr = &self
                .skel
                .maps
                .bss_data
                .as_ref()
                .unwrap()
                .applied_configuration_seq as *const u32;
            (ptr as *const std::sync::atomic::AtomicU32)
                .as_ref()
                .unwrap()
                .load(std::sync::atomic::Ordering::Acquire)
        };
        if self
            .last_configuration_seq
            .is_some_and(|seq| applied_configuration == seq)
        {
            return Ok(());
        }
        // collect all cpus per cell.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .or_insert_with(Cpumask::new)
                .set_cpu(i)
                .expect("set cpu in existing mask");
        }

        // Create cells we don't have yet, drop cells that are no longer in use.
        // If we continue to drop cell metrics once a cell is removed, we'll need to make sure we
        // flush metrics for a cell before we remove it completely.
        //
        // IMPORTANT: We determine which cells exist based on CPU assignments (which are
        // synchronized by applied_configuration_seq), NOT by reading the in_use field
        // separately. This avoids a TOCTOU race where a cell's in_use is set before
        // CPUs are assigned.

        // Cell 0 (root cell) always exists even if it has no CPUs temporarily
        let cells_with_cpus: HashSet<u32> = cell_to_cpus.keys().copied().collect();
        let mut active_cells = cells_with_cpus.clone();
        active_cells.insert(0);

        for cell_idx in &active_cells {
            let cpus = cell_to_cpus
                .get(cell_idx)
                .cloned()
                .unwrap_or_else(Cpumask::new);
            self.cells
                .entry(*cell_idx)
                .or_insert_with(|| Cell {
                    cpus: Cpumask::new(),
                })
                .cpus = cpus;
            self.metrics.cells.insert(*cell_idx, CellMetrics::default());
        }

        // Remove cells that no longer have CPUs assigned
        self.cells.retain(|&k, _| active_cells.contains(&k));
        self.metrics.cells.retain(|&k, _| active_cells.contains(&k));

        self.last_configuration_seq = Some(applied_configuration);

        Ok(())
    }
}

fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    if cpu_ctxs_vec.len() < *NR_CPUS_POSSIBLE {
        bail!(
            "Percpu map returned {} entries but expected {}",
            cpu_ctxs_vec.len(),
            *NR_CPUS_POSSIBLE
        );
    }
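    // Each per-CPU entry is the raw byte image of a bpf_intf::cpu_ctx; reinterpret the
    // bytes and copy the struct out for every possible CPU.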
    for cpu in 0..*NR_CPUS_POSSIBLE {
        cpu_ctxs.push(*unsafe {
            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
        });
    }
    Ok(cpu_ctxs)
}

#[clap_main::clap_main]
fn main(opts: Opts) -> Result<()> {
    if opts.version {
        println!(
            "scx_mitosis {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

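    // Logging filter precedence: the RUST_LOG environment variable, then --log-level,
    // then a plain "info" filter as the final fallback.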
    let env_filter = EnvFilter::try_from_default_env()
        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
            Ok(filter) => Ok(filter),
            Err(e) => {
                eprintln!(
                    "invalid log filter: {}, falling back to info, err: {}",
                    opts.log_level, e
                );
                EnvFilter::try_new("info")
            }
        })
        .unwrap_or_else(|_| EnvFilter::new("info"));

    match tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(true)
        .with_thread_ids(true)
        .with_file(true)
        .with_line_number(true)
        .try_init()
    {
        Ok(()) => {}
        Err(e) => eprintln!("failed to init logger: {}", e),
    }

    if opts.verbose > 0 {
        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
    }

    debug!("opts={:?}", &opts);

    if let Some(run_id) = opts.run_id {
        info!("scx_mitosis run_id: {}", run_id);
    }

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;
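    // Ctrl-C sets the shared shutdown flag; the scheduler's main loop checks it, and a
    // clone is handed to the stats monitor thread below so it can stop as well.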

    if let Some(intv) = opts.monitor {
        let shutdown_clone = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        let _ = jh.join();
        return Ok(());
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}