scx_mitosis/main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
mod stats;

use std::cmp::max;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::debug;
use log::info;
use log::trace;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;

use stats::CellMetrics;
use stats::Metrics;

const SCHEDULER_NAME: &str = "scx_mitosis";
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;

/// scx_mitosis: A dynamic affinity scheduler
///
/// Cgroups are assigned to a dynamic number of Cells which are assigned to a
/// dynamic set of CPUs. The BPF part does simple vtime scheduling for each cell.
///
/// Userspace makes the dynamic decisions of which Cells should be merged or
/// split and which CPUs they should be assigned to.
#[derive(Debug, Parser)]
struct Opts {
    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval to consider reconfiguring the Cells (e.g. merge or split)
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval to consider rebalancing CPUs to Cells
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval to report monitoring information
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

// The subset of cstats we care about.
// Local + CPU DSQ + Cell DSQ = Total Decisions
// Affinity violations are not queueing decisions; they are
// calculated separately and reported as a percentage of the total.
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 3] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
];

// Per cell book-keeping
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    monitor_interval: Duration,
    cells: HashMap<u32, Cell>,
    // These are the per-cell cstats.
    // Note these are accumulated across all CPUs.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    metrics: Metrics,
    stats_server: StatsServer<(), Metrics>,
    last_configuration_seq: Option<u32>,
}

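// Aggregated queueing statistics for one reporting scope: either the whole
// node or a single cell. Percentages are relative to the scope's decisions,
// except share_of_decisions_pct, which is measured against the global total.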
struct DistributionStats {
    total_decisions: u64,
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    affn_viol_pct: f64,

    // for formatting
    global_queue_decisions: u64,
}

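// Example line produced by the Display impl below (illustrative values only):
//  1234  56.7% | Local:10.0% From: CPU:20.0% Cell:70.0% | V: 0.1%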
impl Display for DistributionStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // This makes the output easier to read by improving column alignment. First, it
        // guarantees that within a given logging interval, the global and cell queueing
        // decision counts print at the same width. Second, it reduces variance in column
        // width between logging intervals. 5 is simply a heuristic.
        const MIN_DECISIONS_WIDTH: usize = 5;
        let decisions_width = max(
            MIN_DECISIONS_WIDTH,
            (self.global_queue_decisions as f64).log10().ceil() as usize,
        );
        write!(
            f,
            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% | V:{:4.1}%",
            self.total_decisions,
            self.share_of_decisions_pct,
            self.local_q_pct,
            self.cpu_q_pct,
            self.cell_q_pct,
            self.affn_viol_pct,
            width = decisions_width,
        )
    }
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let topology = Topology::new()?;

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 1);
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        skel.maps.rodata_data.as_mut().unwrap().slice_ns = scx_enums.SCX_SLICE_DFL;

        skel.maps.rodata_data.as_mut().unwrap().nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
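        // Mark every possible CPU in the all_cpus byte-bitmask shared with
        // the BPF side (bit cpu % 8 of byte cpu / 8).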
        for cpu in topology.all_cpus.keys() {
            skel.maps.rodata_data.as_mut().unwrap().all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server,
            last_configuration_seq: None,
        })
    }

    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        let (res_ch, req_ch) = self.stats_server.channels();

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            self.refresh_bpf_cells()?;
            self.collect_metrics()?;

            match req_ch.recv_timeout(self.monitor_interval) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }
        drop(struct_ops);
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }

    fn get_metrics(&self) -> Metrics {
        self.metrics.clone()
    }

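    // Computes the queueing-decision distribution for one scope (the whole
    // node or a single cell): the scope's share of global decisions, each
    // queue's share of scope decisions, and affinity violations as a
    // percentage of scope decisions.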
    fn calculate_distribution_stats(
        &self,
        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
        global_queue_decisions: u64,
        scope_queue_decisions: u64,
        scope_affn_viols: u64,
    ) -> Result<DistributionStats> {
        // First % on the line: share of global work
        // We know global_queue_decisions is non-zero.
        let share_of_global =
            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);

        // Each queue's % of the scope total
        let queue_pct = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out queue distribution");
            [0.0; QUEUE_STATS_IDX.len()]
        } else {
            core::array::from_fn(|i| {
                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
            })
        };

        // These are summed differently for the global and per-cell totals.
        let affinity_violations_percent = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out affinity violations");
            0.0
        } else {
            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
        };

        const EXPECTED_QUEUES: usize = 3;
        if queue_pct.len() != EXPECTED_QUEUES {
            bail!(
                "Expected {} queues, got {}",
                EXPECTED_QUEUES,
                queue_pct.len()
            );
        }

        Ok(DistributionStats {
            total_decisions: scope_queue_decisions,
            share_of_decisions_pct: share_of_global,
            local_q_pct: queue_pct[0],
            cpu_q_pct: queue_pct[1],
            cell_q_pct: queue_pct[2],
            affn_viol_pct: affinity_violations_percent,
            global_queue_decisions,
        })
    }

    // Queue stats for the whole node
    fn update_and_log_global_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Get the total for each queue, summed over all cells
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cell in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cell][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        // Sum the affinity violations over all cells.
        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum::<u64>();

        // Special case: for the global scope, the number of scope decisions
        // equals the number of global decisions.
        let stats = self.calculate_distribution_stats(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
        )?;

        self.metrics.update(&stats);

        trace!("{} {}", prefix, stats);

        Ok(())
    }

    // Print out the per-cell stats
    fn update_and_log_cell_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        for cell in 0..MAX_CELLS {
            let cell_queue_decisions = QUEUE_STATS_IDX
                .iter()
                .map(|&stat| cell_stats_delta[cell][stat as usize])
                .sum::<u64>();

            // FIXME: This should really query if the cell is enabled or not.
            if cell_queue_decisions == 0 {
                continue;
            }

            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] = cell_stats_delta[cell][stat as usize];
            }

            const MIN_CELL_WIDTH: usize = 2;
            let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);

            let prefix = format!("        Cell {:width$}:", cell, width = cell_width);

            // Sum affinity violations for this cell
            let scope_affn_viols: u64 =
                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];

            let stats = self.calculate_distribution_stats(
                &queue_counts,
                global_queue_decisions,
                cell_queue_decisions,
                scope_affn_viols,
            )?;

            self.metrics
                .cells
                .entry(cell as u32)
                .or_default()
                .update(&stats);

            trace!("{} {}", prefix, stats);
        }
        Ok(())
    }

    fn log_all_queue_stats(
        &mut self,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Get total decisions
        let global_queue_decisions: u64 = cell_stats_delta
            .iter()
            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
            .sum();

        // Guard the divisions below against a zero total; in practice this
        // should never happen.
        if global_queue_decisions == 0 {
            bail!("Error: No queueing decisions made globally");
        }

        self.update_and_log_global_queue_stats(global_queue_decisions, cell_stats_delta)?;

        self.update_and_log_cell_queue_stats(global_queue_decisions, cell_stats_delta)?;

        Ok(())
    }

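    // Sums each cell/stat counter across all CPUs, returns the change since
    // the previous call, and records the new totals in prev_cell_stats.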
    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
        let mut cell_stats_delta = [[0u64; NR_CSTATS]; MAX_CELLS];

        // Read CPU contexts
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        // Loop over cells and stats first, then CPU contexts
        // TODO: We should loop over the in_use cells only.
        for cell in 0..MAX_CELLS {
            for stat in 0..NR_CSTATS {
                let mut cur_cell_stat = 0;

                // Accumulate stats from all CPUs
                for cpu_ctx in cpu_ctxs.iter() {
                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
                }

                // Calculate delta and update previous stat
                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
                self.prev_cell_stats[cell][stat] = cur_cell_stat;
            }
        }
        Ok(cell_stats_delta)
    }

    /// Collect metrics and output various debugging data like per-cell stats, per-CPU stats, etc.
    fn collect_metrics(&mut self) -> Result<()> {
        let cell_stats_delta = self.calculate_cell_stat_delta()?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);
        }

        for (cell_id, cell) in self.cells.iter() {
            // Assume we have a CellMetrics entry if we have a known cell
            self.metrics
                .cells
                .entry(*cell_id)
                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
        }
        self.metrics.num_cells = self.cells.len() as u32;

        Ok(())
    }

    fn refresh_bpf_cells(&mut self) -> Result<()> {
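        // The BPF side bumps applied_configuration_seq once a requested
        // configuration has been applied; a volatile read keeps the compiler
        // from caching the concurrently-updated value.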
        let applied_configuration = unsafe {
            std::ptr::read_volatile(
                &self
                    .skel
                    .maps
                    .bss_data
                    .as_ref()
                    .unwrap()
                    .applied_configuration_seq as *const u32,
            )
        };
        if self
            .last_configuration_seq
            .is_some_and(|seq| applied_configuration == seq)
        {
            return Ok(());
        }
        // collect all cpus per cell.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .or_insert_with(Cpumask::new)
                .set_cpu(i)
                .expect("set cpu in existing mask");
        }

        // Create cells we don't have yet, drop cells that are no longer in use.
        // If we continue to drop cell metrics once a cell is removed, we'll need to make sure we
        // flush metrics for a cell before we remove it completely.
        let cells = &self.skel.maps.bss_data.as_ref().unwrap().cells;
        for i in 0..MAX_CELLS {
            let cell_idx = i as u32;
            let bpf_cell = cells[i];
            let in_use = unsafe { std::ptr::read_volatile(&bpf_cell.in_use as *const u32) };
            if in_use > 0 {
                self.cells
                    .entry(cell_idx)
                    .or_insert_with(|| Cell {
                        cpus: Cpumask::new(),
                    })
                    .cpus = cell_to_cpus
                    .get(&cell_idx)
                    .expect("missing cell in cpu map")
                    .clone();
                self.metrics.cells.insert(cell_idx, CellMetrics::default());
            } else {
                self.cells.remove(&cell_idx);
                self.metrics.cells.remove(&cell_idx);
            }
        }

        self.last_configuration_seq = Some(applied_configuration);

        Ok(())
    }
}

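// Reads one cpu_ctx per possible CPU. lookup_percpu returns a byte buffer per
// CPU, each of which is reinterpreted as a bpf_intf::cpu_ctx; this assumes the
// map's value layout matches the generated struct.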
fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    for cpu in 0..*NR_CPUS_POSSIBLE {
        cpu_ctxs.push(*unsafe {
            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
        });
    }
    Ok(cpu_ctxs)
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_mitosis {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    debug!("opts={:?}", &opts);

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

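    // In monitor mode, run only the stats client in a separate thread; the
    // scheduler itself is never loaded.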
    if let Some(intv) = opts.monitor {
        let shutdown_clone = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
                Ok(_) => debug!("stats monitor thread finished successfully"),
                Err(error_object) => warn!(
                    "stats monitor thread finished because of an error {}",
                    error_object
                ),
            }
        });
        let _ = jh.join();
        return Ok(());
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}