1mod bpf_skel;
6pub use bpf_skel::*;
7pub mod bpf_intf;
8mod mitosis_topology_utils;
9mod stats;
10
11use std::cmp::max;
12use std::collections::{HashMap, HashSet};
13use std::fmt;
14use std::fmt::Display;
15use std::mem::MaybeUninit;
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::bail;
22use anyhow::Context;
23use anyhow::Result;
24use clap::Parser;
25use crossbeam::channel::RecvTimeoutError;
26use libbpf_rs::MapCore as _;
27use libbpf_rs::OpenObject;
28use scx_stats::prelude::*;
29use scx_utils::build_id;
30use scx_utils::compat;
31use scx_utils::init_libbpf_logging;
32use scx_utils::libbpf_clap_opts::LibbpfOpts;
33use scx_utils::scx_enums;
34use scx_utils::scx_ops_attach;
35use scx_utils::scx_ops_load;
36use scx_utils::scx_ops_open;
37use scx_utils::uei_exited;
38use scx_utils::uei_report;
39use scx_utils::Cpumask;
40use scx_utils::Topology;
41use scx_utils::UserExitInfo;
42use scx_utils::NR_CPUS_POSSIBLE;
43use tracing::{debug, info, trace, warn};
44use tracing_subscriber::filter::EnvFilter;
45
46use stats::CellMetrics;
47use stats::Metrics;
48
// Name used when logging scheduler registration/unregistration.
const SCHEDULER_NAME: &str = "scx_mitosis";
// Limits mirrored from the BPF side; keep in sync with bpf_intf.
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;
52
// Command-line options for scx_mitosis.
//
// NOTE: plain `//` comments are used deliberately — `///` doc comments on
// clap-derived fields become part of the generated --help output.
#[derive(Debug, Parser)]
struct Opts {
    // Deprecated verbosity flag; only triggers a warning in main().
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    // Log filter string; also feeds the libbpf debug toggle ("trace" enables it).
    #[clap(long, default_value = "info")]
    log_level: String,

    // Size of the BPF exit dump buffer (0 = kernel default).
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    // Interval between reconfigurations — not read in this file; presumably
    // consumed elsewhere. TODO confirm.
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    // CPU rebalance interval — not read in this file; presumably consumed
    // elsewhere. TODO confirm.
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    // How often the main loop refreshes cells and collects metrics.
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    // When set, run as a stats monitor client instead of the scheduler.
    #[clap(long)]
    monitor: Option<f64>,

    // Print version/build ID and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    // Optional external run identifier, only logged for correlation.
    #[clap(long)]
    run_id: Option<u64>,

    // Forwarded to BPF rodata: enables debug events.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    debug_events: bool,

    // Forwarded to BPF rodata: toggles the exiting-task workaround (on by default).
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    exiting_task_workaround: bool,

    // Forwarded to BPF rodata: disables the CPU controller.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    cpu_controller_disabled: bool,

    // Forwarded to BPF rodata: reject tasks pinned to multiple CPUs.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    reject_multicpu_pinning: bool,

    // Forwarded to BPF rodata: enable LLC-aware mode.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_llc_awareness: bool,

    // Forwarded to BPF rodata: enable work stealing (requires LLC awareness,
    // enforced by Scheduler::validate_args).
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_work_stealing: bool,

    // Pass-through libbpf open options.
    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
133
// The three cell-stat indices that count queueing decisions. Their sum is the
// total number of decisions in a scope; the order here maps directly onto
// DistributionStats::{local_q_pct, cpu_q_pct, cell_q_pct}.
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 3] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
];
143
// Userspace view of one cell: the set of CPUs currently assigned to it,
// rebuilt from per-CPU BPF contexts in refresh_bpf_cells().
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}
149
// Userspace driver for the mitosis BPF scheduler.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    // How long the main loop waits for a stats request each iteration.
    monitor_interval: Duration,
    // Active cells keyed by cell id, mirrored from the BPF side.
    cells: HashMap<u32, Cell>,
    // Cumulative per-cell/per-stat counters from the previous sample,
    // used to compute deltas in calculate_cell_stat_delta().
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    metrics: Metrics,
    stats_server: StatsServer<(), Metrics>,
    // Last BPF configuration sequence we synced; None before the first sync.
    last_configuration_seq: Option<u32>,
}
161
// Queueing-decision distribution for one scope (a single cell, or global).
// Percentages are relative to the scope's own decision count.
struct DistributionStats {
    // Decisions made in this scope.
    total_decisions: u64,
    // This scope's share of all decisions globally.
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    affn_viol_pct: f64,
    steal_pct: f64,

    // Global decision count, kept so Display can size its columns consistently.
    global_queue_decisions: u64,
}
174
175impl Display for DistributionStats {
176 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177 const MIN_DECISIONS_WIDTH: usize = 5;
181 let descisions_width = if self.global_queue_decisions > 0 {
182 max(
183 MIN_DECISIONS_WIDTH,
184 (self.global_queue_decisions as f64).log10().ceil() as usize,
185 )
186 } else {
187 MIN_DECISIONS_WIDTH
188 };
189 write!(
190 f,
191 "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% | V:{:4.1}% S:{:4.1}%",
192 self.total_decisions,
193 self.share_of_decisions_pct,
194 self.local_q_pct,
195 self.cpu_q_pct,
196 self.cell_q_pct,
197 self.affn_viol_pct,
198 self.steal_pct,
199 width = descisions_width,
200 )
201 }
202}
203
204impl<'a> Scheduler<'a> {
205 fn validate_args(opts: &Opts) -> Result<()> {
206 if opts.enable_work_stealing && !opts.enable_llc_awareness {
207 bail!("Work stealing requires LLC-aware mode (--enable-llc-awareness)");
208 }
209
210 Ok(())
211 }
212
    /// Opens, configures, and loads the BPF skeleton, then constructs the
    /// userspace scheduler state and launches the stats server.
    ///
    /// The struct_ops is NOT attached here; `run()` does that.
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        Self::validate_args(opts)?;

        let topology = Topology::new()?;

        // At least one LLC even on topologies that report none.
        let nr_llc = topology.all_llcs.len().max(1);

        let mut skel_builder = BpfSkelBuilder::default();
        // "trace" log level also turns on libbpf's own debug output.
        skel_builder
            .obj_builder
            .debug(opts.log_level.contains("trace"));
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        // Push CLI options into BPF read-only data before load.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();

        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.debug_events_enabled = opts.debug_events;
        rodata.exiting_task_workaround_enabled = opts.exiting_task_workaround;
        rodata.cpu_controller_disabled = opts.cpu_controller_disabled;

        // Encode the possible-CPU set as a byte-per-8-CPUs bitmask.
        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        rodata.reject_multicpu_pinning = opts.reject_multicpu_pinning;

        rodata.nr_llc = nr_llc as u32;
        rodata.enable_llc_awareness = opts.enable_llc_awareness;
        rodata.enable_work_stealing = opts.enable_work_stealing;

        // Opt into queued-wakeup handling when the running kernel supports it.
        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        // Populate both directions of the CPU <-> LLC topology maps.
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::CpuToLLC,
            None,
        )?;
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::LLCToCpus,
            None,
        )?;

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server,
            last_configuration_seq: None,
        })
    }
285
    /// Attaches the scheduler and runs the monitor loop until shutdown is
    /// requested or the BPF side exits; returns the user exit info.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        let (res_ch, req_ch) = self.stats_server.channels();

        // Loop until Ctrl-C flips the flag or the BPF scheduler reports exit.
        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            self.refresh_bpf_cells()?;
            self.collect_metrics()?;

            // Answer a stats request if one arrives within the interval;
            // a timeout just means "poll again".
            match req_ch.recv_timeout(self.monitor_interval) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }
        // Detach the struct_ops before reporting exit info.
        drop(struct_ops);
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }
307
308 fn get_metrics(&self) -> Metrics {
309 self.metrics.clone()
310 }
311
312 fn calculate_distribution_stats(
313 &self,
314 queue_counts: &[u64; QUEUE_STATS_IDX.len()],
315 global_queue_decisions: u64,
316 scope_queue_decisions: u64,
317 scope_affn_viols: u64,
318 scope_steals: u64,
319 ) -> Result<DistributionStats> {
320 let share_of_global =
323 100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);
324
325 let queue_pct = if scope_queue_decisions == 0 {
327 debug!("No queue decisions in scope, zeroing out queue distribution");
328 [0.0; QUEUE_STATS_IDX.len()]
329 } else {
330 core::array::from_fn(|i| {
331 100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
332 })
333 };
334
335 let affinity_violations_percent = if scope_queue_decisions == 0 {
337 debug!("No queue decisions in scope, zeroing out affinity violations");
338 0.0
339 } else {
340 100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
341 };
342
343 let steal_pct = if scope_queue_decisions == 0 {
344 0.0
345 } else {
346 100.0 * (scope_steals as f64) / (scope_queue_decisions as f64)
347 };
348
349 const EXPECTED_QUEUES: usize = 3;
350 if queue_pct.len() != EXPECTED_QUEUES {
351 bail!(
352 "Expected {} queues, got {}",
353 EXPECTED_QUEUES,
354 queue_pct.len()
355 );
356 }
357
358 return Ok(DistributionStats {
359 total_decisions: scope_queue_decisions,
360 share_of_decisions_pct: share_of_global,
361 local_q_pct: queue_pct[0],
362 cpu_q_pct: queue_pct[1],
363 cell_q_pct: queue_pct[2],
364 affn_viol_pct: affinity_violations_percent,
365 steal_pct,
366 global_queue_decisions,
367 });
368 }
369
370 fn update_and_log_global_queue_stats(
372 &mut self,
373 global_queue_decisions: u64,
374 cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
375 ) -> Result<()> {
376 let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
378 for cells in 0..MAX_CELLS {
379 for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
380 queue_counts[i] += cell_stats_delta[cells][*stat as usize];
381 }
382 }
383
384 let prefix = "Total Decisions:";
385
386 let scope_affn_viols: u64 = cell_stats_delta
388 .iter()
389 .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
390 .sum::<u64>();
391
392 let scope_steals: u64 = cell_stats_delta
394 .iter()
395 .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_STEAL as usize])
396 .sum::<u64>();
397
398 let stats = self.calculate_distribution_stats(
400 &queue_counts,
401 global_queue_decisions,
402 global_queue_decisions,
403 scope_affn_viols,
404 scope_steals,
405 )?;
406
407 self.metrics.update(&stats);
408
409 trace!("{} {}", prefix, stats);
410
411 Ok(())
412 }
413
414 fn update_and_log_cell_queue_stats(
416 &mut self,
417 global_queue_decisions: u64,
418 cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
419 ) -> Result<()> {
420 for cell in 0..MAX_CELLS {
421 let cell_queue_decisions = QUEUE_STATS_IDX
422 .iter()
423 .map(|&stat| cell_stats_delta[cell][stat as usize])
424 .sum::<u64>();
425
426 if cell_queue_decisions == 0 {
428 continue;
429 }
430
431 let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
432 for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
433 queue_counts[i] = cell_stats_delta[cell][stat as usize];
434 }
435
436 const MIN_CELL_WIDTH: usize = 2;
437 let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);
438
439 let prefix = format!(" Cell {:width$}:", cell, width = cell_width);
440
441 let scope_affn_viols: u64 =
443 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];
444
445 let scope_steals: u64 =
447 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_STEAL as usize];
448
449 let stats = self.calculate_distribution_stats(
450 &queue_counts,
451 global_queue_decisions,
452 cell_queue_decisions,
453 scope_affn_viols,
454 scope_steals,
455 )?;
456
457 self.metrics
458 .cells
459 .entry(cell as u32)
460 .or_default()
461 .update(&stats);
462
463 trace!("{} {}", prefix, stats);
464 }
465 Ok(())
466 }
467
468 fn log_all_queue_stats(
469 &mut self,
470 cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
471 ) -> Result<()> {
472 let global_queue_decisions: u64 = cell_stats_delta
474 .iter()
475 .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
476 .sum();
477
478 if global_queue_decisions == 0 {
480 bail!("Error: No queueing decisions made globally");
481 }
482
483 self.update_and_log_global_queue_stats(global_queue_decisions, &cell_stats_delta)?;
484
485 self.update_and_log_cell_queue_stats(global_queue_decisions, &cell_stats_delta)?;
486
487 Ok(())
488 }
489
490 fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
491 let mut cell_stats_delta = [[0 as u64; NR_CSTATS]; MAX_CELLS];
492
493 let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
495
496 for cell in 0..MAX_CELLS {
499 for stat in 0..NR_CSTATS {
500 let mut cur_cell_stat = 0;
501
502 for cpu_ctx in cpu_ctxs.iter() {
504 cur_cell_stat += cpu_ctx.cstats[cell][stat];
505 }
506
507 cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
509 self.prev_cell_stats[cell][stat] = cur_cell_stat;
510 }
511 }
512 Ok(cell_stats_delta)
513 }
514
515 fn collect_metrics(&mut self) -> Result<()> {
517 let cell_stats_delta = self.calculate_cell_stat_delta()?;
518
519 self.log_all_queue_stats(&cell_stats_delta)?;
520
521 for (cell_id, cell) in &self.cells {
522 trace!("CELL[{}]: {}", cell_id, cell.cpus);
523 }
524
525 for (cell_id, cell) in self.cells.iter() {
526 self.metrics
528 .cells
529 .entry(*cell_id)
530 .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
531 }
532 self.metrics.num_cells = self.cells.len() as u32;
533
534 Ok(())
535 }
536
    /// Re-syncs the userspace cell map from the BPF side, skipping the work
    /// entirely when the BPF configuration sequence number hasn't changed.
    fn refresh_bpf_cells(&mut self) -> Result<()> {
        // Read the configuration sequence published by the BPF program.
        // SAFETY-wise this reinterprets the bss field as an AtomicU32 to get
        // an Acquire load; assumes the BPF side publishes with a matching
        // Release-style ordering — TODO confirm against the BPF code.
        let applied_configuration = unsafe {
            let ptr = &self
                .skel
                .maps
                .bss_data
                .as_ref()
                .unwrap()
                .applied_configuration_seq as *const u32;
            (ptr as *const std::sync::atomic::AtomicU32)
                .as_ref()
                .unwrap()
                .load(std::sync::atomic::Ordering::Acquire)
        };
        // Unchanged since last sync: nothing to do.
        if self
            .last_configuration_seq
            .is_some_and(|seq| applied_configuration == seq)
        {
            return Ok(());
        }
        // Rebuild cell -> cpumask from each CPU's reported cell id.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .or_insert_with(|| Cpumask::new())
                .set_cpu(i)
                .expect("set cpu in existing mask");
        }

        // Cell 0 is always kept active, even if it currently owns no CPUs.
        let cells_with_cpus: HashSet<u32> = cell_to_cpus.keys().copied().collect();
        let mut active_cells = cells_with_cpus.clone();
        active_cells.insert(0);

        // Upsert each active cell's cpumask; the insert below also resets the
        // cell's metrics entry to defaults on every reconfiguration.
        for cell_idx in &active_cells {
            let cpus = cell_to_cpus
                .get(cell_idx)
                .cloned()
                .unwrap_or_else(|| Cpumask::new());
            self.cells
                .entry(*cell_idx)
                .or_insert_with(|| Cell {
                    cpus: Cpumask::new(),
                })
                .cpus = cpus;
            self.metrics.cells.insert(*cell_idx, CellMetrics::default());
        }

        // Drop cells (and their metrics) that are no longer active.
        self.cells.retain(|&k, _| active_cells.contains(&k));
        self.metrics.cells.retain(|&k, _| active_cells.contains(&k));

        self.last_configuration_seq = Some(applied_configuration);

        Ok(())
    }
604}
605
606fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
607 let mut cpu_ctxs = vec![];
608 let cpu_ctxs_vec = skel
609 .maps
610 .cpu_ctxs
611 .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
612 .context("Failed to lookup cpu_ctx")?
613 .unwrap();
614 if cpu_ctxs_vec.len() < *NR_CPUS_POSSIBLE {
615 bail!(
616 "Percpu map returned {} entries but expected {}",
617 cpu_ctxs_vec.len(),
618 *NR_CPUS_POSSIBLE
619 );
620 }
621 for cpu in 0..*NR_CPUS_POSSIBLE {
622 cpu_ctxs.push(*unsafe {
623 &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
624 });
625 }
626 Ok(cpu_ctxs)
627}
628
629#[clap_main::clap_main]
630fn main(opts: Opts) -> Result<()> {
631 if opts.version {
632 println!(
633 "scx_mitosis {}",
634 build_id::full_version(env!("CARGO_PKG_VERSION"))
635 );
636 return Ok(());
637 }
638
639 let env_filter = EnvFilter::try_from_default_env()
640 .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
641 Ok(filter) => Ok(filter),
642 Err(e) => {
643 eprintln!(
644 "invalid log envvar: {}, using info, err is: {}",
645 opts.log_level, e
646 );
647 EnvFilter::try_new("info")
648 }
649 })
650 .unwrap_or_else(|_| EnvFilter::new("info"));
651
652 match tracing_subscriber::fmt()
653 .with_env_filter(env_filter)
654 .with_target(true)
655 .with_thread_ids(true)
656 .with_file(true)
657 .with_line_number(true)
658 .try_init()
659 {
660 Ok(()) => {}
661 Err(e) => eprintln!("failed to init logger: {}", e),
662 }
663
664 if opts.verbose > 0 {
665 warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
666 }
667
668 debug!("opts={:?}", &opts);
669
670 if let Some(run_id) = opts.run_id {
671 info!("scx_mitosis run_id: {}", run_id);
672 }
673
674 let shutdown = Arc::new(AtomicBool::new(false));
675 let shutdown_clone = shutdown.clone();
676 ctrlc::set_handler(move || {
677 shutdown_clone.store(true, Ordering::Relaxed);
678 })
679 .context("Error setting Ctrl-C handler")?;
680
681 if let Some(intv) = opts.monitor {
682 let shutdown_clone = shutdown.clone();
683 let jh = std::thread::spawn(move || {
684 match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
685 Ok(_) => {
686 debug!("stats monitor thread finished successfully")
687 }
688 Err(error_object) => {
689 warn!(
690 "stats monitor thread finished because of an error {}",
691 error_object
692 )
693 }
694 }
695 });
696 if opts.monitor.is_some() {
697 let _ = jh.join();
698 return Ok(());
699 }
700 }
701
702 let mut open_object = MaybeUninit::uninit();
703 loop {
704 let mut sched = Scheduler::init(&opts, &mut open_object)?;
705 if !sched.run(shutdown.clone())?.should_restart() {
706 break;
707 }
708 }
709
710 Ok(())
711}