// scx_mitosis/main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

use std::cmp::max;
use std::collections::HashMap;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::debug;
use log::info;
use log::trace;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;
use scx_utils::NR_CPU_IDS;

const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;

/// scx_mitosis: A dynamic affinity scheduler
///
/// Cgroups are assigned to a dynamic number of Cells which are assigned to a
/// dynamic set of CPUs. The BPF part does simple vtime scheduling for each cell.
///
/// Userspace makes the dynamic decisions of which Cells should be merged or
/// split and which CPUs they should be assigned to.
#[derive(Debug, Parser)]
struct Opts {
    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval to consider reconfiguring the Cells (e.g. merge or split)
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval to consider rebalancing CPUs to Cells
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval to report monitoring information
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,
}
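
// Example invocation (illustrative; clap derives the kebab-case long flags
// from the field names above):
//   scx_mitosis -vv --monitor-interval-s 2 --rebalance-cpus-interval-s 10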

// The subset of cstats we care about:
// Local + Default + Hi + Lo = total queueing decisions.
// Affinity violations are not queueing decisions; they are counted separately
// and reported as a percentage of the total.
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 4] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_DEFAULT_Q,
    bpf_intf::cell_stat_idx_CSTAT_HI_FALLBACK_Q,
    bpf_intf::cell_stat_idx_CSTAT_LO_FALLBACK_Q,
];

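/// View a `Sized` value as its raw bytes (presumably for copying structs into
/// BPF maps elsewhere in the scheduler). The returned slice borrows from `p`,
/// so it is only valid for `p`'s lifetime.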
unsafe fn any_as_u8_slice<T: Sized>(p: &T) -> &[u8] {
    unsafe {
        ::std::slice::from_raw_parts((p as *const T) as *const u8, ::std::mem::size_of::<T>())
    }
}

// Per-cell book-keeping
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    prev_percpu_cell_cycles: Vec<[u64; MAX_CELLS]>,
    monitor_interval: Duration,
    cells: HashMap<u32, Cell>,
    // These are the per-cell cstats.
    // Note these are accumulated across all CPUs.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let topology = Topology::new()?;

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 1);
        init_libbpf_logging(None);
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        skel.maps.rodata_data.slice_ns = scx_enums.SCX_SLICE_DFL;

        skel.maps.rodata_data.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
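        // all_cpus is a byte-array cpumask in the BPF rodata: bit `cpu` lives
        // in byte (cpu / 8) at bit position (cpu % 8).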
        for cpu in topology.all_cpus.keys() {
            skel.maps.rodata_data.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        Ok(Self {
            skel,
            prev_percpu_cell_cycles: vec![[0; MAX_CELLS]; *NR_CPU_IDS],
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
        })
    }

    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;
        info!("Mitosis Scheduler Attached");
        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            std::thread::sleep(self.monitor_interval);
            self.refresh_bpf_cells()?;
            self.debug()?;
        }
        drop(struct_ops);
        uei_report!(&self.skel, uei)
    }

    fn calculate_distribution_and_log(
        &self,
        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
        global_queue_decisions: u64,
        scope_queue_decisions: u64,
        scope_affn_viols: u64,
        prefix: &str,
    ) -> Result<()> {
        // First % on the line: share of global work.
        // We know global_queue_decisions is non-zero.
        let share_of_global =
            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);

        // Each queue's % of the scope total
        let queue_pct = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out queue distribution");
            [0.0; QUEUE_STATS_IDX.len()]
        } else {
            core::array::from_fn(|i| {
                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
            })
        };

        // These are summed differently for the global and per-cell totals.
        let affinity_violations_percent = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out affinity violations");
            0.0
        } else {
            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
        };

        // This makes the output easier to read by improving column alignment. First, it guarantees
        // that within a given logging interval, the global and cell queueing decision counts print
        // at the same width. Second, it reduces variance in column width between logging intervals.
        // 5 is simply a heuristic.
        const MIN_DECISIONS_WIDTH: usize = 5;
        let decisions_format_width: usize = max(
            MIN_DECISIONS_WIDTH,
            (global_queue_decisions as f64).log10().ceil() as usize,
        );

        const EXPECTED_QUEUES: usize = 4;
        if queue_pct.len() != EXPECTED_QUEUES {
            bail!(
                "Expected {} queues, got {}",
                EXPECTED_QUEUES,
                queue_pct.len()
            );
        }

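        // Example of one output line (illustrative values):
        //   Total Decisions: 12345 100.0% | L:62.0% D:30.1% hi: 6.4% lo: 1.5% | V: 0.8%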
        trace!(
            "{} {:width$} {:5.1}% | L:{:4.1}% D:{:4.1}% hi:{:4.1}% lo:{:4.1}% | V:{:4.1}%",
            prefix,
            scope_queue_decisions,
            share_of_global,
            queue_pct[0],
            queue_pct[1],
            queue_pct[2],
            queue_pct[3],
            affinity_violations_percent,
            width = decisions_format_width
        );
        Ok(())
    }

    // Queue stats for the whole node
    fn log_global_queue_stats(
        &self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Get the total of each queue summed over all cells
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cell in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cell][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        // Here we want to sum the affinity violations over all cells.
        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum::<u64>();

        // Special case: for the global scope, scope decisions == global decisions.
        self.calculate_distribution_and_log(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
            prefix,
        )?;

        Ok(())
    }

    // Print out the per-cell stats
    fn log_cell_queue_stats(
        &self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        for cell in 0..MAX_CELLS {
            let cell_queue_decisions = QUEUE_STATS_IDX
                .iter()
                .map(|&stat| cell_stats_delta[cell][stat as usize])
                .sum::<u64>();

            // FIXME: This should really query if the cell is enabled or not.
            if cell_queue_decisions == 0 {
                continue;
            }

            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] = cell_stats_delta[cell][stat as usize];
            }

            const MIN_CELL_WIDTH: usize = 2;
            let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);

            let prefix = format!("        Cell {:width$}:", cell, width = cell_width);

            // Affinity violations for just this cell (a single counter, so no summing needed)
            let scope_affn_viols: u64 =
                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];

            self.calculate_distribution_and_log(
                &queue_counts,
                global_queue_decisions,
                cell_queue_decisions,
                scope_affn_viols,
                &prefix,
            )?;
        }
        Ok(())
    }

    fn log_all_queue_stats(&self, cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS]) -> Result<()> {
        // Get total decisions
        let global_queue_decisions: u64 = cell_stats_delta
            .iter()
            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
            .sum();

        // Guard against dividing by zero later; this is never expected to happen.
        if global_queue_decisions == 0 {
            bail!("No queueing decisions made globally");
        }

        self.log_global_queue_stats(global_queue_decisions, cell_stats_delta)?;

        self.log_cell_queue_stats(global_queue_decisions, cell_stats_delta)?;

        Ok(())
    }

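    // Compute per-interval deltas of the per-cell stats. The per-CPU counters
    // read from BPF are cumulative, so we sum them across CPUs and subtract
    // the snapshot taken on the previous monitor tick.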
    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
        let mut cell_stats_delta = [[0u64; NR_CSTATS]; MAX_CELLS];

        // Read CPU contexts
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        // Loop over cells and stats first, then CPU contexts
        // TODO: We should loop over the in_use cells only.
        for cell in 0..MAX_CELLS {
            for stat in 0..NR_CSTATS {
                let mut cur_cell_stat = 0;

                // Accumulate stats from all CPUs
                for cpu_ctx in cpu_ctxs.iter() {
                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
                }

                // Calculate delta and update previous stat
                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
                self.prev_cell_stats[cell][stat] = cur_cell_stat;
            }
        }
        Ok(cell_stats_delta)
    }

    /// Output various debugging data like per-cell stats, per-cpu stats, etc.
    fn debug(&mut self) -> Result<()> {
        let cell_stats_delta = self.calculate_cell_stat_delta()?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);
        }
        Ok(())
    }

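    /// Rebuild the userspace view of cells from BPF state: derive each cell's
    /// cpumask from the per-CPU contexts, then create or drop `Cell` entries
    /// according to the `in_use` flag in the BPF `cells` array.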
    fn refresh_bpf_cells(&mut self) -> Result<()> {
        // Collect all CPUs per cell.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .and_modify(|mask| mask.set_cpu(i).expect("set cpu in existing mask"))
                .or_insert_with(|| {
                    let mut mask = Cpumask::new();
                    mask.set_cpu(i).expect("set cpu in new mask");
                    mask
                });
        }

        // Create cells we don't have yet, drop cells that are no longer in use.
        let cells = &self.skel.maps.bss_data.cells;
        for i in 0..MAX_CELLS {
            let cell_idx = i as u32;
            let bpf_cell = cells[i];
            if bpf_cell.in_use > 0 {
                self.cells.entry(cell_idx).or_insert(Cell {
                    cpus: cell_to_cpus
                        .get(&cell_idx)
                        .expect("missing cell in cpu map")
                        .clone(),
                });
            } else {
                self.cells.remove(&cell_idx);
            }
        }

        Ok(())
    }
}

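/// Read every per-CPU `cpu_ctx` from the percpu `cpu_ctxs` BPF map.
/// `lookup_percpu` returns one byte buffer per possible CPU, which is
/// reinterpreted in place as a `bpf_intf::cpu_ctx`.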
fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    for cpu in 0..*NR_CPUS_POSSIBLE {
        cpu_ctxs.push(*unsafe {
            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
        });
    }
    Ok(cpu_ctxs)
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    debug!("opts={:?}", &opts);

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    let mut open_object = MaybeUninit::uninit();
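    // Re-initialize and re-attach the scheduler in a loop so that a BPF-side
    // exit requesting a restart (should_restart) brings the scheduler back up
    // without restarting the userspace process.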
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}