mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
mod stats;

use std::cmp::max;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::debug;
use log::info;
use log::trace;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;

use stats::CellMetrics;
use stats::Metrics;

const SCHEDULER_NAME: &str = "scx_mitosis";
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;

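/// scx_mitosis: a cell-based scheduler. Tasks are grouped into cells, each of
/// which owns a subset of the machine's CPUs; cell and CPU assignments are
/// mirrored from the BPF side as the applied configuration changes.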
#[derive(Debug, Parser)]
struct Opts {
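    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.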
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

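    /// Exit debug dump buffer length. 0 indicates default.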
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

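    /// Interval, in seconds, between passes that consider reconfiguring the
    /// cells.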
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

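    /// Interval, in seconds, between passes that consider rebalancing CPUs
    /// across cells.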
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

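    /// Interval, in seconds, between metrics refreshes in the main loop.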
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

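    /// Run in stats monitoring mode with the given interval, in seconds.
    /// The scheduler itself is not loaded.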
    #[clap(long)]
    monitor: Option<f64>,

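    /// Print version and exit.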
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

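    /// Tuning options passed through to libbpf.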
    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

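// The stats that classify queueing decisions by where the task was dispatched
// from (directly to the local CPU, a per-CPU DSQ, or the cell-wide DSQ);
// their sum is the total number of scheduling decisions.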
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 3] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
];

#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    monitor_interval: Duration,
    cells: HashMap<u32, Cell>,
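    // Cumulative counter values from the previous poll, used to compute
    // per-interval deltas.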
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    metrics: Metrics,
    stats_server: StatsServer<(), Metrics>,
    last_configuration_seq: Option<u32>,
}

struct DistributionStats {
    total_decisions: u64,
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    affn_viol_pct: f64,

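    // Global decision total; only used to size the count column in `Display`.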
    global_queue_decisions: u64,
}

impl Display for DistributionStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MIN_DECISIONS_WIDTH: usize = 5;
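        // Size the count column to fit the largest value it can hold (the
        // global total), with a sane minimum width. "V" in the output is the
        // affinity-violation percentage.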
        let decisions_width = max(
            MIN_DECISIONS_WIDTH,
            (self.global_queue_decisions as f64).log10().ceil() as usize,
        );
        write!(
            f,
            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% | V:{:4.1}%",
            self.total_decisions,
            self.share_of_decisions_pct,
            self.local_q_pct,
            self.cpu_q_pct,
            self.cell_q_pct,
            self.affn_viol_pct,
            width = decisions_width,
        )
    }
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let topology = Topology::new()?;

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 1);
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

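        // Initialize the read-only BPF data; it is frozen once the skeleton
        // is loaded below.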
        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

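        // Opt into queued wakeups when the running kernel supports the flag.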
        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server,
            last_configuration_seq: None,
        })
    }

    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        let (res_ch, req_ch) = self.stats_server.channels();

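        // Main loop: refresh cell assignments and metrics once per interval.
        // The stats-server request channel doubles as the interval timer.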
        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            self.refresh_bpf_cells()?;
            self.collect_metrics()?;

            match req_ch.recv_timeout(self.monitor_interval) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }
        drop(struct_ops);
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }

    fn get_metrics(&self) -> Metrics {
        self.metrics.clone()
    }

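    /// Compute the queueing-decision distribution for one scope (the whole
    /// system or a single cell): each queue's share of the scope's decisions,
    /// the scope's share of global decisions, and the affinity-violation rate.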
    fn calculate_distribution_stats(
        &self,
        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
        global_queue_decisions: u64,
        scope_queue_decisions: u64,
        scope_affn_viols: u64,
    ) -> Result<DistributionStats> {
        let share_of_global =
            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);

        let queue_pct = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out queue distribution");
            [0.0; QUEUE_STATS_IDX.len()]
        } else {
            core::array::from_fn(|i| {
                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
            })
        };

        let affinity_violations_percent = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out affinity violations");
            0.0
        } else {
            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
        };

        const EXPECTED_QUEUES: usize = 3;
        if queue_pct.len() != EXPECTED_QUEUES {
            bail!(
                "Expected {} queues, got {}",
                EXPECTED_QUEUES,
                queue_pct.len()
            );
        }

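        // The positional fields below rely on the QUEUE_STATS_IDX order:
        // LOCAL, CPU_DSQ, CELL_DSQ.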
        Ok(DistributionStats {
            total_decisions: scope_queue_decisions,
            share_of_decisions_pct: share_of_global,
            local_q_pct: queue_pct[0],
            cpu_q_pct: queue_pct[1],
            cell_q_pct: queue_pct[2],
            affn_viol_pct: affinity_violations_percent,
            global_queue_decisions,
        })
    }

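    /// Aggregate queueing stats across all cells and record the global
    /// distribution.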
    fn update_and_log_global_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cell in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cell][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum();

        let stats = self.calculate_distribution_stats(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
        )?;

        self.metrics.update(&stats);

        trace!("{} {}", prefix, stats);

        Ok(())
    }

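    /// Record each cell's queueing distribution, skipping cells that made no
    /// decisions this interval.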
    fn update_and_log_cell_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        for cell in 0..MAX_CELLS {
            let cell_queue_decisions = QUEUE_STATS_IDX
                .iter()
                .map(|&stat| cell_stats_delta[cell][stat as usize])
                .sum::<u64>();

            if cell_queue_decisions == 0 {
                continue;
            }

            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] = cell_stats_delta[cell][stat as usize];
            }

            const MIN_CELL_WIDTH: usize = 2;
            let cell_width: usize =
                max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);

            let prefix = format!(" Cell {:width$}:", cell, width = cell_width);

            let scope_affn_viols: u64 =
                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];

            let stats = self.calculate_distribution_stats(
                &queue_counts,
                global_queue_decisions,
                cell_queue_decisions,
                scope_affn_viols,
            )?;

            self.metrics
                .cells
                .entry(cell as u32)
                .or_default()
                .update(&stats);

            trace!("{} {}", prefix, stats);
        }
        Ok(())
    }

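    /// Compute the interval's total decision count, then record the global
    /// and per-cell distributions.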
    fn log_all_queue_stats(
        &mut self,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        let global_queue_decisions: u64 = cell_stats_delta
            .iter()
            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
            .sum();

        if global_queue_decisions == 0 {
            bail!("No queueing decisions made globally");
        }

        self.update_and_log_global_queue_stats(global_queue_decisions, cell_stats_delta)?;

        self.update_and_log_cell_queue_stats(global_queue_decisions, cell_stats_delta)?;

        Ok(())
    }

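    /// Sum per-CPU counters into per-cell totals and return the delta since
    /// the previous call, updating the saved totals along the way. The
    /// counters are cumulative, so the subtraction does not underflow as long
    /// as the BPF side never resets them.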
    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
        let mut cell_stats_delta = [[0u64; NR_CSTATS]; MAX_CELLS];

        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        for cell in 0..MAX_CELLS {
            for stat in 0..NR_CSTATS {
                let mut cur_cell_stat = 0;

                for cpu_ctx in cpu_ctxs.iter() {
                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
                }

                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
                self.prev_cell_stats[cell][stat] = cur_cell_stat;
            }
        }
        Ok(cell_stats_delta)
    }

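    /// Pull fresh stats deltas from BPF and fold them into the metrics
    /// snapshot served to stats clients.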
    fn collect_metrics(&mut self) -> Result<()> {
        let cell_stats_delta = self.calculate_cell_stat_delta()?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);
        }

        for (cell_id, cell) in &self.cells {
            self.metrics
                .cells
                .entry(*cell_id)
                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
        }
        self.metrics.num_cells = self.cells.len() as u32;

        Ok(())
    }

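    /// Mirror the BPF side's cell assignments into `self.cells`. The
    /// `applied_configuration_seq` counter is read with a volatile load so
    /// the BPF-shared memory is re-read on every poll; if it matches the last
    /// seen value there is nothing to update.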
    fn refresh_bpf_cells(&mut self) -> Result<()> {
        let applied_configuration = unsafe {
            std::ptr::read_volatile(
                &self
                    .skel
                    .maps
                    .bss_data
                    .as_ref()
                    .unwrap()
                    .applied_configuration_seq as *const u32,
            )
        };
        if self
            .last_configuration_seq
            .is_some_and(|seq| applied_configuration == seq)
        {
            return Ok(());
        }

        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .or_insert_with(Cpumask::new)
                .set_cpu(i)
                .expect("set cpu in existing mask");
        }

        let cells = &self.skel.maps.bss_data.as_ref().unwrap().cells;
        for i in 0..MAX_CELLS {
            let cell_idx = i as u32;
            let bpf_cell = cells[i];
            let in_use = unsafe { std::ptr::read_volatile(&bpf_cell.in_use as *const u32) };
            if in_use > 0 {
                self.cells
                    .entry(cell_idx)
                    .or_insert_with(|| Cell {
                        cpus: Cpumask::new(),
                    })
                    .cpus = cell_to_cpus
                    .get(&cell_idx)
                    .expect("missing cell in cpu map")
                    .clone();
                self.metrics.cells.insert(cell_idx, CellMetrics::default());
            } else {
                self.cells.remove(&cell_idx);
                self.metrics.cells.remove(&cell_idx);
            }
        }

        self.last_configuration_seq = Some(applied_configuration);

        Ok(())
    }
}

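/// Read each CPU's `cpu_ctx` from the per-CPU `cpu_ctxs` BPF map. The raw
/// bytes returned by libbpf are reinterpreted as `bpf_intf::cpu_ctx` and
/// copied out.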
fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    for cpu in 0..*NR_CPUS_POSSIBLE {
        cpu_ctxs.push(*unsafe {
            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
        });
    }
    Ok(cpu_ctxs)
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_mitosis {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    debug!("opts={:?}", &opts);

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

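    // With --monitor, run only the stats client; the scheduler is not loaded.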
    if let Some(intv) = opts.monitor {
        let shutdown_clone = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        let _ = jh.join();
        return Ok(());
    }

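    // Run the scheduler, re-initializing and re-attaching whenever it exits
    // with a request to restart.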
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}