mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
mod stats;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fmt::Display;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;
use tracing::{debug, info, trace, warn};
use tracing_subscriber::filter::EnvFilter;

use stats::CellMetrics;
use stats::Metrics;

const SCHEDULER_NAME: &str = "scx_mitosis";
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;

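/// Command-line options for the scx_mitosis scheduler.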
#[derive(Debug, Parser)]
struct Opts {
    /// Enable verbose output, including libbpf details. Deprecated; use
    /// --log-level instead.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Log level, e.g. "info", "debug", or a tracing filter directive.
    #[clap(long, default_value = "info")]
    log_level: String,

    /// Exit debug dump buffer length. 0 indicates the default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval, in seconds, between cell reconfigurations.
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval, in seconds, between rebalancing CPUs across cells.
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval, in seconds, between metrics collection passes.
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    /// Run in stats monitoring mode with the given interval, in seconds.
    /// The scheduler itself is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Print version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Optional run identifier, echoed to the log for correlating runs.
    #[clap(long)]
    run_id: Option<u64>,

    /// Enable BPF debug events.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    debug_events: bool,

    /// Enable the workaround for exiting tasks (enabled by default).
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    exiting_task_workaround: bool,

    /// Enable split vtime updates in the BPF scheduler.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    split_vtime_updates: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

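// The three queue-placement buckets reported in the distribution stats, in
// the order they are formatted: local, per-CPU DSQ, per-cell DSQ.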
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 3] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
];

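/// Userspace mirror of a cell: the set of CPUs currently assigned to it.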
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}

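/// Userspace side of the scheduler: owns the loaded BPF skeleton, mirrors
/// cell assignments, and serves metrics to scx_stats clients.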
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    monitor_interval: Duration,
    cells: HashMap<u32, Cell>,
    // Per-cell counter values from the previous collection pass, used to
    // compute deltas.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    metrics: Metrics,
    stats_server: StatsServer<(), Metrics>,
    // The last applied_configuration_seq observed from BPF, if any.
    last_configuration_seq: Option<u32>,
}

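/// Percentage breakdown of scheduling decisions for one scope (global or a
/// single cell), derived from the raw per-cell counters.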
struct DistributionStats {
    total_decisions: u64,
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    affn_viol_pct: f64,

    // Global decision count, kept so the decisions column is sized
    // consistently across scopes.
    global_queue_decisions: u64,
}

impl Display for DistributionStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MIN_DECISIONS_WIDTH: usize = 5;
        // Size the decisions column to fit the global total so that the
        // global and per-cell lines align.
        let decisions_width = if self.global_queue_decisions > 0 {
            max(
                MIN_DECISIONS_WIDTH,
                (self.global_queue_decisions as f64).log10().ceil() as usize,
            )
        } else {
            MIN_DECISIONS_WIDTH
        };
        write!(
            f,
            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% | V:{:4.1}%",
            self.total_decisions,
            self.share_of_decisions_pct,
            self.local_q_pct,
            self.cpu_q_pct,
            self.cell_q_pct,
            self.affn_viol_pct,
            width = decisions_width,
        )
    }
}

impl<'a> Scheduler<'a> {
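    /// Opens, configures, and loads the BPF scheduler, and launches the
    /// stats server.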
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let topology = Topology::new()?;

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder
            .obj_builder
            .debug(opts.log_level.contains("trace"));
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.debug_events_enabled = opts.debug_events;
        rodata.exiting_task_workaround_enabled = opts.exiting_task_workaround;
        rodata.split_vtime_updates = opts.split_vtime_updates;

        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server,
            last_configuration_seq: None,
        })
    }

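    /// Attaches the scheduler and runs the main loop: refresh cells, collect
    /// metrics, and answer stats requests until shutdown or BPF exit.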
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        let (res_ch, req_ch) = self.stats_server.channels();

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            self.refresh_bpf_cells()?;
            self.collect_metrics()?;

            match req_ch.recv_timeout(self.monitor_interval) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }
        drop(struct_ops);
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }

    fn get_metrics(&self) -> Metrics {
        self.metrics.clone()
    }

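    /// Converts raw queue counts for a scope (global or one cell) into the
    /// percentage breakdown reported by `DistributionStats`. The caller must
    /// ensure `global_queue_decisions` is nonzero.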
    fn calculate_distribution_stats(
        &self,
        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
        global_queue_decisions: u64,
        scope_queue_decisions: u64,
        scope_affn_viols: u64,
    ) -> Result<DistributionStats> {
        let share_of_global =
            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);

        let queue_pct = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out queue distribution");
            [0.0; QUEUE_STATS_IDX.len()]
        } else {
            core::array::from_fn(|i| {
                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
            })
        };

        let affinity_violations_percent = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out affinity violations");
            0.0
        } else {
            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
        };

        // Defensive check: the field mapping below assumes exactly three
        // queue buckets, in QUEUE_STATS_IDX order.
        const EXPECTED_QUEUES: usize = 3;
        if queue_pct.len() != EXPECTED_QUEUES {
            bail!(
                "Expected {} queues, got {}",
                EXPECTED_QUEUES,
                queue_pct.len()
            );
        }

        Ok(DistributionStats {
            total_decisions: scope_queue_decisions,
            share_of_decisions_pct: share_of_global,
            local_q_pct: queue_pct[0],
            cpu_q_pct: queue_pct[1],
            cell_q_pct: queue_pct[2],
            affn_viol_pct: affinity_violations_percent,
            global_queue_decisions,
        })
    }

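    /// Aggregates queue counts across all cells, updates the global metrics,
    /// and logs the global distribution line.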
    fn update_and_log_global_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cell in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cell][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum();

        let stats = self.calculate_distribution_stats(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
        )?;

        self.metrics.update(&stats);

        trace!("{} {}", prefix, stats);

        Ok(())
    }

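    /// Computes and logs the per-cell queue distribution, skipping cells with
    /// no decisions in this interval.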
    fn update_and_log_cell_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Width of the cell index column, sized once to fit MAX_CELLS.
        const MIN_CELL_WIDTH: usize = 2;
        let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);

        for cell in 0..MAX_CELLS {
            let cell_queue_decisions = QUEUE_STATS_IDX
                .iter()
                .map(|&stat| cell_stats_delta[cell][stat as usize])
                .sum::<u64>();

            if cell_queue_decisions == 0 {
                continue;
            }

            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] = cell_stats_delta[cell][stat as usize];
            }

            let prefix = format!(" Cell {:width$}:", cell, width = cell_width);

            let scope_affn_viols: u64 =
                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];

            let stats = self.calculate_distribution_stats(
                &queue_counts,
                global_queue_decisions,
                cell_queue_decisions,
                scope_affn_viols,
            )?;

            self.metrics
                .cells
                .entry(cell as u32)
                .or_default()
                .update(&stats);

            trace!("{} {}", prefix, stats);
        }
        Ok(())
    }

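    /// Logs the global and per-cell queue distributions for one collection
    /// interval. Fails if no queueing decisions were made at all.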
    fn log_all_queue_stats(
        &mut self,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        let global_queue_decisions: u64 = cell_stats_delta
            .iter()
            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
            .sum();

        if global_queue_decisions == 0 {
            bail!("No queueing decisions made globally");
        }

        self.update_and_log_global_queue_stats(global_queue_decisions, cell_stats_delta)?;

        self.update_and_log_cell_queue_stats(global_queue_decisions, cell_stats_delta)?;

        Ok(())
    }

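    /// Sums the per-CPU counters into per-cell totals and returns the delta
    /// since the previous collection pass.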
    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
        let mut cell_stats_delta = [[0u64; NR_CSTATS]; MAX_CELLS];

        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        for cell in 0..MAX_CELLS {
            for stat in 0..NR_CSTATS {
                // Counters are per-CPU; sum them into a per-cell total.
                let mut cur_cell_stat = 0;

                for cpu_ctx in cpu_ctxs.iter() {
                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
                }

                // Report the delta since the last pass and remember the new
                // cumulative value.
                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
                self.prev_cell_stats[cell][stat] = cur_cell_stat;
            }
        }
        Ok(cell_stats_delta)
    }

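    /// Collects one interval's worth of metrics: queue distributions plus
    /// per-cell CPU counts.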
    fn collect_metrics(&mut self) -> Result<()> {
        let cell_stats_delta = self.calculate_cell_stat_delta()?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);
            self.metrics
                .cells
                .entry(*cell_id)
                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
        }
        self.metrics.num_cells = self.cells.len() as u32;

        Ok(())
    }

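    /// Re-reads cell assignments from BPF if the applied configuration
    /// sequence number has changed since the last refresh.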
    fn refresh_bpf_cells(&mut self) -> Result<()> {
        // Read the configuration sequence number that BPF publishes, with
        // Acquire ordering to observe the matching configuration updates.
        let applied_configuration = unsafe {
            let ptr = &self
                .skel
                .maps
                .bss_data
                .as_ref()
                .unwrap()
                .applied_configuration_seq as *const u32;
            (ptr as *const std::sync::atomic::AtomicU32)
                .as_ref()
                .unwrap()
                .load(std::sync::atomic::Ordering::Acquire)
        };
        if self
            .last_configuration_seq
            .is_some_and(|seq| applied_configuration == seq)
        {
            return Ok(());
        }

        // Collect the CPUs currently assigned to each cell.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .or_insert_with(Cpumask::new)
                .set_cpu(i)
                .expect("set cpu in existing mask");
        }

        // Cell 0 is always considered active, even if it currently has no
        // CPUs assigned.
        let cells_with_cpus: HashSet<u32> = cell_to_cpus.keys().copied().collect();
        let mut active_cells = cells_with_cpus.clone();
        active_cells.insert(0);

        for cell_idx in &active_cells {
            let cpus = cell_to_cpus
                .get(cell_idx)
                .cloned()
                .unwrap_or_else(Cpumask::new);
            self.cells
                .entry(*cell_idx)
                .or_insert_with(|| Cell {
                    cpus: Cpumask::new(),
                })
                .cpus = cpus;
            self.metrics.cells.insert(*cell_idx, CellMetrics::default());
        }

        // Drop state for cells that are no longer active.
        self.cells.retain(|&k, _| active_cells.contains(&k));
        self.metrics.cells.retain(|&k, _| active_cells.contains(&k));

        self.last_configuration_seq = Some(applied_configuration);

        Ok(())
    }
}

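/// Reads the per-CPU `cpu_ctx` structs for every possible CPU out of the
/// `cpu_ctxs` percpu map.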
fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    if cpu_ctxs_vec.len() < *NR_CPUS_POSSIBLE {
        bail!(
            "Percpu map returned {} entries but expected {}",
            cpu_ctxs_vec.len(),
            *NR_CPUS_POSSIBLE
        );
    }
    for cpu in 0..*NR_CPUS_POSSIBLE {
        // Reinterpret the raw bytes returned for each CPU as a cpu_ctx.
        cpu_ctxs.push(*unsafe {
            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
        });
    }
    Ok(cpu_ctxs)
}

#[clap_main::clap_main]
fn main(opts: Opts) -> Result<()> {
    if opts.version {
        println!(
            "scx_mitosis {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    let env_filter = EnvFilter::try_from_default_env()
        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
            Ok(filter) => Ok(filter),
            Err(e) => {
                eprintln!(
                    "invalid log level {:?}, falling back to \"info\": {}",
                    opts.log_level, e
                );
                EnvFilter::try_new("info")
            }
        })
        .unwrap_or_else(|_| EnvFilter::new("info"));

    match tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(true)
        .with_thread_ids(true)
        .with_file(true)
        .with_line_number(true)
        .try_init()
    {
        Ok(()) => {}
        Err(e) => eprintln!("failed to init logger: {}", e),
    }

    if opts.verbose > 0 {
        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
    }

    debug!("opts={:?}", &opts);

    if let Some(run_id) = opts.run_id {
        info!("scx_mitosis run_id: {}", run_id);
    }

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // In monitor mode, run the stats client instead of the scheduler.
    if let Some(intv) = opts.monitor {
        let shutdown_clone = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
                Ok(_) => debug!("stats monitor thread finished successfully"),
                Err(e) => warn!("stats monitor thread finished because of an error: {}", e),
            }
        });
        let _ = jh.join();
        return Ok(());
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}