1mod bpf_skel;
6pub use bpf_skel::*;
7pub mod bpf_intf;
8mod cell_manager;
9mod mitosis_topology_utils;
10mod stats;
11
12use cell_manager::{CellManager, CpuAssignment};
13
14use std::cmp::max;
15use std::collections::{HashMap, HashSet};
16use std::fmt;
17use std::fmt::Display;
18use std::mem::MaybeUninit;
19use std::os::fd::AsFd;
20use std::sync::atomic::AtomicBool;
21use std::sync::atomic::AtomicU32;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::time::Duration;
25use std::time::Instant;
26
27use anyhow::bail;
28use anyhow::Context;
29use anyhow::Result;
30use clap::Parser;
31use libbpf_rs::MapCore as _;
32use libbpf_rs::OpenObject;
33use libbpf_rs::ProgramInput;
34use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
35use nix::sys::eventfd::EventFd;
36use scx_stats::prelude::*;
37use scx_utils::build_id;
38use scx_utils::compat;
39use scx_utils::init_libbpf_logging;
40use scx_utils::libbpf_clap_opts::LibbpfOpts;
41use scx_utils::scx_enums;
42use scx_utils::scx_ops_attach;
43use scx_utils::scx_ops_load;
44use scx_utils::scx_ops_open;
45use scx_utils::uei_exited;
46use scx_utils::uei_report;
47use scx_utils::Cpumask;
48use scx_utils::Topology;
49use scx_utils::UserExitInfo;
50use scx_utils::NR_CPUS_POSSIBLE;
51use tracing::{debug, info, trace, warn};
52use tracing_subscriber::filter::EnvFilter;
53
54use stats::CellMetrics;
55use stats::Metrics;
56
// Scheduler name used in the unregister log message.
const SCHEDULER_NAME: &str = "scx_mitosis";
// Maximum number of cells, mirrored from the BPF-side constant.
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
// Number of per-cell stat counters, mirrored from the BPF-side enum.
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;
// Epoll user-data tokens distinguishing event sources in the main loop:
// cell-manager inotify events vs. stats-server request wakeups.
const INOTIFY_TOKEN: u64 = 1;
const STATS_TOKEN: u64 = 2;
64
65fn parse_ewma_factor(s: &str) -> Result<f64, String> {
66 let v: f64 = s.parse().map_err(|e| format!("{e}"))?;
67 if !(0.0..=1.0).contains(&v) {
68 return Err(format!("value {v} not in range 0.0..=1.0"));
69 }
70 Ok(v)
71}
72
// Command-line options. NOTE: plain `//` comments are used deliberately —
// clap's derive turns `///` doc comments into CLI help text, which would
// change the program's user-visible output.
#[derive(Debug, Parser)]
struct Opts {
    // Verbosity; may be repeated (-vv).
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    // Log filter level; "trace" also enables libbpf object debug output.
    #[clap(long, default_value = "info")]
    log_level: String,

    // Size of the BPF exit dump buffer (0 = kernel default).
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    // Interval in seconds between reconfigurations — not referenced in this
    // chunk; confirm usage elsewhere in the crate.
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    // Interval in seconds between CPU rebalances — not referenced in this
    // chunk; confirm usage elsewhere in the crate.
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    // Main-loop wakeup / metrics collection period, in seconds.
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    // Run in monitor-only mode at the given interval (seconds).
    #[clap(long)]
    monitor: Option<f64>,

    // Print version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    // Optional external run identifier.
    #[clap(long)]
    run_id: Option<u64>,

    // Enable BPF-side debug event emission.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    debug_events: bool,

    // Workaround for exiting tasks; enabled by default, settable to false.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    exiting_task_workaround: bool,

    // Disable the CPU controller on the BPF side.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    cpu_controller_disabled: bool,

    // Reject tasks pinned to more than one CPU.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    reject_multicpu_pinning: bool,

    // Enable LLC-aware dispatch; required by --enable-work-stealing.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_llc_awareness: bool,

    // Enable work stealing (validated to require LLC awareness).
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_work_stealing: bool,

    // Parent cgroup path enabling userspace-managed cell mode.
    #[clap(long)]
    cell_parent_cgroup: Option<String>,

    // Child cgroup names to exclude from cell management
    // (requires --cell-parent-cgroup).
    #[clap(long)]
    cell_exclude: Vec<String>,

    // Allow cells to borrow CPUs from one another.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_borrowing: bool,

    // Use lockless peeks on the BPF side.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    use_lockless_peek: bool,

    // Enable demand-based CPU rebalancing between cells.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_rebalancing: bool,

    // Utilization spread (percentage points) that triggers a rebalance.
    #[clap(long, default_value = "20.0")]
    rebalance_threshold: f64,

    // Minimum seconds between consecutive rebalances.
    #[clap(long, default_value = "5")]
    rebalance_cooldown_s: u64,

    // EWMA smoothing factor for utilization samples, in [0.0, 1.0].
    #[clap(long, default_value = "0.3", value_parser = parse_ewma_factor)]
    demand_smoothing: f64,

    // Dynamic CPU selection for affinitized tasks (BPF-side flag).
    #[clap(long, action = clap::ArgAction::SetTrue)]
    dynamic_affinity_cpu_selection: bool,

    // Enable slice shrinking; bounds below are validated min < max.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_slice_shrinking: bool,

    // Upper bound of the shrunk slice, microseconds.
    #[clap(long, default_value = "4000")]
    slice_shrink_max_us: u64,

    // Lower bound of the shrunk slice, microseconds.
    #[clap(long, default_value = "500")]
    slice_shrink_min_us: u64,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
213
// The four stat indices that together account for every queueing decision;
// summing them yields the per-cell and global decision totals used in the
// distribution statistics below.
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 4] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_BORROWED,
];
224
/// Userspace view of one cell: the set of CPUs currently assigned to it
/// (mirrored from the BPF side by `refresh_bpf_cells`).
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}
230
/// Userspace half of the mitosis scheduler: owns the loaded BPF skeleton,
/// per-cell bookkeeping, metrics state, and the epoll event-loop plumbing.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    // Main-loop wakeup / metrics collection period.
    monitor_interval: Duration,
    // Cells currently known to userspace, keyed by cell id.
    cells: HashMap<u32, Cell>,
    // Previous counter totals used to compute per-interval deltas.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    prev_cell_running_ns: [u64; MAX_CELLS],
    prev_cell_own_ns: [u64; MAX_CELLS],
    prev_cell_lent_ns: [u64; MAX_CELLS],
    metrics: Metrics,
    // Taken (set to None) on shutdown so the stats bridge thread exits.
    stats_server: Option<StatsServer<(), Metrics>>,
    // Last BPF-applied configuration sequence seen by refresh_bpf_cells().
    last_configuration_seq: Option<u32>,
    // Last cpuset sequence acknowledged back to BPF.
    last_cpuset_seq: u32,
    // Present only in userspace-managed cell mode (--cell-parent-cgroup).
    cell_manager: Option<CellManager>,
    enable_borrowing: bool,
    enable_rebalancing: bool,
    // Utilization spread (percentage points) that triggers a rebalance.
    rebalance_threshold: f64,
    rebalance_cooldown: Duration,
    // EWMA factor applied to per-cell utilization samples.
    demand_smoothing: f64,
    smoothed_util: [f64; MAX_CELLS],
    last_rebalance: Instant,
    rebalance_count: u64,
    // Event loop: waits on cell-manager inotify and stats-request wakeups.
    epoll: Epoll,
    // Written by the stats bridge thread to wake the epoll loop.
    stats_waker: EventFd,
}
270
/// How queueing decisions were distributed across queue types for one scope
/// (either globally or a single cell). All `*_pct` fields are percentages of
/// `total_decisions` for that scope.
struct DistributionStats {
    total_decisions: u64,
    // This scope's share of all decisions made globally.
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    borrowed_pct: f64,
    // Affinity violations, work steals, and pinned-task skips.
    affn_viol_pct: f64,
    steal_pct: f64,
    pin_skip_pct: f64,

    // Global decision count; used only to size the formatted output column.
    global_queue_decisions: u64,
}
285
286impl Display for DistributionStats {
287 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288 const MIN_DECISIONS_WIDTH: usize = 5;
292 let descisions_width = if self.global_queue_decisions > 0 {
293 max(
294 MIN_DECISIONS_WIDTH,
295 (self.global_queue_decisions as f64).log10().ceil() as usize,
296 )
297 } else {
298 MIN_DECISIONS_WIDTH
299 };
300 write!(
301 f,
302 "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% Borrow:{:4.1}% | V:{:4.1}% S:{:4.1}% PS:{:4.1}%",
303 self.total_decisions,
304 self.share_of_decisions_pct,
305 self.local_q_pct,
306 self.cpu_q_pct,
307 self.cell_q_pct,
308 self.borrowed_pct,
309 self.affn_viol_pct,
310 self.steal_pct,
311 self.pin_skip_pct,
312 width = descisions_width,
313 )
314 }
315}
316
317impl<'a> Scheduler<'a> {
318 fn validate_args(opts: &Opts) -> Result<()> {
319 if opts.enable_work_stealing && !opts.enable_llc_awareness {
320 bail!("Work stealing requires LLC-aware mode (--enable-llc-awareness)");
321 }
322
323 Ok(())
324 }
325
    /// Build, configure, and load the BPF scheduler, then construct the
    /// userspace `Scheduler` state (cell manager, epoll, stats plumbing).
    ///
    /// Ordering matters: all `rodata` fields must be written before
    /// `scx_ops_load!`, and the skeleton must be opened before `struct_ops`
    /// fields can be set.
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        Self::validate_args(opts)?;

        let topology = Topology::new()?;

        // Treat topologies reporting zero LLCs as having a single LLC.
        let nr_llc = topology.all_llcs.len().max(1);

        let mut skel_builder = BpfSkelBuilder::default();
        // "trace" log level also turns on libbpf object debug output.
        skel_builder
            .obj_builder
            .debug(opts.log_level.contains("trace"));
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        // Read-only BPF data: must be fully populated before load.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();

        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.debug_events_enabled = opts.debug_events;
        rodata.exiting_task_workaround_enabled = opts.exiting_task_workaround;
        rodata.cpu_controller_disabled = opts.cpu_controller_disabled;
        rodata.dynamic_affinity_cpu_selection = opts.dynamic_affinity_cpu_selection;

        // Validate slice-shrink bounds before converting to nanoseconds.
        if opts.slice_shrink_min_us >= opts.slice_shrink_max_us {
            bail!(
                "--slice-shrink-min-us ({}) must be less than --slice-shrink-max-us ({})",
                opts.slice_shrink_min_us,
                opts.slice_shrink_max_us
            );
        }
        rodata.enable_slice_shrinking = opts.enable_slice_shrinking;
        rodata.slice_shrink_max_ns = opts.slice_shrink_max_us * 1_000;
        rodata.slice_shrink_multiplier = 2;
        rodata.slice_shrink_min_ns = opts.slice_shrink_min_us * 1_000;

        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        // Encode the set of possible CPUs as a byte-indexed bitmask.
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        rodata.reject_multicpu_pinning = opts.reject_multicpu_pinning;

        rodata.nr_llc = nr_llc as u32;
        rodata.enable_llc_awareness = opts.enable_llc_awareness;
        rodata.enable_work_stealing = opts.enable_work_stealing;

        // Userspace-managed cell mode is implied by a parent cgroup path.
        rodata.userspace_managed_cell_mode = opts.cell_parent_cgroup.is_some();

        rodata.enable_borrowing = opts.enable_borrowing;
        rodata.use_lockless_peek = opts.use_lockless_peek;

        // Opt into queued wakeups when the running kernel supports them.
        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        // Populate both directions of the CPU <-> LLC topology maps.
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::CpuToLLC,
            None,
        )?;
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::LLCToCpus,
            None,
        )?;

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        // --cell-exclude only makes sense in userspace-managed cell mode.
        if !opts.cell_exclude.is_empty() && opts.cell_parent_cgroup.is_none() {
            bail!("--cell-exclude requires --cell-parent-cgroup");
        }
        let cell_manager = if let Some(ref parent_cgroup) = opts.cell_parent_cgroup {
            let exclude: HashSet<String> = opts.cell_exclude.iter().cloned().collect();
            Some(CellManager::new(
                parent_cgroup,
                MAX_CELLS as u32,
                topology.span.clone(),
                exclude,
            )?)
        } else {
            None
        };

        let epoll = Epoll::new(EpollCreateFlags::empty())?;

        // Semaphore semantics: each write(1) wakes exactly one epoll_wait,
        // so stats requests are serviced one at a time.
        let stats_waker = EventFd::from_value_and_flags(
            0,
            nix::sys::eventfd::EfdFlags::EFD_NONBLOCK | nix::sys::eventfd::EfdFlags::EFD_SEMAPHORE,
        )?;

        epoll.add(
            &stats_waker,
            EpollEvent::new(EpollFlags::EPOLLIN, STATS_TOKEN),
        )?;

        // The cell manager exposes an inotify fd for cgroup change events.
        if let Some(ref cell_manager) = cell_manager {
            epoll.add(
                cell_manager,
                EpollEvent::new(EpollFlags::EPOLLIN, INOTIFY_TOKEN),
            )?;
        }

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            prev_cell_running_ns: [0; MAX_CELLS],
            prev_cell_own_ns: [0; MAX_CELLS],
            prev_cell_lent_ns: [0; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server: Some(stats_server),
            last_configuration_seq: None,
            last_cpuset_seq: 0,
            cell_manager,
            enable_borrowing: opts.enable_borrowing,
            enable_rebalancing: opts.enable_rebalancing,
            rebalance_threshold: opts.rebalance_threshold,
            rebalance_cooldown: Duration::from_secs(opts.rebalance_cooldown_s),
            demand_smoothing: opts.demand_smoothing,
            smoothed_util: [0.0; MAX_CELLS],
            last_rebalance: Instant::now(),
            rebalance_count: 0,
            epoll,
            stats_waker,
        })
    }
472
    /// Attach the scheduler and run the main event loop until shutdown is
    /// requested or the BPF side exits; returns the BPF user exit info.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        self.apply_initial_cells()?;

        let (res_ch, req_ch) = self.stats_server.as_ref().unwrap().channels();

        // Bridge thread: converts blocking stats-server requests into eventfd
        // writes so the epoll loop can service them alongside inotify events.
        // SAFETY (review): the fd passed to from_owned_fd is a freshly cloned,
        // owned duplicate of the eventfd — confirm this matches nix's contract.
        let stats_waker_fd = self.stats_waker.as_fd().try_clone_to_owned()?;
        let stats_waker = unsafe { EventFd::from_owned_fd(stats_waker_fd) };
        let stats_bridge = std::thread::spawn(move || {
            while req_ch.recv().is_ok() {
                let _ = stats_waker.write(1);
            }
        });

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let mut events = [EpollEvent::empty(); 1];
            let timeout = EpollTimeout::try_from(self.monitor_interval).with_context(|| {
                format!(
                    "monitor_interval {:?} exceeds maximum epoll timeout",
                    self.monitor_interval,
                )
            })?;

            match self.epoll.wait(&mut events, timeout) {
                Ok(n) => {
                    for event in &events[..n] {
                        match event.data() {
                            INOTIFY_TOKEN => {
                                // Cgroup created/removed under the parent.
                                self.process_cell_events()?;
                            }
                            STATS_TOKEN => {
                                // Drain the semaphore eventfd, then answer
                                // the pending stats request.
                                let _ = self.stats_waker.read();
                                res_ch.send(self.get_metrics())?;
                            }
                            _ => {}
                        }
                    }
                }
                Err(nix::errno::Errno::EINTR) => continue,
                Err(e) => return Err(e.into()),
            }

            // Periodic work runs on every wakeup, event or timeout alike.
            self.refresh_bpf_cells()?;
            self.check_cpuset_changes()?;
            self.collect_metrics()?;

            if self.enable_rebalancing && self.cell_manager.is_some() {
                self.maybe_rebalance()?;
            }
        }

        drop(struct_ops);
        // Dropping the stats server closes its channels, so the bridge
        // thread's recv() fails and the thread exits cleanly.
        drop(self.stats_server.take());
        let _ = stats_bridge.join();
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }
542
543 fn apply_initial_cells(&mut self) -> Result<()> {
545 if self.cell_manager.is_none() {
546 return Ok(());
547 }
548
549 let cpu_assignments = self.compute_and_apply_cell_config(&[])?;
550
551 let cell_manager = self.cell_manager.as_ref().unwrap();
552 info!(
553 "Applied initial cell configuration: {}",
554 cell_manager.format_cell_config(&cpu_assignments)
555 );
556
557 Ok(())
558 }
559
    /// Handle cgroup create/destroy notifications from the cell manager and
    /// reapply the cell configuration if anything actually changed.
    fn process_cell_events(&mut self) -> Result<()> {
        // Scope the mutable borrow of cell_manager so that
        // compute_and_apply_cell_config (which re-borrows self) can be
        // called afterwards.
        let (num_new, num_destroyed, new_cell_ids, destroyed_cell_ids) = {
            let Some(ref mut cell_manager) = self.cell_manager else {
                return Ok(());
            };

            let (new_cells, destroyed_cells) = cell_manager.process_events()?;

            if new_cells.is_empty() && destroyed_cells.is_empty() {
                return Ok(());
            }

            let new_ids: Vec<u32> = new_cells.iter().map(|(_, cell_id)| *cell_id).collect();
            (
                new_cells.len(),
                destroyed_cells.len(),
                new_ids,
                destroyed_cells,
            )
        };

        // Destroyed cells must not retain stale demand history; their ids
        // may be reused for future cells.
        for &cell_id in &destroyed_cell_ids {
            self.smoothed_util[cell_id as usize] = 0.0;
        }

        let cpu_assignments = self.compute_and_apply_cell_config(&new_cell_ids)?;

        let cell_manager = self.cell_manager.as_ref().unwrap();
        info!(
            "Cell config updated ({} new, {} destroyed): {}",
            num_new,
            num_destroyed,
            cell_manager.format_cell_config(&cpu_assignments)
        );

        Ok(())
    }
600
    /// Compute CPU assignments for all cells and push the resulting
    /// configuration into BPF.
    ///
    /// When rebalancing is enabled and utilization history exists, placement
    /// is demand-weighted; otherwise CPUs are assigned uniformly.
    /// `new_cell_ids` are cells with no history yet — they are seeded with
    /// the average utilization of the existing cells.
    fn compute_and_apply_cell_config(
        &mut self,
        new_cell_ids: &[u32],
    ) -> Result<Vec<CpuAssignment>> {
        let (cell_assignments, cpu_assignments) = {
            let cell_manager = self.cell_manager.as_ref().unwrap();
            let active_cell_ids: Vec<u32> = cell_manager
                .get_cell_assignments()
                .iter()
                .map(|(_, cell_id)| *cell_id)
                .collect();
            // Cell 0 (the root cell) always participates even though it has
            // no cgroup mapping of its own.
            let all_cell_ids: Vec<u32> = std::iter::once(0)
                .chain(active_cell_ids.iter().copied())
                .collect();

            let cpu_assignments = if self.enable_rebalancing {
                let new_set: HashSet<u32> = new_cell_ids.iter().copied().collect();
                // Only pre-existing cells contribute real demand samples.
                let existing_utils: Vec<f64> = all_cell_ids
                    .iter()
                    .filter(|id| !new_set.contains(id))
                    .map(|&id| self.smoothed_util[id as usize])
                    .collect();

                let has_data = existing_utils.iter().any(|&u| u > 0.0);

                if has_data {
                    // Seed brand-new cells with the average of existing cells
                    // so they are not starved before their first sample.
                    let avg_util: f64 =
                        existing_utils.iter().sum::<f64>() / existing_utils.len().max(1) as f64;
                    for &id in new_cell_ids {
                        self.smoothed_util[id as usize] = avg_util;
                        info!(
                            "Seeded new cell {} smoothed_util to average {:.1}%",
                            id, avg_util
                        );
                    }

                    let cell_demands: HashMap<u32, f64> = all_cell_ids
                        .iter()
                        .map(|&id| (id, self.smoothed_util[id as usize]))
                        .collect();

                    cell_manager
                        .compute_demand_cpu_assignments(&cell_demands, self.enable_borrowing)?
                } else {
                    // No demand history yet: fall back to uniform placement.
                    cell_manager.compute_cpu_assignments(self.enable_borrowing)?
                }
            } else {
                cell_manager.compute_cpu_assignments(self.enable_borrowing)?
            };

            (cell_manager.get_cell_assignments(), cpu_assignments)
        };

        self.apply_cell_config(&cell_assignments, &cpu_assignments)?;

        Ok(cpu_assignments)
    }
671
672 fn maybe_rebalance(&mut self) -> Result<()> {
674 if self.last_rebalance.elapsed() < self.rebalance_cooldown {
676 return Ok(());
677 }
678
679 let active_cells: Vec<u32> = self.cells.keys().copied().collect();
681 if active_cells.len() < 2 {
682 return Ok(());
683 }
684
685 let mut min_util = f64::MAX;
686 let mut max_util = f64::MIN;
687 for &cell_id in &active_cells {
688 let util = self.smoothed_util[cell_id as usize];
689 if util < min_util {
690 min_util = util;
691 }
692 if util > max_util {
693 max_util = util;
694 }
695 }
696
697 let spread = max_util - min_util;
698 if spread < self.rebalance_threshold {
699 return Ok(());
700 }
701
702 let cell_demands: HashMap<u32, f64> = active_cells
704 .iter()
705 .map(|&cell_id| (cell_id, self.smoothed_util[cell_id as usize]))
706 .collect();
707
708 let (cell_assignments, cpu_assignments) = {
710 let cell_manager = self.cell_manager.as_ref().unwrap();
711 let cpu_assignments = cell_manager
712 .compute_demand_cpu_assignments(&cell_demands, self.enable_borrowing)?;
713
714 let changed = cpu_assignments.iter().any(|a| {
715 self.cells
716 .get(&a.cell_id)
717 .map_or(true, |cell| cell.cpus != a.primary)
718 });
719
720 if !changed {
721 return Ok(());
722 }
723
724 (cell_manager.get_cell_assignments(), cpu_assignments)
725 };
726
727 self.apply_cell_config(&cell_assignments, &cpu_assignments)?;
728
729 self.last_rebalance = Instant::now();
730 self.rebalance_count += 1;
731 self.metrics.rebalance_count = self.rebalance_count;
732
733 let cell_manager = self.cell_manager.as_ref().unwrap();
734 info!(
735 "Rebalanced CPUs (spread={:.1}%, count={}): {}",
736 spread,
737 self.rebalance_count,
738 cell_manager.format_cell_config(&cpu_assignments)
739 );
740
741 Ok(())
742 }
743
    /// Stage the cell→cgroup assignments and per-cell cpumasks in BPF bss,
    /// then trigger the `apply_cell_config` BPF program to consume them.
    ///
    /// `cell_assignments` maps cgroup ids to cell ids; `cpu_assignments`
    /// carries each cell's primary (and optional borrowable) cpumask.
    fn apply_cell_config(
        &mut self,
        cell_assignments: &[(u64, u32)],
        cpu_assignments: &[CpuAssignment],
    ) -> Result<()> {
        let bss_data = self
            .skel
            .maps
            .bss_data
            .as_mut()
            .expect("bss_data must be available after scheduler load");

        let config = &mut bss_data.cell_config;

        // SAFETY: `config` is a BPF-generated plain-old-data struct; zeroing
        // its full size through a raw byte pointer clears any state left over
        // from a previous configuration before the fields are repopulated.
        unsafe {
            std::ptr::write_bytes(
                config as *mut _ as *mut u8,
                0,
                std::mem::size_of_val(config),
            );
        }

        if cell_assignments.len() > bpf_intf::consts_MAX_CELLS as usize {
            bail!(
                "Too many cell assignments: {} > MAX_CELLS ({})",
                cell_assignments.len(),
                bpf_intf::consts_MAX_CELLS
            );
        }
        config.num_cell_assignments = cell_assignments.len() as u32;

        for (i, (cgid, cell_id)) in cell_assignments.iter().enumerate() {
            config.assignments[i].cgid = *cgid;
            config.assignments[i].cell_id = *cell_id;
        }

        // num_cells ends up as the highest cell id in use, plus one.
        let mut max_cell_id: u32 = 0;
        for a in cpu_assignments {
            if a.cell_id >= bpf_intf::consts_MAX_CELLS {
                bail!(
                    "Cell ID {} exceeds MAX_CELLS ({})",
                    a.cell_id,
                    bpf_intf::consts_MAX_CELLS
                );
            }
            max_cell_id = max_cell_id.max(a.cell_id + 1);

            write_cpumask_to_config(&a.primary, &mut config.cpumasks[a.cell_id as usize].mask);

            if let Some(ref borrowable) = a.borrowable {
                write_cpumask_to_config(
                    borrowable,
                    &mut config.borrowable_cpumasks[a.cell_id as usize].mask,
                );
            }
        }
        config.num_cells = max_cell_id;

        // Run the BPF program that atomically applies the staged config.
        let prog = &mut self.skel.progs.apply_cell_config;
        let out = prog
            .test_run(ProgramInput::default())
            .context("Failed to run apply_cell_config BPF program")?;
        if out.return_value != 0 {
            bail!(
                "apply_cell_config BPF program returned error {} (num_assignments={}, num_cells={})",
                out.return_value as i32,
                cell_assignments.len(),
                cpu_assignments.len()
            );
        }

        Ok(())
    }
830
    /// Return a snapshot (clone) of the latest metrics for the stats server.
    fn get_metrics(&self) -> Metrics {
        self.metrics.clone()
    }
834
835 fn calculate_distribution_stats(
836 &self,
837 queue_counts: &[u64; QUEUE_STATS_IDX.len()],
838 global_queue_decisions: u64,
839 scope_queue_decisions: u64,
840 scope_affn_viols: u64,
841 scope_steals: u64,
842 scope_pin_skips: u64,
843 ) -> Result<DistributionStats> {
844 let share_of_global =
847 100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);
848
849 let queue_pct = if scope_queue_decisions == 0 {
851 debug!("No queue decisions in scope, zeroing out queue distribution");
852 [0.0; QUEUE_STATS_IDX.len()]
853 } else {
854 core::array::from_fn(|i| {
855 100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
856 })
857 };
858
859 let affinity_violations_percent = if scope_queue_decisions == 0 {
861 debug!("No queue decisions in scope, zeroing out affinity violations");
862 0.0
863 } else {
864 100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
865 };
866
867 let steal_pct = if scope_queue_decisions == 0 {
868 0.0
869 } else {
870 100.0 * (scope_steals as f64) / (scope_queue_decisions as f64)
871 };
872
873 let pin_skip_pct = if scope_queue_decisions == 0 {
874 0.0
875 } else {
876 100.0 * (scope_pin_skips as f64) / (scope_queue_decisions as f64)
877 };
878
879 const EXPECTED_QUEUES: usize = 4;
880 if queue_pct.len() != EXPECTED_QUEUES {
881 bail!(
882 "Expected {} queues, got {}",
883 EXPECTED_QUEUES,
884 queue_pct.len()
885 );
886 }
887
888 return Ok(DistributionStats {
889 total_decisions: scope_queue_decisions,
890 share_of_decisions_pct: share_of_global,
891 local_q_pct: queue_pct[0],
892 cpu_q_pct: queue_pct[1],
893 cell_q_pct: queue_pct[2],
894 borrowed_pct: queue_pct[3],
895 affn_viol_pct: affinity_violations_percent,
896 steal_pct,
897 pin_skip_pct,
898 global_queue_decisions,
899 });
900 }
901
    /// Aggregate queue-decision deltas across all cells, update the global
    /// metrics (including slice-shrink totals), and trace a summary line.
    fn update_and_log_global_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Sum each tracked queue type over every cell.
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cells in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cells][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum::<u64>();

        let scope_steals: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_STEAL as usize])
            .sum::<u64>();

        let scope_pin_skips: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_PIN_SKIP as usize])
            .sum::<u64>();

        // For the global scope, scope == global, so the share is 100%.
        let stats = self.calculate_distribution_stats(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
            scope_steals,
            scope_pin_skips,
        )?;

        self.metrics.update(&stats);

        // Slice-shrink counters: one total per shrink reason plus a sum.
        let sum = |idx: usize| -> u64 { cell_stats_delta.iter().map(|c| c[idx]).sum() };
        self.metrics.slice_shrink_max =
            sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MAX as usize);
        self.metrics.slice_shrink_proportional =
            sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_PROPORTIONAL as usize);
        self.metrics.slice_shrink_min =
            sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MIN as usize);
        self.metrics.slice_shrink = self.metrics.slice_shrink_max
            + self.metrics.slice_shrink_proportional
            + self.metrics.slice_shrink_min;

        trace!("{} {}", prefix, stats);

        Ok(())
    }
964
965 fn update_and_log_cell_queue_stats(
967 &mut self,
968 global_queue_decisions: u64,
969 cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
970 ) -> Result<()> {
971 for cell in 0..MAX_CELLS {
972 let cell_queue_decisions = QUEUE_STATS_IDX
973 .iter()
974 .map(|&stat| cell_stats_delta[cell][stat as usize])
975 .sum::<u64>();
976
977 if cell_queue_decisions == 0 {
979 continue;
980 }
981
982 let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
983 for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
984 queue_counts[i] = cell_stats_delta[cell][stat as usize];
985 }
986
987 const MIN_CELL_WIDTH: usize = 2;
988 let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);
989
990 let prefix = format!(" Cell {:width$}:", cell, width = cell_width);
991
992 let scope_affn_viols: u64 =
994 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];
995
996 let scope_steals: u64 =
998 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_STEAL as usize];
999
1000 let scope_pin_skips: u64 =
1002 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_PIN_SKIP as usize];
1003
1004 let stats = self.calculate_distribution_stats(
1005 &queue_counts,
1006 global_queue_decisions,
1007 cell_queue_decisions,
1008 scope_affn_viols,
1009 scope_steals,
1010 scope_pin_skips,
1011 )?;
1012
1013 let cell_metrics = self.metrics.cells.entry(cell as u32).or_default();
1014 cell_metrics.update(&stats);
1015
1016 cell_metrics.slice_shrink_max =
1018 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MAX as usize];
1019 cell_metrics.slice_shrink_proportional = cell_stats_delta[cell]
1020 [bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_PROPORTIONAL as usize];
1021 cell_metrics.slice_shrink_min =
1022 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MIN as usize];
1023 cell_metrics.slice_shrink = cell_metrics.slice_shrink_max
1024 + cell_metrics.slice_shrink_proportional
1025 + cell_metrics.slice_shrink_min;
1026
1027 trace!("{} {}", prefix, stats);
1028 }
1029 Ok(())
1030 }
1031
1032 fn log_all_queue_stats(
1033 &mut self,
1034 cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
1035 ) -> Result<()> {
1036 let global_queue_decisions: u64 = cell_stats_delta
1038 .iter()
1039 .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
1040 .sum();
1041
1042 if global_queue_decisions == 0 {
1043 warn!("No queueing decisions made globally");
1044 return Ok(());
1045 }
1046
1047 self.update_and_log_global_queue_stats(global_queue_decisions, &cell_stats_delta)?;
1048
1049 self.update_and_log_cell_queue_stats(global_queue_decisions, &cell_stats_delta)?;
1050
1051 Ok(())
1052 }
1053
1054 fn calculate_cell_stat_delta(
1055 &mut self,
1056 cpu_ctxs: &[bpf_intf::cpu_ctx],
1057 ) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
1058 let mut cell_stats_delta = [[0 as u64; NR_CSTATS]; MAX_CELLS];
1059
1060 for cell in 0..MAX_CELLS {
1063 for stat in 0..NR_CSTATS {
1064 let mut cur_cell_stat = 0;
1065
1066 for cpu_ctx in cpu_ctxs.iter() {
1068 cur_cell_stat += cpu_ctx.cstats[cell][stat];
1069 }
1070
1071 cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
1073 self.prev_cell_stats[cell][stat] = cur_cell_stat;
1074 }
1075 }
1076 Ok(cell_stats_delta)
1077 }
1078
    /// Read per-CPU contexts from BPF and refresh all userspace metrics:
    /// queue distributions, demand accounting (in cell-managed mode), and
    /// per-cell CPU counts.
    fn collect_metrics(&mut self) -> Result<()> {
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        let cell_stats_delta = self.calculate_cell_stat_delta(&cpu_ctxs)?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        // Demand accounting only applies in userspace-managed cell mode.
        if self.cell_manager.is_some() {
            self.collect_demand_metrics(&cpu_ctxs)?;
        }

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);
        }

        // Record each known cell's CPU count; and_modify (not or_default)
        // so no metrics entry is created for cells without one.
        for (cell_id, cell) in self.cells.iter() {
            self.metrics
                .cells
                .entry(*cell_id)
                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
        }
        self.metrics.num_cells = self.cells.len() as u32;

        Ok(())
    }
1106
    /// Derive per-cell utilization, borrow, and lend percentages from the
    /// per-CPU running-time counters, fold them into the metrics, and feed
    /// the EWMA-smoothed utilization used by the rebalancer.
    fn collect_demand_metrics(&mut self, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Result<()> {
        // Per cell: total runtime anywhere, runtime on CPUs it owns, and
        // runtime other cells spent on CPUs it owns (i.e. time lent out).
        let mut total_running_ns = [0u64; MAX_CELLS];
        let mut on_own_ns = [0u64; MAX_CELLS];
        let mut lent_ns = [0u64; MAX_CELLS];

        for cpu_ctx in cpu_ctxs.iter() {
            let owner = cpu_ctx.cell as usize;
            for cell in 0..MAX_CELLS {
                let ns = cpu_ctx.running_ns[cell];
                total_running_ns[cell] += ns;
                if owner == cell {
                    on_own_ns[cell] += ns;
                }
            }
            // Validate before using `owner` as an index below.
            if owner >= MAX_CELLS {
                bail!(
                    "CPU has invalid cell assignment {} (MAX_CELLS={})",
                    owner,
                    MAX_CELLS
                );
            }
            // Whatever time on this CPU the owning cell did not use itself
            // was lent to other cells.
            let total_on_cpu: u64 = cpu_ctx.running_ns.iter().sum();
            let owner_on_cpu = cpu_ctx.running_ns[owner];
            lent_ns[owner] += total_on_cpu.saturating_sub(owner_on_cpu);
        }

        // Capacity of one CPU over the sampling interval, in nanoseconds.
        let interval_ns = self.monitor_interval.as_nanos() as u64;

        let mut global_running_delta = 0u64;
        let mut global_borrowed_delta = 0u64;
        let mut global_lent_delta = 0u64;
        let mut global_capacity = 0u64;

        for cell in 0..MAX_CELLS {
            // Per-interval deltas; saturating in case counters reset.
            let delta_running =
                total_running_ns[cell].saturating_sub(self.prev_cell_running_ns[cell]);
            let delta_on_own = on_own_ns[cell].saturating_sub(self.prev_cell_own_ns[cell]);
            let delta_lent = lent_ns[cell].saturating_sub(self.prev_cell_lent_ns[cell]);

            self.prev_cell_running_ns[cell] = total_running_ns[cell];
            self.prev_cell_own_ns[cell] = on_own_ns[cell];
            self.prev_cell_lent_ns[cell] = lent_ns[cell];

            if delta_running == 0 && delta_lent == 0 {
                continue;
            }

            // Runtime this cell accrued on CPUs it does not own.
            let delta_borrowed = delta_running.saturating_sub(delta_on_own);

            // Skip cells we no longer track (e.g. just destroyed).
            let Some(cell_info) = self.cells.get(&(cell as u32)) else {
                continue;
            };

            let nr_cpus = cell_info.cpus.weight() as u64;
            if nr_cpus == 0 {
                bail!("Cell {} has 0 CPUs assigned", cell);
            }

            let capacity = nr_cpus * interval_ns;
            let util_pct = 100.0 * (delta_running as f64) / (capacity as f64);
            // Borrow share is relative to this cell's own runtime.
            let demand_borrow_pct = if delta_running > 0 {
                100.0 * (delta_borrowed as f64) / (delta_running as f64)
            } else {
                0.0
            };
            let lent_pct = 100.0 * (delta_lent as f64) / (capacity as f64);

            // EWMA-smooth utilization for the rebalancer.
            if self.enable_rebalancing {
                self.smoothed_util[cell] = self.demand_smoothing * util_pct
                    + (1.0 - self.demand_smoothing) * self.smoothed_util[cell];
            }

            self.metrics
                .cells
                .entry(cell as u32)
                .or_default()
                .update_demand(util_pct, demand_borrow_pct, lent_pct);

            if self.enable_rebalancing {
                self.metrics
                    .cells
                    .entry(cell as u32)
                    .or_default()
                    .smoothed_util_pct = self.smoothed_util[cell];
            }

            global_running_delta = global_running_delta.saturating_add(delta_running);
            global_borrowed_delta = global_borrowed_delta.saturating_add(delta_borrowed);
            global_lent_delta = global_lent_delta.saturating_add(delta_lent);
            global_capacity = global_capacity.saturating_add(capacity);
        }

        // Global aggregates over all cells that contributed this interval.
        let global_util_pct = if global_capacity > 0 {
            100.0 * (global_running_delta as f64) / (global_capacity as f64)
        } else {
            0.0
        };
        let global_borrow_pct = if global_running_delta > 0 {
            100.0 * (global_borrowed_delta as f64) / (global_running_delta as f64)
        } else {
            0.0
        };
        let global_lent_pct = if global_capacity > 0 {
            100.0 * (global_lent_delta as f64) / (global_capacity as f64)
        } else {
            0.0
        };

        self.metrics
            .update_demand(global_util_pct, global_borrow_pct, global_lent_pct);

        Ok(())
    }
1236
    /// Acknowledge the current cpuset sequence back to BPF, signalling that
    /// userspace has processed (or deliberately skipped) the change.
    fn update_applied_cpuset_seq(&mut self) {
        // SAFETY: applied_cpuset_seq is a plain u32 in BPF bss; the volatile
        // write ensures the store is not elided or reordered away, so the
        // BPF side observes the acknowledgement.
        unsafe {
            let ptr = &mut self.skel.maps.bss_data.as_mut().unwrap().applied_cpuset_seq as *mut u32;
            std::ptr::write_volatile(ptr, self.last_cpuset_seq);
        }
    }
1244
    /// Detect BPF-reported cpuset changes via a sequence counter and, when
    /// the effective cpusets actually changed, recompute and reapply the
    /// cell configuration.
    fn check_cpuset_changes(&mut self) -> Result<()> {
        let Some(ref mut cm) = self.cell_manager else {
            return Ok(());
        };

        // SAFETY: cpuset_seq is a u32 published by the BPF side; reading it
        // through an AtomicU32 view with Acquire ordering pairs with that
        // publication and avoids a torn/stale read.
        let current_seq = unsafe {
            let ptr = &self.skel.maps.bss_data.as_ref().unwrap().cpuset_seq as *const u32;
            (ptr as *const AtomicU32)
                .as_ref()
                .unwrap()
                .load(Ordering::Acquire)
        };

        if current_seq == self.last_cpuset_seq {
            return Ok(());
        }
        self.last_cpuset_seq = current_seq;

        // Sequence advanced but the cpusets are effectively unchanged:
        // just acknowledge and return.
        if !cm.refresh_cpusets()? {
            self.update_applied_cpuset_seq();
            return Ok(());
        }

        let cpu_assignments = self.compute_and_apply_cell_config(&[])?;
        self.update_applied_cpuset_seq();
        let cell_manager = self.cell_manager.as_ref().unwrap();
        info!(
            "Cpuset change detected, recomputed config: {}",
            cell_manager.format_cell_config(&cpu_assignments)
        );
        Ok(())
    }
1279
1280 fn refresh_bpf_cells(&mut self) -> Result<()> {
1281 let applied_configuration = unsafe {
1282 let ptr = &self
1283 .skel
1284 .maps
1285 .bss_data
1286 .as_ref()
1287 .unwrap()
1288 .applied_configuration_seq as *const u32;
1289 (ptr as *const std::sync::atomic::AtomicU32)
1290 .as_ref()
1291 .unwrap()
1292 .load(std::sync::atomic::Ordering::Acquire)
1293 };
1294 if self
1295 .last_configuration_seq
1296 .is_some_and(|seq| applied_configuration == seq)
1297 {
1298 return Ok(());
1299 }
1300 let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
1302 let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
1303 for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
1304 cell_to_cpus
1305 .entry(cpu_ctx.cell)
1306 .or_insert_with(|| Cpumask::new())
1307 .set_cpu(i)
1308 .expect("set cpu in existing mask");
1309 }
1310
1311 let cells_with_cpus: HashSet<u32> = cell_to_cpus.keys().copied().collect();
1322 let mut active_cells = cells_with_cpus.clone();
1323 active_cells.insert(0);
1324
1325 for cell_idx in &active_cells {
1326 let cpus = cell_to_cpus
1327 .get(cell_idx)
1328 .cloned()
1329 .unwrap_or_else(|| Cpumask::new());
1330 self.cells
1331 .entry(*cell_idx)
1332 .or_insert_with(|| Cell {
1333 cpus: Cpumask::new(),
1334 })
1335 .cpus = cpus;
1336 self.metrics.cells.insert(*cell_idx, CellMetrics::default());
1337 }
1338
1339 self.cells.retain(|&k, _| active_cells.contains(&k));
1341 self.metrics.cells.retain(|&k, _| active_cells.contains(&k));
1342
1343 self.last_configuration_seq = Some(applied_configuration);
1344
1345 Ok(())
1346 }
1347}
1348
1349fn write_cpumask_to_config(cpumask: &Cpumask, dest: &mut [u8]) {
1350 let raw_slice = cpumask.as_raw_slice();
1351 for (word_idx, word) in raw_slice.iter().enumerate() {
1352 let byte_start = word_idx * 8;
1353 let bytes = word.to_le_bytes();
1354 for (j, byte) in bytes.iter().enumerate() {
1355 let idx = byte_start + j;
1356 if idx < dest.len() {
1357 dest[idx] = *byte;
1358 }
1359 }
1360 }
1361}
1362
1363fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
1364 let mut cpu_ctxs = vec![];
1365 let cpu_ctxs_vec = skel
1366 .maps
1367 .cpu_ctxs
1368 .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
1369 .context("Failed to lookup cpu_ctx")?
1370 .unwrap();
1371 if cpu_ctxs_vec.len() < *NR_CPUS_POSSIBLE {
1372 bail!(
1373 "Percpu map returned {} entries but expected {}",
1374 cpu_ctxs_vec.len(),
1375 *NR_CPUS_POSSIBLE
1376 );
1377 }
1378 for cpu in 0..*NR_CPUS_POSSIBLE {
1379 cpu_ctxs.push(*unsafe {
1380 &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
1381 });
1382 }
1383 Ok(cpu_ctxs)
1384}
1385
1386#[clap_main::clap_main]
1387fn main(opts: Opts) -> Result<()> {
1388 if opts.version {
1389 println!(
1390 "scx_mitosis {}",
1391 build_id::full_version(env!("CARGO_PKG_VERSION"))
1392 );
1393 return Ok(());
1394 }
1395
1396 let env_filter = EnvFilter::try_from_default_env()
1397 .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
1398 Ok(filter) => Ok(filter),
1399 Err(e) => {
1400 eprintln!(
1401 "invalid log envvar: {}, using info, err is: {}",
1402 opts.log_level, e
1403 );
1404 EnvFilter::try_new("info")
1405 }
1406 })
1407 .unwrap_or_else(|_| EnvFilter::new("info"));
1408
1409 match tracing_subscriber::fmt()
1410 .with_env_filter(env_filter)
1411 .with_target(true)
1412 .with_thread_ids(true)
1413 .with_file(true)
1414 .with_line_number(true)
1415 .try_init()
1416 {
1417 Ok(()) => {}
1418 Err(e) => eprintln!("failed to init logger: {}", e),
1419 }
1420
1421 if opts.verbose > 0 {
1422 warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
1423 }
1424
1425 debug!("opts={:?}", &opts);
1426
1427 if let Some(run_id) = opts.run_id {
1428 info!("scx_mitosis run_id: {}", run_id);
1429 }
1430
1431 let shutdown = Arc::new(AtomicBool::new(false));
1432 let shutdown_clone = shutdown.clone();
1433 ctrlc::set_handler(move || {
1434 shutdown_clone.store(true, Ordering::Relaxed);
1435 })
1436 .context("Error setting Ctrl-C handler")?;
1437
1438 if let Some(intv) = opts.monitor {
1439 let shutdown_clone = shutdown.clone();
1440 let jh = std::thread::spawn(move || {
1441 match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
1442 Ok(_) => {
1443 debug!("stats monitor thread finished successfully")
1444 }
1445 Err(error_object) => {
1446 warn!(
1447 "stats monitor thread finished because of an error {}",
1448 error_object
1449 )
1450 }
1451 }
1452 });
1453 if opts.monitor.is_some() {
1454 let _ = jh.join();
1455 return Ok(());
1456 }
1457 }
1458
1459 let mut open_object = MaybeUninit::uninit();
1460 loop {
1461 let mut sched = Scheduler::init(&opts, &mut open_object)?;
1462 if !sched.run(shutdown.clone())?.should_restart() {
1463 break;
1464 }
1465 }
1466
1467 Ok(())
1468}