mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

use std::cmp::max;
use std::collections::HashMap;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::debug;
use log::info;
use log::trace;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;
use scx_utils::NR_CPU_IDS;

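// Compile-time sizes shared with the BPF side via the generated bpf_intf
// bindings.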
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;

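/// Command-line options for scx_mitosis. Interval options are in seconds;
/// `-v` may be given multiple times to increase log verbosity.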
#[derive(Debug, Parser)]
struct Opts {
    /// Enable verbose output; specify multiple times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Exit debug dump buffer length. 0 uses the default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval at which to consider reconfiguring the cells.
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval at which to consider rebalancing CPUs between cells.
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval at which to report monitoring information.
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,
}

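// Cell stat indices that count queueing decisions. Together they cover every
// decision, so their sum gives the total for a scope.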
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 4] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_DEFAULT_Q,
    bpf_intf::cell_stat_idx_CSTAT_HI_FALLBACK_Q,
    bpf_intf::cell_stat_idx_CSTAT_LO_FALLBACK_Q,
];

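/// View a sized value as its raw bytes, e.g. for writing a fixed-layout
/// struct into a BPF map. Callers must ensure reading `T` as plain bytes is
/// valid (no uninitialized padding they are not prepared to expose).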
unsafe fn any_as_u8_slice<T: Sized>(p: &T) -> &[u8] {
    unsafe {
        ::std::slice::from_raw_parts((p as *const T) as *const u8, ::std::mem::size_of::<T>())
    }
}

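/// Userspace bookkeeping for one cell: the set of CPUs it currently owns.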
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}

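/// Userspace side of the scheduler: owns the loaded BPF skeleton and
/// periodically mirrors and reports the BPF-side cell state.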
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    prev_percpu_cell_cycles: Vec<[u64; MAX_CELLS]>,
    monitor_interval: Duration,
    cells: HashMap<u32, Cell>,
    // Per-cell cumulative stat counters as of the previous monitoring tick,
    // used to compute deltas.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let topology = Topology::new()?;

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 1);
        init_libbpf_logging(None);
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        skel.maps.rodata_data.slice_ns = scx_enums.SCX_SLICE_DFL;

        skel.maps.rodata_data.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        // Mark each CPU in the topology in the byte-packed all_cpus bitmask.
        for cpu in topology.all_cpus.keys() {
            skel.maps.rodata_data.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        Ok(Self {
            skel,
            prev_percpu_cell_cycles: vec![[0; MAX_CELLS]; *NR_CPU_IDS],
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
        })
    }

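    /// Attach the BPF scheduler, then poll until shutdown is requested or the
    /// BPF side exits, refreshing cell state and logging stats each interval.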
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;
        info!("Mitosis Scheduler Attached");
        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            std::thread::sleep(self.monitor_interval);
            self.refresh_bpf_cells()?;
            self.debug()?;
        }
        drop(struct_ops);
        uei_report!(&self.skel, uei)
    }

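    /// Log one row of the queueing-decision report for a scope (global or a
    /// single cell): its share of all decisions, the percentage handled by
    /// each queue, and the affinity-violation rate.
    ///
    /// For example, 50 decisions in scope out of 200 global is a 25.0% share,
    /// and queue counts of [30, 10, 5, 5] become
    /// L:60.0% D:20.0% hi:10.0% lo:10.0%.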
    fn calculate_distribution_and_log(
        &self,
        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
        global_queue_decisions: u64,
        scope_queue_decisions: u64,
        scope_affn_viols: u64,
        prefix: &str,
    ) -> Result<()> {
        let share_of_global =
            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);

        let queue_pct = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out queue distribution");
            [0.0; QUEUE_STATS_IDX.len()]
        } else {
            core::array::from_fn(|i| {
                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
            })
        };

        let affinity_violations_percent = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out affinity violations");
            0.0
        } else {
            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
        };

        // Pad the decision count so columns line up across rows.
        const MIN_DECISIONS_WIDTH: usize = 5;
        let decisions_format_width: usize = max(
            MIN_DECISIONS_WIDTH,
            (global_queue_decisions as f64).log10().ceil() as usize,
        );

        // The format string below assumes exactly four queues; fail loudly if
        // QUEUE_STATS_IDX changes without this function being updated.
        const EXPECTED_QUEUES: usize = 4;
        if queue_pct.len() != EXPECTED_QUEUES {
            bail!(
                "Expected {} queues, got {}",
                EXPECTED_QUEUES,
                queue_pct.len()
            );
        }

        trace!(
            "{} {:width$} {:5.1}% | L:{:4.1}% D:{:4.1}% hi:{:4.1}% lo:{:4.1}% | V:{:4.1}%",
            prefix,
            scope_queue_decisions,
            share_of_global,
            queue_pct[0],
            queue_pct[1],
            queue_pct[2],
            queue_pct[3],
            affinity_violations_percent,
            width = decisions_format_width
        );
        Ok(())
    }

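    /// Log the queueing-decision distribution aggregated across all cells.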
    fn log_global_queue_stats(
        &self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Sum each queue's counts across every cell.
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cell in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cell][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum();

        self.calculate_distribution_and_log(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
            prefix,
        )?;

        Ok(())
    }

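    /// Log the per-cell queueing-decision distributions, skipping cells that
    /// made no decisions this interval.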
    fn log_cell_queue_stats(
        &self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Pad cell ids so rows align for any MAX_CELLS.
        const MIN_CELL_WIDTH: usize = 2;
        let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);

        for cell in 0..MAX_CELLS {
            let cell_queue_decisions = QUEUE_STATS_IDX
                .iter()
                .map(|&stat| cell_stats_delta[cell][stat as usize])
                .sum::<u64>();

            if cell_queue_decisions == 0 {
                continue;
            }

            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] = cell_stats_delta[cell][stat as usize];
            }

            let prefix = format!(" Cell {:width$}:", cell, width = cell_width);

            let scope_affn_viols: u64 =
                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];

            self.calculate_distribution_and_log(
                &queue_counts,
                global_queue_decisions,
                cell_queue_decisions,
                scope_affn_viols,
                &prefix,
            )?;
        }
        Ok(())
    }

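    /// Compute the global decision count, then log the global and per-cell
    /// distributions.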
    fn log_all_queue_stats(&self, cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS]) -> Result<()> {
        let global_queue_decisions: u64 = cell_stats_delta
            .iter()
            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
            .sum();

        // The percentages below divide by this total, so bail out rather than
        // divide by zero.
        if global_queue_decisions == 0 {
            bail!("No queueing decisions made globally");
        }

        self.log_global_queue_stats(global_queue_decisions, cell_stats_delta)?;

        self.log_cell_queue_stats(global_queue_decisions, cell_stats_delta)?;

        Ok(())
    }

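    /// Sum the per-CPU cell stat counters and return the change since the
    /// previous call. This assumes the BPF-side counters increase
    /// monotonically.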
    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
        let mut cell_stats_delta = [[0u64; NR_CSTATS]; MAX_CELLS];

        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        for cell in 0..MAX_CELLS {
            for stat in 0..NR_CSTATS {
                let mut cur_cell_stat = 0;

                // Aggregate this stat across all CPUs.
                for cpu_ctx in cpu_ctxs.iter() {
                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
                }

                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
                self.prev_cell_stats[cell][stat] = cur_cell_stat;
            }
        }
        Ok(cell_stats_delta)
    }

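    /// Emit periodic monitoring output: queueing-decision distributions plus
    /// each live cell's CPU assignment.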
    fn debug(&mut self) -> Result<()> {
        let cell_stats_delta = self.calculate_cell_stat_delta()?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);
        }
        Ok(())
    }

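    /// Rebuild the userspace view of cells from BPF state: derive each cell's
    /// cpumask from the per-CPU contexts, then create entries for cells marked
    /// in_use and drop entries for cells that no longer are.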
    fn refresh_bpf_cells(&mut self) -> Result<()> {
        // Build a cell -> cpumask mapping from each CPU's assigned cell.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .and_modify(|mask| mask.set_cpu(i).expect("set cpu in existing mask"))
                .or_insert_with(|| {
                    let mut mask = Cpumask::new();
                    mask.set_cpu(i).expect("set cpu in new mask");
                    mask
                });
        }

        // Mirror the BPF-side in_use flags into our cell map.
        let cells = &self.skel.maps.bss_data.cells;
        for i in 0..MAX_CELLS {
            let cell_idx = i as u32;
            let bpf_cell = cells[i];
            if bpf_cell.in_use > 0 {
                self.cells.entry(cell_idx).or_insert(Cell {
                    cpus: cell_to_cpus
                        .get(&cell_idx)
                        .expect("missing cell in cpu map")
                        .clone(),
                });
            } else {
                self.cells.remove(&cell_idx);
            }
        }

        Ok(())
    }
}

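/// Read every CPU's context from the per-CPU `cpu_ctxs` map. The per-CPU
/// lookup yields one byte buffer per possible CPU, each of which is
/// reinterpreted as the C `cpu_ctx` struct described by bpf_intf.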
fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    for cpu in 0..*NR_CPUS_POSSIBLE {
        cpu_ctxs.push(*unsafe {
            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
        });
    }
    Ok(cpu_ctxs)
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    // Map -v occurrences to a log level: info by default, debug with -v,
    // trace with -vv or more.
    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    debug!("opts={:?}", &opts);

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // Re-init and re-run the scheduler whenever it exits with should_restart()
    // set (e.g. a recoverable BPF-side exit).
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}