1#[allow(clippy::unwrap_used)]
7mod bpf_skel;
8pub use bpf_skel::*;
9pub mod bpf_intf;
10mod cell_manager;
11mod mitosis_topology_utils;
12mod stats;
13
14use cell_manager::{CellManager, CpuAssignment};
15
16use std::cmp::max;
17use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::fmt::Display;
20use std::mem::MaybeUninit;
21use std::os::fd::AsFd;
22use std::sync::atomic::AtomicBool;
23use std::sync::atomic::AtomicU32;
24use std::sync::atomic::Ordering;
25use std::sync::Arc;
26use std::time::Duration;
27use std::time::Instant;
28
29use anyhow::bail;
30use anyhow::Context;
31use anyhow::Result;
32use clap::Parser;
33use libbpf_rs::MapCore as _;
34use libbpf_rs::OpenObject;
35use libbpf_rs::ProgramInput;
36use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
37use nix::sys::eventfd::EventFd;
38use scx_stats::prelude::*;
39use scx_utils::build_id;
40use scx_utils::compat;
41use scx_utils::init_libbpf_logging;
42use scx_utils::libbpf_clap_opts::LibbpfOpts;
43use scx_utils::scx_enums;
44use scx_utils::scx_ops_attach;
45use scx_utils::scx_ops_load;
46use scx_utils::scx_ops_open;
47use scx_utils::uei_exited;
48use scx_utils::uei_report;
49use scx_utils::Cpumask;
50use scx_utils::Topology;
51use scx_utils::UserExitInfo;
52use scx_utils::NR_CPUS_POSSIBLE;
53use tracing::{debug, info, trace, warn};
54use tracing_subscriber::filter::EnvFilter;
55
56use stats::CellMetrics;
57use stats::Metrics;
58
/// Scheduler name used in the unregistration log message.
const SCHEDULER_NAME: &str = "scx_mitosis";
/// Maximum number of cells, taken from the BPF interface constants.
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
/// Number of per-cell statistic counters, taken from the BPF interface constants.
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;
/// Epoll token attached to the cell manager registration.
const INOTIFY_TOKEN: u64 = 1;
/// Epoll token attached to the stats-waker eventfd.
const STATS_TOKEN: u64 = 2;
66
/// Parses an EWMA smoothing factor and checks that it lies in `0.0..=1.0`.
///
/// Used as the clap `value_parser` for `--demand-smoothing`.
///
/// # Errors
/// Returns the float parser's message when `s` is not a number, or a
/// range message when the value (including NaN) falls outside `0.0..=1.0`.
fn parse_ewma_factor(s: &str) -> Result<f64, String> {
    match s.parse::<f64>() {
        Err(parse_err) => Err(parse_err.to_string()),
        Ok(factor) if (0.0..=1.0).contains(&factor) => Ok(factor),
        Ok(factor) => Err(format!("value {factor} not in range 0.0..=1.0")),
    }
}
74
// Command-line options for the scheduler, parsed by clap's derive API.
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive turns
// `///` doc comments into `--help` text, so converting these would change the
// program's output.
#[derive(Debug, Parser)]
struct Opts {
    // Counted flag (`-v`, repeatable). Not read in the part of the file
    // visible here.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    // Log level string. `init` enables libbpf object debug output when this
    // contains "trace".
    #[clap(long, default_value = "info")]
    log_level: String,

    // Copied into the struct_ops `exit_dump_len` field in `init`.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    // Seconds. Not read in the part of the file visible here.
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    // Seconds. Not read in the part of the file visible here.
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    // Seconds. Becomes `Scheduler::monitor_interval`: the epoll wait timeout
    // of the main loop and the nominal interval used for utilization math.
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    // Presumably the interval for a stats-monitor mode — TODO confirm; not
    // read in the part of the file visible here.
    #[clap(long)]
    monitor: Option<f64>,

    // `-V` / `--version` flag. Not read in the part of the file visible here.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    // Optional run identifier. Not read in the part of the file visible here.
    #[clap(long)]
    run_id: Option<u64>,

    // Copied to the BPF rodata `debug_events_enabled`.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    debug_events: bool,

    // Copied to the BPF rodata `exiting_task_workaround_enabled`. Defaults to
    // true and takes an explicit value (`ArgAction::Set`).
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    exiting_task_workaround: bool,

    // Copied to the BPF rodata `cpu_controller_disabled`.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    cpu_controller_disabled: bool,

    // Copied to the BPF rodata `reject_multicpu_pinning`.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    reject_multicpu_pinning: bool,

    // Copied to the BPF rodata `enable_llc_awareness`. Required by
    // `--enable-work-stealing` (see `validate_args`).
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_llc_awareness: bool,

    // Copied to the BPF rodata `enable_work_stealing`. Rejected unless LLC
    // awareness is also enabled.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_work_stealing: bool,

    // When set, `init` creates a `CellManager` for this cgroup and sets the
    // BPF rodata `userspace_managed_cell_mode`.
    #[clap(long)]
    cell_parent_cgroup: Option<String>,

    // Names passed to `CellManager::new` as an exclusion set. Only valid
    // together with `--cell-parent-cgroup`.
    #[clap(long)]
    cell_exclude: Vec<String>,

    // Copied to the BPF rodata `enable_borrowing` and passed to the cell
    // manager when CPU assignments are computed.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_borrowing: bool,

    // Copied to the BPF rodata `use_lockless_peek`.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    use_lockless_peek: bool,

    // Enables the demand-weighted rebalancing path in the main loop (it also
    // requires a cell manager).
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_rebalancing: bool,

    // Minimum spread (max - min of the smoothed per-cell utilization
    // percentages) that triggers a rebalance.
    #[clap(long, default_value = "20.0")]
    rebalance_threshold: f64,

    // Seconds that must elapse after a rebalance before the next one.
    #[clap(long, default_value = "5")]
    rebalance_cooldown_s: u64,

    // EWMA weight given to the newest utilization sample; validated by
    // `parse_ewma_factor` to lie in 0.0..=1.0.
    #[clap(long, default_value = "0.3", value_parser = parse_ewma_factor)]
    demand_smoothing: f64,

    // Copied to the BPF rodata `dynamic_affinity_cpu_selection`.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    dynamic_affinity_cpu_selection: bool,

    // Copied to the BPF rodata `enable_slice_shrinking`.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_slice_shrinking: bool,

    // Microseconds; multiplied by 1000 into the BPF rodata
    // `slice_shrink_max_ns`. Must be greater than `slice_shrink_min_us`.
    #[clap(long, default_value = "4000")]
    slice_shrink_max_us: u64,

    // Microseconds; multiplied by 1000 into the BPF rodata
    // `slice_shrink_min_ns`.
    #[clap(long, default_value = "500")]
    slice_shrink_min_us: u64,

    // libbpf options, flattened into this parser under their own help heading
    // and converted to BPF open options in `init`.
    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
215
/// Per-cell stat indices whose counters are summed as "queue decisions".
///
/// The order is significant: `calculate_distribution_stats` reads the derived
/// percentages positionally (0 = local, 1 = CPU DSQ, 2 = cell DSQ,
/// 3 = borrowed).
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 4] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_BORROWED,
];
226
/// Userspace record of one cell.
#[derive(Debug)]
struct Cell {
    /// CPUs of the cell. Its weight is used as the cell's CPU count in the
    /// metrics, and it is compared against freshly computed primary masks to
    /// detect whether a rebalance would change anything.
    cpus: Cpumask,
}
232
/// Userspace half of the scheduler: owns the BPF skeleton, the optional cell
/// manager, and the state needed to turn cumulative BPF counters into
/// per-interval metrics.
struct Scheduler<'a> {
    /// Loaded BPF skeleton.
    skel: BpfSkel<'a>,
    /// Epoll wait timeout of the main loop; also the nominal interval length
    /// used when computing utilization.
    monitor_interval: Duration,
    /// Known cells keyed by cell ID.
    /// NOTE(review): populated outside the part of the file visible here.
    cells: HashMap<u32, Cell>,
    /// Cumulative per-cell stat totals from the previous collection, used to
    /// compute deltas.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    /// Previous cumulative per-cell running time, summed over all CPUs.
    prev_cell_running_ns: [u64; MAX_CELLS],
    /// Previous cumulative per-cell running time on CPUs owned by that cell.
    prev_cell_own_ns: [u64; MAX_CELLS],
    /// Previous cumulative time other cells ran on each cell's CPUs.
    prev_cell_lent_ns: [u64; MAX_CELLS],
    /// Latest metrics snapshot; cloned out to stats clients.
    metrics: Metrics,
    /// Stats server. Held in an `Option` so `run` can take and drop it before
    /// joining the bridge thread.
    stats_server: Option<StatsServer<(), Metrics>>,
    /// Starts as `None`. Not read in the part of the file visible here.
    last_configuration_seq: Option<u32>,
    /// Sequence number written to the BPF `applied_cpuset_seq` variable.
    last_cpuset_seq: u32,
    /// Present only when `--cell-parent-cgroup` was given.
    cell_manager: Option<CellManager>,
    /// Passed to the cell manager when CPU assignments are computed.
    enable_borrowing: bool,
    /// Enables EWMA tracking and demand-weighted CPU assignment.
    enable_rebalancing: bool,
    /// Minimum utilization spread (percentage points) that triggers a rebalance.
    rebalance_threshold: f64,
    /// Minimum time between two rebalances.
    rebalance_cooldown: Duration,
    /// EWMA weight of the newest utilization sample.
    demand_smoothing: f64,
    /// EWMA-smoothed utilization percentage per cell.
    smoothed_util: [f64; MAX_CELLS],
    /// Time of the last applied rebalance (initialized at construction).
    last_rebalance: Instant,
    /// Number of rebalances applied so far.
    rebalance_count: u64,
    /// Epoll instance the main loop waits on.
    epoll: Epoll,
    /// Eventfd that the stats bridge thread writes to in order to wake the
    /// main loop for a stats request.
    stats_waker: EventFd,
}
272
/// Queue-decision breakdown for one scope (one cell, or all cells together).
///
/// All `*_pct` fields are percentages in the 0..=100 range as produced by
/// `calculate_distribution_stats`.
struct DistributionStats {
    /// Number of queue decisions made in this scope.
    total_decisions: u64,
    /// This scope's decisions as a percentage of `global_queue_decisions`.
    share_of_decisions_pct: f64,
    /// Percentage of the scope's decisions counted under each queue stat.
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    borrowed_pct: f64,
    /// Affinity violations, steals and pin skips, each as a percentage of the
    /// scope's decisions.
    affn_viol_pct: f64,
    steal_pct: f64,
    pin_skip_pct: f64,

    /// Decisions across all cells; only used to size the first output column
    /// so that every row printed for the same interval lines up.
    global_queue_decisions: u64,
}

impl Display for DistributionStats {
    /// Writes a single-line summary.
    ///
    /// The decision-count column is at least `MIN_DECISIONS_WIDTH` characters
    /// wide and grows to the number of decimal digits of
    /// `global_queue_decisions`, the largest count any row of the same
    /// interval can show.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const MIN_DECISIONS_WIDTH: usize = 5;
        // Exact digit count. `log10().ceil()` is one short for exact powers of
        // ten (100000 has six digits but log10 is 5), which misaligned the
        // global row against the cell rows.
        let global_digits = self
            .global_queue_decisions
            .checked_ilog10()
            .map_or(1, |exponent| exponent as usize + 1);
        let decisions_width = MIN_DECISIONS_WIDTH.max(global_digits);
        write!(
            f,
            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% Borrow:{:4.1}% | V:{:4.1}% S:{:4.1}% PS:{:4.1}%",
            self.total_decisions,
            self.share_of_decisions_pct,
            self.local_q_pct,
            self.cpu_q_pct,
            self.cell_q_pct,
            self.borrowed_pct,
            self.affn_viol_pct,
            self.steal_pct,
            self.pin_skip_pct,
            width = decisions_width,
        )
    }
}
318
319impl<'a> Scheduler<'a> {
320 fn validate_args(opts: &Opts) -> Result<()> {
321 if opts.enable_work_stealing && !opts.enable_llc_awareness {
322 bail!("Work stealing requires LLC-aware mode (--enable-llc-awareness)");
323 }
324
325 Ok(())
326 }
327
    /// Builds a ready-to-run scheduler: validates options, opens and
    /// configures the BPF skeleton, loads it, starts the stats server,
    /// optionally creates the cell manager, and sets up the epoll instance
    /// the main loop waits on.
    ///
    /// `open_object` provides the storage the opened BPF object lives in and
    /// must outlive the returned scheduler (lifetime `'a`).
    ///
    /// # Errors
    /// Returns an error, with context naming the failing step, when option
    /// validation, topology detection, skeleton open/load, stats server
    /// launch, cell manager creation, or epoll/eventfd setup fails.
    ///
    /// # Panics
    /// Panics if the skeleton has no rodata map after being opened.
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        Self::validate_args(opts).context("validating scheduler options")?;

        let topology = Topology::new().context("detecting system topology")?;

        // Never report fewer than one LLC to the BPF side.
        let nr_llc = topology.all_llcs.len().max(1);

        let mut skel_builder = BpfSkelBuilder::default();
        // libbpf object debug output is tied to a "trace" log level.
        skel_builder
            .obj_builder
            .debug(opts.log_level.contains("trace"));
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)
            .context("opening BPF skeleton")?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        // Read-only data is filled in here, before `scx_ops_load!` below.
        let rodata = skel
            .maps
            .rodata_data
            .as_mut()
            .expect("BUG: rodata_data missing after skel open");

        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.debug_events_enabled = opts.debug_events;
        rodata.exiting_task_workaround_enabled = opts.exiting_task_workaround;
        rodata.cpu_controller_disabled = opts.cpu_controller_disabled;
        rodata.dynamic_affinity_cpu_selection = opts.dynamic_affinity_cpu_selection;

        // The minimum must be strictly below the maximum.
        if opts.slice_shrink_min_us >= opts.slice_shrink_max_us {
            bail!(
                "--slice-shrink-min-us ({}) must be less than --slice-shrink-max-us ({})",
                opts.slice_shrink_min_us,
                opts.slice_shrink_max_us
            );
        }
        rodata.enable_slice_shrinking = opts.enable_slice_shrinking;
        // Microseconds on the command line, nanoseconds in BPF.
        rodata.slice_shrink_max_ns = opts.slice_shrink_max_us * 1_000;
        rodata.slice_shrink_multiplier = 2;
        rodata.slice_shrink_min_ns = opts.slice_shrink_min_us * 1_000;

        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        // Bitmap of all CPUs in the topology: eight CPUs per array element,
        // bit `cpu % 8` of element `cpu / 8`.
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        rodata.reject_multicpu_pinning = opts.reject_multicpu_pinning;

        rodata.nr_llc = nr_llc as u32;
        rodata.enable_llc_awareness = opts.enable_llc_awareness;
        rodata.enable_work_stealing = opts.enable_work_stealing;

        // Userspace manages cells exactly when a parent cgroup was given.
        rodata.userspace_managed_cell_mode = opts.cell_parent_cgroup.is_some();

        rodata.enable_borrowing = opts.enable_borrowing;
        rodata.use_lockless_peek = opts.use_lockless_peek;

        // A value of 0 means the running kernel does not offer the flag.
        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::CpuToLLC,
            None,
        )
        .context("populating CPU-to-LLC topology map")?;
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::LLCToCpus,
            None,
        )
        .context("populating LLC-to-CPUs topology map")?;

        let skel = scx_ops_load!(skel, mitosis, uei).context("loading BPF skeleton")?;

        let stats_server = StatsServer::new(stats::server_data())
            .launch()
            .context("launching stats server")?;

        // Exclusions are only meaningful relative to a parent cgroup.
        if !opts.cell_exclude.is_empty() && opts.cell_parent_cgroup.is_none() {
            bail!("--cell-exclude requires --cell-parent-cgroup");
        }
        let cell_manager = if let Some(ref parent_cgroup) = opts.cell_parent_cgroup {
            let exclude: HashSet<String> = opts.cell_exclude.iter().cloned().collect();
            Some(
                CellManager::new(
                    parent_cgroup,
                    MAX_CELLS as u32,
                    topology.span.clone(),
                    exclude,
                )
                .with_context(|| {
                    format!("initializing cell manager for cgroup {}", parent_cgroup)
                })?,
            )
        } else {
            None
        };

        let epoll = Epoll::new(EpollCreateFlags::empty()).context("creating epoll instance")?;

        // Non-blocking semaphore-mode eventfd, initial value 0. The stats
        // bridge thread in `run` writes 1 per request and the main loop reads
        // it once per wakeup.
        let stats_waker = EventFd::from_value_and_flags(
            0,
            nix::sys::eventfd::EfdFlags::EFD_NONBLOCK | nix::sys::eventfd::EfdFlags::EFD_SEMAPHORE,
        )
        .context("creating stats-waker eventfd")?;

        epoll
            .add(
                &stats_waker,
                EpollEvent::new(EpollFlags::EPOLLIN, STATS_TOKEN),
            )
            .context("registering stats-waker with epoll")?;

        // The cell manager itself is registered as the epoll source for
        // cgroup change notifications.
        if let Some(ref cell_manager) = cell_manager {
            epoll
                .add(
                    cell_manager,
                    EpollEvent::new(EpollFlags::EPOLLIN, INOTIFY_TOKEN),
                )
                .context("registering cell manager inotify with epoll")?;
        }

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            prev_cell_running_ns: [0; MAX_CELLS],
            prev_cell_own_ns: [0; MAX_CELLS],
            prev_cell_lent_ns: [0; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server: Some(stats_server),
            last_configuration_seq: None,
            last_cpuset_seq: 0,
            cell_manager,
            enable_borrowing: opts.enable_borrowing,
            enable_rebalancing: opts.enable_rebalancing,
            rebalance_threshold: opts.rebalance_threshold,
            rebalance_cooldown: Duration::from_secs(opts.rebalance_cooldown_s),
            demand_smoothing: opts.demand_smoothing,
            smoothed_util: [0.0; MAX_CELLS],
            last_rebalance: Instant::now(),
            rebalance_count: 0,
            epoll,
            stats_waker,
        })
    }
493
    /// Attaches the scheduler and runs the main loop until `shutdown` is set
    /// or the BPF side reports an exit.
    ///
    /// Each iteration waits on epoll for up to `monitor_interval`, handles at
    /// most one event (cell manager notification or stats request), then
    /// refreshes cell state, checks for cpuset changes, collects metrics and,
    /// when enabled, considers a rebalance.
    ///
    /// # Errors
    /// Returns an error if attaching fails or any per-iteration step fails.
    /// On success returns the exit info reported by `uei_report!`.
    ///
    /// # Panics
    /// Panics if the stats server is missing.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis).context("attaching BPF scheduler")?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        self.apply_initial_cells()
            .context("applying initial cell configuration")?;

        let (res_ch, req_ch) = self
            .stats_server
            .as_ref()
            .expect("BUG: stats_server missing after init")
            .channels();

        // The bridge thread gets its own duplicate of the eventfd so it can
        // wake the main loop without borrowing `self`.
        let stats_waker_fd = self
            .stats_waker
            .as_fd()
            .try_clone_to_owned()
            .context("cloning stats-waker fd for bridge thread")?;
        // SAFETY (review): `stats_waker_fd` was just duplicated from
        // `self.stats_waker`, so it refers to an eventfd and is owned
        // exclusively by the new `EventFd`.
        let stats_waker = unsafe { EventFd::from_owned_fd(stats_waker_fd) };
        // Turns every incoming stats request into one eventfd write. The
        // thread ends when `req_ch.recv()` fails.
        let stats_bridge = std::thread::spawn(move || {
            while req_ch.recv().is_ok() {
                let _ = stats_waker.write(1);
            }
        });

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            // One event per wakeup; the wait doubles as the monitor timer.
            let mut events = [EpollEvent::empty(); 1];
            let timeout = EpollTimeout::try_from(self.monitor_interval).with_context(|| {
                format!(
                    "monitor_interval {:?} exceeds maximum epoll timeout",
                    self.monitor_interval,
                )
            })?;

            match self.epoll.wait(&mut events, timeout) {
                Ok(n) => {
                    for event in &events[..n] {
                        match event.data() {
                            INOTIFY_TOKEN => {
                                self.process_cell_events()
                                    .context("processing cell manager events")?;
                            }
                            STATS_TOKEN => {
                                // Consume one wakeup; a failed read is ignored.
                                let _ = self.stats_waker.read();
                                res_ch
                                    .send(self.get_metrics())
                                    .context("sending metrics response")?;
                            }
                            _ => {}
                        }
                    }
                }
                // Interrupted by a signal: re-check the loop condition.
                Err(nix::errno::Errno::EINTR) => continue,
                Err(e) => return Err(e.into()),
            }

            self.refresh_bpf_cells()
                .context("refreshing BPF cell state")?;
            self.check_cpuset_changes()
                .context("checking cpuset changes")?;
            self.collect_metrics().context("collecting metrics")?;

            if self.enable_rebalancing && self.cell_manager.is_some() {
                self.maybe_rebalance().context("running rebalance check")?;
            }
        }

        drop(struct_ops);
        // The stats server is dropped before the bridge thread is joined.
        // NOTE(review): presumably dropping it closes the request channel so
        // that `req_ch.recv()` fails and the thread exits — confirm.
        drop(self.stats_server.take());
        let _ = stats_bridge.join();
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }
577
578 fn apply_initial_cells(&mut self) -> Result<()> {
580 if self.cell_manager.is_none() {
581 return Ok(());
582 }
583
584 let cpu_assignments = self
585 .compute_and_apply_cell_config(&[])
586 .context("computing initial cell configuration")?;
587
588 let cell_manager = self
589 .cell_manager
590 .as_ref()
591 .expect("BUG: cell_manager missing in apply_initial_cells");
592 info!(
593 "Applied initial cell configuration: {}",
594 cell_manager.format_cell_config(&cpu_assignments)
595 );
596
597 Ok(())
598 }
599
600 fn process_cell_events(&mut self) -> Result<()> {
602 let (num_new, num_destroyed, new_cell_ids, destroyed_cell_ids) = {
603 let Some(ref mut cell_manager) = self.cell_manager else {
604 return Ok(());
605 };
606
607 let (new_cells, destroyed_cells) = cell_manager
608 .process_events()
609 .context("processing inotify events")?;
610
611 if new_cells.is_empty() && destroyed_cells.is_empty() {
612 return Ok(());
613 }
614
615 let new_ids: Vec<u32> = new_cells.iter().map(|(_, cell_id)| *cell_id).collect();
616 (
617 new_cells.len(),
618 destroyed_cells.len(),
619 new_ids,
620 destroyed_cells,
621 )
622 };
623
624 for &cell_id in &destroyed_cell_ids {
627 self.smoothed_util[cell_id as usize] = 0.0;
628 }
629
630 let cpu_assignments = self
631 .compute_and_apply_cell_config(&new_cell_ids)
632 .context("recomputing cell configuration for new cgroups")?;
633
634 let cell_manager = self
635 .cell_manager
636 .as_ref()
637 .expect("BUG: cell_manager missing in process_cell_events");
638 info!(
639 "Cell config updated ({} new, {} destroyed): {}",
640 num_new,
641 num_destroyed,
642 cell_manager.format_cell_config(&cpu_assignments)
643 );
644
645 Ok(())
646 }
647
648 fn compute_and_apply_cell_config(
657 &mut self,
658 new_cell_ids: &[u32],
659 ) -> Result<Vec<CpuAssignment>> {
660 let (cell_assignments, cpu_assignments) = {
661 let cell_manager = self
662 .cell_manager
663 .as_ref()
664 .expect("BUG: cell_manager missing in compute_and_apply_cell_config");
665 let active_cell_ids: Vec<u32> = cell_manager
666 .get_cell_assignments()
667 .iter()
668 .map(|(_, cell_id)| *cell_id)
669 .collect();
670 let all_cell_ids: Vec<u32> = std::iter::once(0)
672 .chain(active_cell_ids.iter().copied())
673 .collect();
674
675 let cpu_assignments = if self.enable_rebalancing {
676 let new_set: HashSet<u32> = new_cell_ids.iter().copied().collect();
678 let existing_utils: Vec<f64> = all_cell_ids
679 .iter()
680 .filter(|id| !new_set.contains(id))
681 .map(|&id| self.smoothed_util[id as usize])
682 .collect();
683
684 let has_data = existing_utils.iter().any(|&u| u > 0.0);
685
686 if has_data {
687 let avg_util: f64 =
689 existing_utils.iter().sum::<f64>() / existing_utils.len().max(1) as f64;
690 for &id in new_cell_ids {
691 self.smoothed_util[id as usize] = avg_util;
692 info!(
693 "Seeded new cell {} smoothed_util to average {:.1}%",
694 id, avg_util
695 );
696 }
697
698 let cell_demands: HashMap<u32, f64> = all_cell_ids
700 .iter()
701 .map(|&id| (id, self.smoothed_util[id as usize]))
702 .collect();
703
704 cell_manager
705 .compute_demand_cpu_assignments(&cell_demands, self.enable_borrowing)
706 .context("computing demand-weighted CPU assignments")?
707 } else {
708 cell_manager
710 .compute_cpu_assignments(self.enable_borrowing)
711 .context("computing equal-weight CPU assignments (no utilization data)")?
712 }
713 } else {
714 cell_manager
715 .compute_cpu_assignments(self.enable_borrowing)
716 .context("computing equal-weight CPU assignments (rebalancing disabled)")?
717 };
718
719 (cell_manager.get_cell_assignments(), cpu_assignments)
720 };
721
722 self.apply_cell_config(&cell_assignments, &cpu_assignments)
723 .context("applying cell configuration to BPF")?;
724
725 Ok(cpu_assignments)
726 }
727
728 fn maybe_rebalance(&mut self) -> Result<()> {
730 if self.last_rebalance.elapsed() < self.rebalance_cooldown {
732 return Ok(());
733 }
734
735 let active_cells: Vec<u32> = self.cells.keys().copied().collect();
737 if active_cells.len() < 2 {
738 return Ok(());
739 }
740
741 let mut min_util = f64::MAX;
742 let mut max_util = f64::MIN;
743 for &cell_id in &active_cells {
744 let util = self.smoothed_util[cell_id as usize];
745 if util < min_util {
746 min_util = util;
747 }
748 if util > max_util {
749 max_util = util;
750 }
751 }
752
753 let spread = max_util - min_util;
754 if spread < self.rebalance_threshold {
755 return Ok(());
756 }
757
758 let cell_demands: HashMap<u32, f64> = active_cells
760 .iter()
761 .map(|&cell_id| (cell_id, self.smoothed_util[cell_id as usize]))
762 .collect();
763
764 let (cell_assignments, cpu_assignments) = {
766 let cell_manager = self
767 .cell_manager
768 .as_ref()
769 .expect("BUG: cell_manager missing in maybe_rebalance");
770 let cpu_assignments = cell_manager
771 .compute_demand_cpu_assignments(&cell_demands, self.enable_borrowing)
772 .context("computing demand-weighted CPU assignments for rebalance")?;
773
774 let changed = cpu_assignments.iter().any(|a| {
775 self.cells
776 .get(&a.cell_id)
777 .map_or(true, |cell| cell.cpus != a.primary)
778 });
779
780 if !changed {
781 return Ok(());
782 }
783
784 (cell_manager.get_cell_assignments(), cpu_assignments)
785 };
786
787 self.apply_cell_config(&cell_assignments, &cpu_assignments)
788 .context("applying rebalanced cell configuration to BPF")?;
789
790 self.last_rebalance = Instant::now();
791 self.rebalance_count += 1;
792 self.metrics.rebalance_count = self.rebalance_count;
793
794 let cell_manager = self
795 .cell_manager
796 .as_ref()
797 .expect("BUG: cell_manager missing after apply_cell_config in maybe_rebalance");
798 info!(
799 "Rebalanced CPUs (spread={:.1}%, count={}): {}",
800 spread,
801 self.rebalance_count,
802 cell_manager.format_cell_config(&cpu_assignments)
803 );
804
805 Ok(())
806 }
807
808 fn apply_cell_config(
813 &mut self,
814 cell_assignments: &[(u64, u32)],
815 cpu_assignments: &[CpuAssignment],
816 ) -> Result<()> {
817 let bss_data = self
818 .skel
819 .maps
820 .bss_data
821 .as_mut()
822 .expect("bss_data must be available after scheduler load");
823
824 let config = &mut bss_data.cell_config;
825
826 unsafe {
834 std::ptr::write_bytes(
835 config as *mut _ as *mut u8,
836 0,
837 std::mem::size_of_val(config),
838 );
839 }
840
841 if cell_assignments.len() > bpf_intf::consts_MAX_CELLS as usize {
842 bail!(
843 "Too many cell assignments: {} > MAX_CELLS ({})",
844 cell_assignments.len(),
845 bpf_intf::consts_MAX_CELLS
846 );
847 }
848 config.num_cell_assignments = cell_assignments.len() as u32;
849
850 for (i, (cgid, cell_id)) in cell_assignments.iter().enumerate() {
851 config.assignments[i].cgid = *cgid;
852 config.assignments[i].cell_id = *cell_id;
853 }
854
855 let mut max_cell_id: u32 = 0;
857 for a in cpu_assignments {
858 if a.cell_id >= bpf_intf::consts_MAX_CELLS {
859 bail!(
860 "Cell ID {} exceeds MAX_CELLS ({})",
861 a.cell_id,
862 bpf_intf::consts_MAX_CELLS
863 );
864 }
865 max_cell_id = max_cell_id.max(a.cell_id + 1);
866
867 write_cpumask_to_config(&a.primary, &mut config.cpumasks[a.cell_id as usize].mask);
868
869 if let Some(ref borrowable) = a.borrowable {
870 write_cpumask_to_config(
871 borrowable,
872 &mut config.borrowable_cpumasks[a.cell_id as usize].mask,
873 );
874 }
875 }
876 config.num_cells = max_cell_id;
877
878 let prog = &mut self.skel.progs.apply_cell_config;
880 let out = prog
881 .test_run(ProgramInput::default())
882 .context("Failed to run apply_cell_config BPF program")?;
883 if out.return_value != 0 {
884 bail!(
885 "apply_cell_config BPF program returned error {} (num_assignments={}, num_cells={})",
886 out.return_value as i32,
887 cell_assignments.len(),
888 cpu_assignments.len()
889 );
890 }
891
892 Ok(())
893 }
894
895 fn get_metrics(&self) -> Metrics {
896 self.metrics.clone()
897 }
898
899 fn calculate_distribution_stats(
900 &self,
901 queue_counts: &[u64; QUEUE_STATS_IDX.len()],
902 global_queue_decisions: u64,
903 scope_queue_decisions: u64,
904 scope_affn_viols: u64,
905 scope_steals: u64,
906 scope_pin_skips: u64,
907 ) -> Result<DistributionStats> {
908 let share_of_global =
911 100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);
912
913 let queue_pct = if scope_queue_decisions == 0 {
915 debug!("No queue decisions in scope, zeroing out queue distribution");
916 [0.0; QUEUE_STATS_IDX.len()]
917 } else {
918 core::array::from_fn(|i| {
919 100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
920 })
921 };
922
923 let affinity_violations_percent = if scope_queue_decisions == 0 {
925 debug!("No queue decisions in scope, zeroing out affinity violations");
926 0.0
927 } else {
928 100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
929 };
930
931 let steal_pct = if scope_queue_decisions == 0 {
932 0.0
933 } else {
934 100.0 * (scope_steals as f64) / (scope_queue_decisions as f64)
935 };
936
937 let pin_skip_pct = if scope_queue_decisions == 0 {
938 0.0
939 } else {
940 100.0 * (scope_pin_skips as f64) / (scope_queue_decisions as f64)
941 };
942
943 const EXPECTED_QUEUES: usize = 4;
944 if queue_pct.len() != EXPECTED_QUEUES {
945 bail!(
946 "Expected {} queues, got {}",
947 EXPECTED_QUEUES,
948 queue_pct.len()
949 );
950 }
951
952 return Ok(DistributionStats {
953 total_decisions: scope_queue_decisions,
954 share_of_decisions_pct: share_of_global,
955 local_q_pct: queue_pct[0],
956 cpu_q_pct: queue_pct[1],
957 cell_q_pct: queue_pct[2],
958 borrowed_pct: queue_pct[3],
959 affn_viol_pct: affinity_violations_percent,
960 steal_pct,
961 pin_skip_pct,
962 global_queue_decisions,
963 });
964 }
965
966 fn update_and_log_global_queue_stats(
968 &mut self,
969 global_queue_decisions: u64,
970 cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
971 ) -> Result<()> {
972 let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
974 for cells in 0..MAX_CELLS {
975 for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
976 queue_counts[i] += cell_stats_delta[cells][*stat as usize];
977 }
978 }
979
980 let prefix = "Total Decisions:";
981
982 let scope_affn_viols: u64 = cell_stats_delta
984 .iter()
985 .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
986 .sum::<u64>();
987
988 let scope_steals: u64 = cell_stats_delta
990 .iter()
991 .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_STEAL as usize])
992 .sum::<u64>();
993
994 let scope_pin_skips: u64 = cell_stats_delta
996 .iter()
997 .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_PIN_SKIP as usize])
998 .sum::<u64>();
999
1000 let stats = self
1002 .calculate_distribution_stats(
1003 &queue_counts,
1004 global_queue_decisions,
1005 global_queue_decisions,
1006 scope_affn_viols,
1007 scope_steals,
1008 scope_pin_skips,
1009 )
1010 .context("calculating global queue distribution stats")?;
1011
1012 self.metrics.update(&stats);
1013
1014 let sum = |idx: usize| -> u64 { cell_stats_delta.iter().map(|c| c[idx]).sum() };
1016 self.metrics.slice_shrink_max =
1017 sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MAX as usize);
1018 self.metrics.slice_shrink_proportional =
1019 sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_PROPORTIONAL as usize);
1020 self.metrics.slice_shrink_min =
1021 sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MIN as usize);
1022 self.metrics.slice_shrink = self.metrics.slice_shrink_max
1023 + self.metrics.slice_shrink_proportional
1024 + self.metrics.slice_shrink_min;
1025
1026 trace!("{} {}", prefix, stats);
1027
1028 Ok(())
1029 }
1030
1031 fn update_and_log_cell_queue_stats(
1033 &mut self,
1034 global_queue_decisions: u64,
1035 cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
1036 ) -> Result<()> {
1037 for cell in 0..MAX_CELLS {
1038 let cell_queue_decisions = QUEUE_STATS_IDX
1039 .iter()
1040 .map(|&stat| cell_stats_delta[cell][stat as usize])
1041 .sum::<u64>();
1042
1043 if cell_queue_decisions == 0 {
1045 continue;
1046 }
1047
1048 let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
1049 for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
1050 queue_counts[i] = cell_stats_delta[cell][stat as usize];
1051 }
1052
1053 const MIN_CELL_WIDTH: usize = 2;
1054 let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);
1055
1056 let prefix = format!(" Cell {:width$}:", cell, width = cell_width);
1057
1058 let scope_affn_viols: u64 =
1060 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];
1061
1062 let scope_steals: u64 =
1064 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_STEAL as usize];
1065
1066 let scope_pin_skips: u64 =
1068 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_PIN_SKIP as usize];
1069
1070 let stats = self
1071 .calculate_distribution_stats(
1072 &queue_counts,
1073 global_queue_decisions,
1074 cell_queue_decisions,
1075 scope_affn_viols,
1076 scope_steals,
1077 scope_pin_skips,
1078 )
1079 .with_context(|| {
1080 format!("calculating queue distribution stats for cell {}", cell)
1081 })?;
1082
1083 let cell_metrics = self.metrics.cells.entry(cell as u32).or_default();
1084 cell_metrics.update(&stats);
1085
1086 cell_metrics.slice_shrink_max =
1088 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MAX as usize];
1089 cell_metrics.slice_shrink_proportional = cell_stats_delta[cell]
1090 [bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_PROPORTIONAL as usize];
1091 cell_metrics.slice_shrink_min =
1092 cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MIN as usize];
1093 cell_metrics.slice_shrink = cell_metrics.slice_shrink_max
1094 + cell_metrics.slice_shrink_proportional
1095 + cell_metrics.slice_shrink_min;
1096
1097 trace!("{} {}", prefix, stats);
1098 }
1099 Ok(())
1100 }
1101
1102 fn log_all_queue_stats(
1103 &mut self,
1104 cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
1105 ) -> Result<()> {
1106 let global_queue_decisions: u64 = cell_stats_delta
1108 .iter()
1109 .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
1110 .sum();
1111
1112 if global_queue_decisions == 0 {
1113 warn!("No queueing decisions made globally");
1114 return Ok(());
1115 }
1116
1117 self.update_and_log_global_queue_stats(global_queue_decisions, &cell_stats_delta)
1118 .context("updating global queue stats")?;
1119
1120 self.update_and_log_cell_queue_stats(global_queue_decisions, &cell_stats_delta)
1121 .context("updating per-cell queue stats")?;
1122
1123 Ok(())
1124 }
1125
1126 fn calculate_cell_stat_delta(
1127 &mut self,
1128 cpu_ctxs: &[bpf_intf::cpu_ctx],
1129 ) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
1130 let mut cell_stats_delta = [[0 as u64; NR_CSTATS]; MAX_CELLS];
1131
1132 for cell in 0..MAX_CELLS {
1135 for stat in 0..NR_CSTATS {
1136 let mut cur_cell_stat = 0;
1137
1138 for cpu_ctx in cpu_ctxs.iter() {
1140 cur_cell_stat += cpu_ctx.cstats[cell][stat];
1141 }
1142
1143 cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
1145 self.prev_cell_stats[cell][stat] = cur_cell_stat;
1146 }
1147 }
1148 Ok(cell_stats_delta)
1149 }
1150
1151 fn collect_metrics(&mut self) -> Result<()> {
1153 let cpu_ctxs = read_cpu_ctxs(&self.skel).context("reading per-CPU contexts for metrics")?;
1154
1155 let cell_stats_delta = self
1156 .calculate_cell_stat_delta(&cpu_ctxs)
1157 .context("calculating cell stat deltas")?;
1158
1159 self.log_all_queue_stats(&cell_stats_delta)
1160 .context("logging queue stats")?;
1161
1162 if self.cell_manager.is_some() {
1163 self.collect_demand_metrics(&cpu_ctxs)
1164 .context("collecting demand metrics")?;
1165 }
1166
1167 for (cell_id, cell) in &self.cells {
1168 trace!("CELL[{}]: {}", cell_id, cell.cpus);
1169 }
1170
1171 for (cell_id, cell) in self.cells.iter() {
1172 self.metrics
1174 .cells
1175 .entry(*cell_id)
1176 .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
1177 }
1178 self.metrics.num_cells = self.cells.len() as u32;
1179
1180 Ok(())
1181 }
1182
    /// Derives per-cell and global utilization, borrow and lend percentages
    /// from the cumulative `running_ns` counters in the per-CPU contexts.
    ///
    /// For each cell the deltas since the previous call are computed for:
    /// * total running time over all CPUs,
    /// * running time on CPUs whose `cell` field names that cell,
    /// * time other cells ran on that cell's CPUs ("lent").
    ///
    /// With rebalancing enabled, the cell's EWMA-smoothed utilization is
    /// updated as well.
    ///
    /// NOTE(review): presumably `cpu_ctx.running_ns[c]` is the time tasks of
    /// cell `c` ran on that CPU and `cpu_ctx.cell` is the owning cell —
    /// confirm against the BPF side.
    ///
    /// # Errors
    /// Fails when a CPU names a cell at or beyond `MAX_CELLS`, or when an
    /// active cell has no CPUs.
    fn collect_demand_metrics(&mut self, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Result<()> {
        // Cumulative totals, indexed by cell.
        let mut total_running_ns = [0u64; MAX_CELLS];
        let mut on_own_ns = [0u64; MAX_CELLS];
        let mut lent_ns = [0u64; MAX_CELLS];

        for cpu_ctx in cpu_ctxs.iter() {
            let owner = cpu_ctx.cell as usize;
            for cell in 0..MAX_CELLS {
                let ns = cpu_ctx.running_ns[cell];
                total_running_ns[cell] += ns;
                if owner == cell {
                    on_own_ns[cell] += ns;
                }
            }
            // Checked before `owner` is used as an index below.
            if owner >= MAX_CELLS {
                bail!(
                    "CPU has invalid cell assignment {} (MAX_CELLS={})",
                    owner,
                    MAX_CELLS
                );
            }
            // Everything that ran on this CPU except the owner's own time
            // counts as lent by the owner.
            let total_on_cpu: u64 = cpu_ctx.running_ns.iter().sum();
            let owner_on_cpu = cpu_ctx.running_ns[owner];
            lent_ns[owner] += total_on_cpu.saturating_sub(owner_on_cpu);
        }

        // NOTE(review): capacity is based on the configured monitor interval,
        // not on measured elapsed time; the main loop can also wake early on
        // epoll events, so the percentages are approximate.
        let interval_ns = self.monitor_interval.as_nanos() as u64;

        let mut global_running_delta = 0u64;
        let mut global_borrowed_delta = 0u64;
        let mut global_lent_delta = 0u64;
        let mut global_capacity = 0u64;

        for cell in 0..MAX_CELLS {
            let delta_running =
                total_running_ns[cell].saturating_sub(self.prev_cell_running_ns[cell]);
            let delta_on_own = on_own_ns[cell].saturating_sub(self.prev_cell_own_ns[cell]);
            let delta_lent = lent_ns[cell].saturating_sub(self.prev_cell_lent_ns[cell]);

            // The baselines are advanced for every cell, including the ones
            // skipped below.
            self.prev_cell_running_ns[cell] = total_running_ns[cell];
            self.prev_cell_own_ns[cell] = on_own_ns[cell];
            self.prev_cell_lent_ns[cell] = lent_ns[cell];

            // NOTE(review): a cell with no activity this interval is skipped
            // entirely, so its smoothed utilization and metrics keep their
            // previous values instead of decaying — confirm this is intended.
            if delta_running == 0 && delta_lent == 0 {
                continue;
            }

            // Time the cell ran on CPUs it does not own.
            let delta_borrowed = delta_running.saturating_sub(delta_on_own);

            // Cells that are not in `self.cells` are ignored.
            let Some(cell_info) = self.cells.get(&(cell as u32)) else {
                continue;
            };

            let nr_cpus = cell_info.cpus.weight() as u64;
            if nr_cpus == 0 {
                bail!("Cell {} has 0 CPUs assigned", cell);
            }

            // CPU-time available to the cell over one interval.
            let capacity = nr_cpus * interval_ns;
            let util_pct = 100.0 * (delta_running as f64) / (capacity as f64);
            // Share of the cell's running time that was spent on borrowed CPUs.
            let demand_borrow_pct = if delta_running > 0 {
                100.0 * (delta_borrowed as f64) / (delta_running as f64)
            } else {
                0.0
            };
            let lent_pct = 100.0 * (delta_lent as f64) / (capacity as f64);

            if self.enable_rebalancing {
                // EWMA: `demand_smoothing` weights the new sample, the
                // remainder weights the previous smoothed value.
                self.smoothed_util[cell] = self.demand_smoothing * util_pct
                    + (1.0 - self.demand_smoothing) * self.smoothed_util[cell];
            }

            self.metrics
                .cells
                .entry(cell as u32)
                .or_default()
                .update_demand(util_pct, demand_borrow_pct, lent_pct);

            if self.enable_rebalancing {
                self.metrics
                    .cells
                    .entry(cell as u32)
                    .or_default()
                    .smoothed_util_pct = self.smoothed_util[cell];
            }

            global_running_delta = global_running_delta.saturating_add(delta_running);
            global_borrowed_delta = global_borrowed_delta.saturating_add(delta_borrowed);
            global_lent_delta = global_lent_delta.saturating_add(delta_lent);
            global_capacity = global_capacity.saturating_add(capacity);
        }

        // Global figures use the same formulas over the summed deltas; a zero
        // denominator maps to 0%.
        let global_util_pct = if global_capacity > 0 {
            100.0 * (global_running_delta as f64) / (global_capacity as f64)
        } else {
            0.0
        };
        let global_borrow_pct = if global_running_delta > 0 {
            100.0 * (global_borrowed_delta as f64) / (global_running_delta as f64)
        } else {
            0.0
        };
        let global_lent_pct = if global_capacity > 0 {
            100.0 * (global_lent_delta as f64) / (global_capacity as f64)
        } else {
            0.0
        };

        self.metrics
            .update_demand(global_util_pct, global_borrow_pct, global_lent_pct);

        Ok(())
    }
1312
1313 fn update_applied_cpuset_seq(&mut self) {
1315 unsafe {
1316 let ptr = &mut self
1317 .skel
1318 .maps
1319 .bss_data
1320 .as_mut()
1321 .expect("BUG: bss_data missing after scheduler load")
1322 .applied_cpuset_seq as *mut u32;
1323 std::ptr::write_volatile(ptr, self.last_cpuset_seq);
1324 }
1325 }
1326
1327 fn check_cpuset_changes(&mut self) -> Result<()> {
1329 let Some(ref mut cm) = self.cell_manager else {
1330 return Ok(());
1331 };
1332
1333 let current_seq = unsafe {
1334 let ptr = &self
1335 .skel
1336 .maps
1337 .bss_data
1338 .as_ref()
1339 .expect("BUG: bss_data missing after scheduler load")
1340 .cpuset_seq as *const u32;
1341 (ptr as *const AtomicU32)
1342 .as_ref()
1343 .expect("BUG: cpuset_seq pointer cast yielded null")
1344 .load(Ordering::Acquire)
1345 };
1346
1347 if current_seq == self.last_cpuset_seq {
1348 return Ok(());
1349 }
1350 self.last_cpuset_seq = current_seq;
1351
1352 if !cm.refresh_cpusets().context("refreshing cell cpusets")? {
1353 self.update_applied_cpuset_seq();
1355 return Ok(());
1356 }
1357
1358 let cpu_assignments = self
1359 .compute_and_apply_cell_config(&[])
1360 .context("recomputing cell configuration after cpuset change")?;
1361 self.update_applied_cpuset_seq();
1362 let cell_manager = self
1363 .cell_manager
1364 .as_ref()
1365 .expect("BUG: cell_manager missing in check_cpuset_changes");
1366 info!(
1367 "Cpuset change detected, recomputed config: {}",
1368 cell_manager.format_cell_config(&cpu_assignments)
1369 );
1370 Ok(())
1371 }
1372
1373 fn refresh_bpf_cells(&mut self) -> Result<()> {
1374 let applied_configuration = unsafe {
1375 let ptr = &self
1376 .skel
1377 .maps
1378 .bss_data
1379 .as_ref()
1380 .expect("BUG: bss_data missing after scheduler load")
1381 .applied_configuration_seq as *const u32;
1382 (ptr as *const std::sync::atomic::AtomicU32)
1383 .as_ref()
1384 .expect("BUG: applied_configuration_seq pointer cast yielded null")
1385 .load(std::sync::atomic::Ordering::Acquire)
1386 };
1387 if self
1388 .last_configuration_seq
1389 .is_some_and(|seq| applied_configuration == seq)
1390 {
1391 return Ok(());
1392 }
1393 let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
1395 let cpu_ctxs =
1396 read_cpu_ctxs(&self.skel).context("reading per-CPU contexts for BPF cell refresh")?;
1397 for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
1398 cell_to_cpus
1399 .entry(cpu_ctx.cell)
1400 .or_insert_with(|| Cpumask::new())
1401 .set_cpu(i)
1402 .expect("set cpu in existing mask");
1403 }
1404
1405 let cells_with_cpus: HashSet<u32> = cell_to_cpus.keys().copied().collect();
1416 let mut active_cells = cells_with_cpus.clone();
1417 active_cells.insert(0);
1418
1419 for cell_idx in &active_cells {
1420 let cpus = cell_to_cpus
1421 .get(cell_idx)
1422 .cloned()
1423 .unwrap_or_else(|| Cpumask::new());
1424 self.cells
1425 .entry(*cell_idx)
1426 .or_insert_with(|| Cell {
1427 cpus: Cpumask::new(),
1428 })
1429 .cpus = cpus;
1430 self.metrics.cells.insert(*cell_idx, CellMetrics::default());
1431 }
1432
1433 self.cells.retain(|&k, _| active_cells.contains(&k));
1435 self.metrics.cells.retain(|&k, _| active_cells.contains(&k));
1436
1437 self.last_configuration_seq = Some(applied_configuration);
1438
1439 Ok(())
1440 }
1441}
1442
1443fn write_cpumask_to_config(cpumask: &Cpumask, dest: &mut [u8]) {
1444 let raw_slice = cpumask.as_raw_slice();
1445 for (word_idx, word) in raw_slice.iter().enumerate() {
1446 let byte_start = word_idx * 8;
1447 let bytes = word.to_le_bytes();
1448 for (j, byte) in bytes.iter().enumerate() {
1449 let idx = byte_start + j;
1450 if idx < dest.len() {
1451 dest[idx] = *byte;
1452 }
1453 }
1454 }
1455}
1456
1457fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
1458 let mut cpu_ctxs = vec![];
1459 let cpu_ctxs_vec = skel
1460 .maps
1461 .cpu_ctxs
1462 .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
1463 .context("Failed to lookup cpu_ctx")?
1464 .expect("BUG: cpu_ctxs lookup_percpu returned None for key 0");
1465 if cpu_ctxs_vec.len() < *NR_CPUS_POSSIBLE {
1466 bail!(
1467 "Percpu map returned {} entries but expected {}",
1468 cpu_ctxs_vec.len(),
1469 *NR_CPUS_POSSIBLE
1470 );
1471 }
1472 for cpu in 0..*NR_CPUS_POSSIBLE {
1473 cpu_ctxs.push(*unsafe {
1474 &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
1475 });
1476 }
1477 Ok(cpu_ctxs)
1478}
1479
1480#[clap_main::clap_main]
1481fn main(opts: Opts) -> Result<()> {
1482 if opts.version {
1483 println!(
1484 "scx_mitosis {}",
1485 build_id::full_version(env!("CARGO_PKG_VERSION"))
1486 );
1487 return Ok(());
1488 }
1489
1490 let env_filter = EnvFilter::try_from_default_env()
1491 .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
1492 Ok(filter) => Ok(filter),
1493 Err(e) => {
1494 eprintln!(
1495 "invalid log envvar: {}, using info, err is: {}",
1496 opts.log_level, e
1497 );
1498 EnvFilter::try_new("info")
1499 }
1500 })
1501 .unwrap_or_else(|_| EnvFilter::new("info"));
1502
1503 match tracing_subscriber::fmt()
1504 .with_env_filter(env_filter)
1505 .with_target(true)
1506 .with_thread_ids(true)
1507 .with_file(true)
1508 .with_line_number(true)
1509 .try_init()
1510 {
1511 Ok(()) => {}
1512 Err(e) => eprintln!("failed to init logger: {}", e),
1513 }
1514
1515 if opts.verbose > 0 {
1516 warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
1517 }
1518
1519 debug!("opts={:?}", &opts);
1520
1521 if let Some(run_id) = opts.run_id {
1522 info!("scx_mitosis run_id: {}", run_id);
1523 }
1524
1525 let shutdown = Arc::new(AtomicBool::new(false));
1526 let shutdown_clone = shutdown.clone();
1527 ctrlc::set_handler(move || {
1528 shutdown_clone.store(true, Ordering::Relaxed);
1529 })
1530 .context("Error setting Ctrl-C handler")?;
1531
1532 if let Some(intv) = opts.monitor {
1533 let shutdown_clone = shutdown.clone();
1534 let jh = std::thread::spawn(move || {
1535 match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
1536 Ok(_) => {
1537 debug!("stats monitor thread finished successfully")
1538 }
1539 Err(error_object) => {
1540 warn!(
1541 "stats monitor thread finished because of an error {}",
1542 error_object
1543 )
1544 }
1545 }
1546 });
1547 if opts.monitor.is_some() {
1548 let _ = jh.join();
1549 return Ok(());
1550 }
1551 }
1552
1553 let mut open_object = MaybeUninit::uninit();
1554 loop {
1555 let mut sched =
1556 Scheduler::init(&opts, &mut open_object).context("initializing scheduler")?;
1557 if !sched
1558 .run(shutdown.clone())
1559 .context("running scheduler main loop")?
1560 .should_restart()
1561 {
1562 break;
1563 }
1564 }
1565
1566 Ok(())
1567}