scx_mitosis/main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
mod stats;

use std::cmp::max;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::debug;
use log::info;
use log::trace;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;

use stats::CellMetrics;
use stats::Metrics;

const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;

/// scx_mitosis: A dynamic affinity scheduler
///
/// Cgroups are assigned to a dynamic number of Cells which are assigned to a
/// dynamic set of CPUs. The BPF part does simple vtime scheduling for each cell.
///
/// Userspace makes the dynamic decisions of which Cells should be merged or
/// split and which CPUs they should be assigned to.
#[derive(Debug, Parser)]
struct Opts {
    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval to consider reconfiguring the Cells (e.g. merge or split)
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval to consider rebalancing CPUs to Cells
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval to report monitoring information
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,
}

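// Typical invocations (clap derives the kebab-case flags from the field names):
//   scx_mitosis -v             # run the scheduler with debug logging
//   scx_mitosis --monitor 1.0  # report stats every second; no scheduler launched
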
// The subset of cstats we care about.
// Local + CPU DSQ + Cell DSQ = total queue decisions.
// Affinity violations are not queue decisions; they are
// tracked separately and reported as a percentage of the total.
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 3] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
];

// Per-cell book-keeping
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}

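// Userspace side of the scheduler: owns the BPF skeleton, the per-cell
// book-keeping, and the stats server that backs `--monitor`.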
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    monitor_interval: Duration,
    cells: HashMap<u32, Cell>,
    // These are the per-cell cstats.
    // Note these are accumulated across all CPUs.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    metrics: Metrics,
    stats_server: StatsServer<(), Metrics>,
}

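// Percentage breakdown of queueing decisions for one "scope": either the
// whole node or a single cell. Formatted for logging by the Display impl
// below.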
struct DistributionStats {
    total_decisions: u64,
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    affn_viol_pct: f64,

    // for formatting
    global_queue_decisions: u64,
}

impl Display for DistributionStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pad the decision count to improve column alignment: within a given
        // logging interval, the global and per-cell decision counts print at
        // the same width, and the width varies less between intervals.
        // 5 is simply a heuristic.
        const MIN_DECISIONS_WIDTH: usize = 5;
        let decisions_width = max(
            MIN_DECISIONS_WIDTH,
            (self.global_queue_decisions as f64).log10().ceil() as usize,
        );
        write!(
            f,
            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% | V:{:4.1}%",
            self.total_decisions,
            self.share_of_decisions_pct,
            self.local_q_pct,
            self.cpu_q_pct,
            self.cell_q_pct,
            self.affn_viol_pct,
            width = decisions_width,
        )
    }
}

impl<'a> Scheduler<'a> {
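    /// Open and load the BPF skeleton, seed its read-only data (slice
    /// length, possible CPUs), and launch the stats server.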
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let topology = Topology::new()?;

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 1);
        init_libbpf_logging(None);
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server,
        })
    }

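    /// Attach the BPF scheduler and service stats requests until shutdown
    /// is signalled or the BPF side exits.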
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        let (res_ch, req_ch) = self.stats_server.channels();

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            self.refresh_bpf_cells()?;
            self.collect_metrics()?;

            match req_ch.recv_timeout(self.monitor_interval) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }
        drop(struct_ops);
        uei_report!(&self.skel, uei)
    }

    fn get_metrics(&self) -> Metrics {
        self.metrics.clone()
    }

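    /// Compute the stats for one scope (the whole node or a single cell):
    /// its share of global decisions, the per-queue split, and affinity
    /// violations relative to the scope's decisions.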
    fn calculate_distribution_stats(
        &self,
        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
        global_queue_decisions: u64,
        scope_queue_decisions: u64,
        scope_affn_viols: u64,
    ) -> Result<DistributionStats> {
        // First % on the line: share of global work.
        // We know global_queue_decisions is non-zero.
        let share_of_global =
            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);

        // Each queue's % of the scope total
        let queue_pct = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out queue distribution");
            [0.0; QUEUE_STATS_IDX.len()]
        } else {
            core::array::from_fn(|i| {
                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
            })
        };

        // These are summed differently for the global and per-cell totals.
        let affinity_violations_percent = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out affinity violations");
            0.0
        } else {
            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
        };

        // The three queue fields below must stay in sync with QUEUE_STATS_IDX.
        const _: () = assert!(QUEUE_STATS_IDX.len() == 3);

        Ok(DistributionStats {
            total_decisions: scope_queue_decisions,
            share_of_decisions_pct: share_of_global,
            local_q_pct: queue_pct[0],
            cpu_q_pct: queue_pct[1],
            cell_q_pct: queue_pct[2],
            affn_viol_pct: affinity_violations_percent,
            global_queue_decisions,
        })
    }

    // Queue stats for the whole node
    fn update_and_log_global_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Get the total of each queue summed over all cells
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cell in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cell][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        // Sum the affinity violations over all cells.
        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum::<u64>();

        // Special case: for the global scope, scope decisions == global decisions.
        let stats = self.calculate_distribution_stats(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
        )?;

        self.metrics.update(&stats);

        trace!("{} {}", prefix, stats);

        Ok(())
    }

    // Print out the per-cell stats
    fn update_and_log_cell_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        for cell in 0..MAX_CELLS {
            let cell_queue_decisions = QUEUE_STATS_IDX
                .iter()
                .map(|&stat| cell_stats_delta[cell][stat as usize])
                .sum::<u64>();

            // FIXME: This should really query if the cell is enabled or not.
            if cell_queue_decisions == 0 {
                continue;
            }

            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] = cell_stats_delta[cell][stat as usize];
            }

            const MIN_CELL_WIDTH: usize = 2;
            let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);

            let prefix = format!("        Cell {:width$}:", cell, width = cell_width);

            // Sum affinity violations for this cell
            let scope_affn_viols: u64 =
                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];

            let stats = self.calculate_distribution_stats(
                &queue_counts,
                global_queue_decisions,
                cell_queue_decisions,
                scope_affn_viols,
            )?;

            self.metrics
                .cells
                .entry(cell as u32)
                .or_default()
                .update(&stats);

            trace!("{} {}", prefix, stats);
        }
        Ok(())
    }

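    // Log the global queue breakdown followed by one line per active cell.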
    fn log_all_queue_stats(
        &mut self,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Get total decisions
        let global_queue_decisions: u64 = cell_stats_delta
            .iter()
            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
            .sum();

        // Bail rather than divide by zero later; no decisions at all is never expected.
        if global_queue_decisions == 0 {
            bail!("Error: No queueing decisions made globally");
        }

        self.update_and_log_global_queue_stats(global_queue_decisions, cell_stats_delta)?;

        self.update_and_log_cell_queue_stats(global_queue_decisions, cell_stats_delta)?;

        Ok(())
    }

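    /// Sum the cumulative per-CPU counters into per-cell totals and return
    /// the delta since the previous call, updating the saved snapshot.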
    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
        let mut cell_stats_delta = [[0u64; NR_CSTATS]; MAX_CELLS];

        // Read CPU contexts
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        // Loop over cells and stats first, then CPU contexts
        // TODO: We should loop over the in_use cells only.
        for cell in 0..MAX_CELLS {
            for stat in 0..NR_CSTATS {
                let mut cur_cell_stat = 0;

                // Accumulate stats from all CPUs
                for cpu_ctx in cpu_ctxs.iter() {
                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
                }

                // Calculate delta and update previous stat
                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
                self.prev_cell_stats[cell][stat] = cur_cell_stat;
            }
        }
        Ok(cell_stats_delta)
    }

    /// Collect metrics and log various debugging data like per-cell stats, per-CPU stats, etc.
    fn collect_metrics(&mut self) -> Result<()> {
        let cell_stats_delta = self.calculate_cell_stat_delta()?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);

            // Assume we have a CellMetrics entry if we have a known cell
            self.metrics
                .cells
                .entry(*cell_id)
                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
        }
        self.metrics.num_cells = self.cells.len() as u32;

        Ok(())
    }

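    /// Mirror the BPF cell state into userspace: rebuild each cell's cpumask
    /// from the per-CPU contexts and create/drop cells based on `in_use`.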
    fn refresh_bpf_cells(&mut self) -> Result<()> {
        // Collect all CPUs per cell.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .and_modify(|mask| mask.set_cpu(i).expect("set cpu in existing mask"))
                .or_insert_with(|| {
                    let mut mask = Cpumask::new();
                    mask.set_cpu(i).expect("set cpu in new mask");
                    mask
                });
        }

        // Create cells we don't have yet and drop cells that are no longer in use.
        // If we keep dropping cell metrics as soon as a cell is removed, we'll need
        // to make sure we flush a cell's metrics before removing it completely.
        let cells = &self.skel.maps.bss_data.as_ref().unwrap().cells;
        for i in 0..MAX_CELLS {
            let cell_idx = i as u32;
            let bpf_cell = cells[i];
            if bpf_cell.in_use > 0 {
                self.cells.entry(cell_idx).or_insert(Cell {
                    cpus: cell_to_cpus
                        .get(&cell_idx)
                        .expect("missing cell in cpu map")
                        .clone(),
                });
                // Keep any accumulated metrics; only create entries for new cells.
                self.metrics.cells.entry(cell_idx).or_default();
            } else {
                self.cells.remove(&cell_idx);
                self.metrics.cells.remove(&cell_idx);
            }
        }

        Ok(())
    }
}

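// Read a snapshot of every CPU's cpu_ctx from the per-CPU BPF map. The
// lookup returns one byte buffer per possible CPU, each of which is
// reinterpreted as a bpf_intf::cpu_ctx struct.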
fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    for cpu in 0..*NR_CPUS_POSSIBLE {
        cpu_ctxs.push(*unsafe {
            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
        });
    }
    Ok(cpu_ctxs)
}

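// Entry point: parse options, set up logging and the Ctrl-C handler, then
// either run the stats monitor or the scheduler restart loop.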
fn main() -> Result<()> {
    let opts = Opts::parse();

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    debug!("opts={:?}", &opts);

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

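    // In --monitor mode, run only the stats client; the scheduler itself is
    // not launched.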
    if let Some(intv) = opts.monitor {
        let shutdown_clone = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        let _ = jh.join();
        return Ok(());
    }

    let mut open_object = MaybeUninit::uninit();
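    // Re-initialize and restart the scheduler whenever its exit info
    // requests a restart.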
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}