// scx_mitosis/main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

// Generated BPF skeleton and C-interface bindings, plus local helper modules.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
mod mitosis_topology_utils;
mod stats;

use std::cmp::max;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::fmt::Display;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;
use tracing::{debug, info, trace, warn};
use tracing_subscriber::filter::EnvFilter;

use stats::CellMetrics;
use stats::Metrics;

const SCHEDULER_NAME: &str = "scx_mitosis";
// Maximum number of cells; mirrors the BPF-side constant from bpf_intf.
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
// Number of per-cell stat counters defined on the BPF side.
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;
52
/// scx_mitosis: A dynamic affinity scheduler
///
/// Cgroups are assigned to a dynamic number of Cells which are assigned to a
/// dynamic set of CPUs. The BPF part does simple vtime scheduling for each cell.
///
/// Userspace makes the dynamic decisions of which Cells should be merged or
/// split and which CPUs they should be assigned to.
// NOTE: the `///` comments on fields below double as clap `--help` text; keep
// them user-facing.
#[derive(Debug, Parser)]
struct Opts {
    /// Deprecated, noop, use RUST_LOG or --log-level instead.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Specify the logging level. Accepts rust's envfilter syntax for modular
    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
    #[clap(long, default_value = "info")]
    log_level: String,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval to consider reconfiguring the Cells (e.g. merge or split)
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval to consider rebalancing CPUs to Cells
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval to report monitoring information
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Optional run ID for tracking scheduler instances.
    #[clap(long)]
    run_id: Option<u64>,

    /// Enable debug event tracking for cgroup_init, init_task, and cgroup_exit.
    /// Events are recorded in a ring buffer and output in dump().
    #[clap(long, action = clap::ArgAction::SetTrue)]
    debug_events: bool,

    /// Enable workaround for exiting tasks with offline cgroups during scheduler load.
    /// This works around a kernel bug where tasks can be initialized with cgroups that
    /// were never initialized. Disable this once the kernel bug is fixed.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    exiting_task_workaround: bool,

    /// Disable SCX cgroup callbacks (for when CPU cgroup controller is disabled).
    /// Uses tracepoints and cgroup iteration instead.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    cpu_controller_disabled: bool,

    /// Reject tasks with multi-CPU pinning that doesn't cover the entire cell.
    /// By default, these tasks are allowed but may have degraded performance.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    reject_multicpu_pinning: bool,

    /// Enable LLC-awareness. This will populate the scheduler's LLC maps and cause it
    /// to use LLC-aware scheduling.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_llc_awareness: bool,

    /// Enable work stealing. This is only relevant when LLC-awareness is enabled.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_work_stealing: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
133
// The subset of cstats we care about.
// Local + Default + Hi + Lo = Total Decisions
// Affinity violations are not queue decisions, but
// will be calculated separately and reported as a percent of the total
// NOTE: order matters — positions 0/1/2 feed the local/cpu/cell percentage
// fields in DistributionStats (see calculate_distribution_stats).
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 3] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
];
143
// Per cell book-keeping
#[derive(Debug)]
struct Cell {
    // CPUs currently assigned to this cell (derived from per-CPU BPF state).
    cpus: Cpumask,
}
149
// Userspace side of the scheduler: owns the loaded BPF skeleton plus the
// bookkeeping needed to compute stat deltas and serve metrics.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    // How often the main loop wakes up to refresh cells and collect metrics.
    monitor_interval: Duration,
    // Userspace view of active cells, keyed by cell index.
    cells: HashMap<u32, Cell>,
    // These are the per-cell cstats.
    // Note these are accumulated across all CPUs.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    // Most recently computed metrics, served to stats clients on request.
    metrics: Metrics,
    stats_server: StatsServer<(), Metrics>,
    // Last applied_configuration_seq observed from BPF; None before the first refresh.
    last_configuration_seq: Option<u32>,
}
161
// Percentage-based queueing distribution for one scope: either the whole node
// or a single cell (see calculate_distribution_stats).
struct DistributionStats {
    // Queueing decisions made within this scope.
    total_decisions: u64,
    // This scope's share of all global queueing decisions.
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    affn_viol_pct: f64,
    steal_pct: f64,

    // for formatting
    global_queue_decisions: u64,
}
174
175impl Display for DistributionStats {
176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
177        // This makes the output easier to read by improving column alignment. First, it guarantees that within a
178        // given logging interval, the global and cell queueing decision counts print at the same width.
179        // Second, it reduces variance in column width between logging intervals. 5 is simply a heuristic.
180        const MIN_DECISIONS_WIDTH: usize = 5;
181        let descisions_width = if self.global_queue_decisions > 0 {
182            max(
183                MIN_DECISIONS_WIDTH,
184                (self.global_queue_decisions as f64).log10().ceil() as usize,
185            )
186        } else {
187            MIN_DECISIONS_WIDTH
188        };
189        write!(
190            f,
191            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% | V:{:4.1}% S:{:4.1}%",
192            self.total_decisions,
193            self.share_of_decisions_pct,
194            self.local_q_pct,
195            self.cpu_q_pct,
196            self.cell_q_pct,
197            self.affn_viol_pct,
198            self.steal_pct,
199            width = descisions_width,
200        )
201    }
202}
203
204impl<'a> Scheduler<'a> {
205    fn validate_args(opts: &Opts) -> Result<()> {
206        if opts.enable_work_stealing && !opts.enable_llc_awareness {
207            bail!("Work stealing requires LLC-aware mode (--enable-llc-awareness)");
208        }
209
210        Ok(())
211    }
212
    /// Open, configure, and load the BPF scheduler, returning a ready-to-run
    /// `Scheduler`.
    ///
    /// Ordering is significant: all rodata writes and topology-map population
    /// must happen between `scx_ops_open!` and `scx_ops_load!`, since those
    /// sections are only writable before load.
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        Self::validate_args(opts)?;

        let topology = Topology::new()?;

        // Guarantee at least one LLC even if topology reports none.
        let nr_llc = topology.all_llcs.len().max(1);

        let mut skel_builder = BpfSkelBuilder::default();
        // libbpf verbose debug output only at trace-level logging.
        skel_builder
            .obj_builder
            .debug(opts.log_level.contains("trace"));
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        // Read-only data section: configurable knobs consumed by the BPF side.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();

        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.debug_events_enabled = opts.debug_events;
        rodata.exiting_task_workaround_enabled = opts.exiting_task_workaround;
        rodata.cpu_controller_disabled = opts.cpu_controller_disabled;

        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        // Build the all-CPUs bitmask, one bit per present CPU.
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        rodata.reject_multicpu_pinning = opts.reject_multicpu_pinning;

        // Set nr_llc in rodata
        rodata.nr_llc = nr_llc as u32;
        rodata.enable_llc_awareness = opts.enable_llc_awareness;
        rodata.enable_work_stealing = opts.enable_work_stealing;

        // Opt into queued-wakeup handling when the running kernel supports it.
        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        // Populate LLC topology arrays before load (data section is only writable before load)
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::CpuToLLC,
            None,
        )?;
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::LLCToCpus,
            None,
        )?;

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server,
            last_configuration_seq: None,
        })
    }
285
    /// Attach the BPF scheduler and run the main loop: refresh the cell view,
    /// collect metrics, and serve stats requests until shutdown is requested
    /// or the BPF side exits. Returns the BPF user-exit info.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        let (res_ch, req_ch) = self.stats_server.channels();

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            self.refresh_bpf_cells()?;
            self.collect_metrics()?;

            // The stats request channel doubles as the loop timer: answer a
            // request if one arrives, otherwise wake after monitor_interval.
            match req_ch.recv_timeout(self.monitor_interval) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }
        // Detach the scheduler before reporting exit info.
        drop(struct_ops);
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }
307
    /// Snapshot of the most recently collected metrics, cloned for the stats
    /// server response channel.
    fn get_metrics(&self) -> Metrics {
        self.metrics.clone()
    }
311
312    fn calculate_distribution_stats(
313        &self,
314        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
315        global_queue_decisions: u64,
316        scope_queue_decisions: u64,
317        scope_affn_viols: u64,
318        scope_steals: u64,
319    ) -> Result<DistributionStats> {
320        // First % on the line: share of global work
321        // We know global_queue_decisions is non-zero.
322        let share_of_global =
323            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);
324
325        // Each queue's % of the scope total
326        let queue_pct = if scope_queue_decisions == 0 {
327            debug!("No queue decisions in scope, zeroing out queue distribution");
328            [0.0; QUEUE_STATS_IDX.len()]
329        } else {
330            core::array::from_fn(|i| {
331                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
332            })
333        };
334
335        // These are summed differently for the global and per-cell totals.
336        let affinity_violations_percent = if scope_queue_decisions == 0 {
337            debug!("No queue decisions in scope, zeroing out affinity violations");
338            0.0
339        } else {
340            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
341        };
342
343        let steal_pct = if scope_queue_decisions == 0 {
344            0.0
345        } else {
346            100.0 * (scope_steals as f64) / (scope_queue_decisions as f64)
347        };
348
349        const EXPECTED_QUEUES: usize = 3;
350        if queue_pct.len() != EXPECTED_QUEUES {
351            bail!(
352                "Expected {} queues, got {}",
353                EXPECTED_QUEUES,
354                queue_pct.len()
355            );
356        }
357
358        return Ok(DistributionStats {
359            total_decisions: scope_queue_decisions,
360            share_of_decisions_pct: share_of_global,
361            local_q_pct: queue_pct[0],
362            cpu_q_pct: queue_pct[1],
363            cell_q_pct: queue_pct[2],
364            affn_viol_pct: affinity_violations_percent,
365            steal_pct,
366            global_queue_decisions,
367        });
368    }
369
370    // Queue stats for the whole node
371    fn update_and_log_global_queue_stats(
372        &mut self,
373        global_queue_decisions: u64,
374        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
375    ) -> Result<()> {
376        // Get total of each queue summed over all cells
377        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
378        for cells in 0..MAX_CELLS {
379            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
380                queue_counts[i] += cell_stats_delta[cells][*stat as usize];
381            }
382        }
383
384        let prefix = "Total Decisions:";
385
386        // Here we want to sum the affinity violations over all cells.
387        let scope_affn_viols: u64 = cell_stats_delta
388            .iter()
389            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
390            .sum::<u64>();
391
392        // Sum steals over all cells
393        let scope_steals: u64 = cell_stats_delta
394            .iter()
395            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_STEAL as usize])
396            .sum::<u64>();
397
398        // Special case where the number of scope decisions == number global decisions
399        let stats = self.calculate_distribution_stats(
400            &queue_counts,
401            global_queue_decisions,
402            global_queue_decisions,
403            scope_affn_viols,
404            scope_steals,
405        )?;
406
407        self.metrics.update(&stats);
408
409        trace!("{} {}", prefix, stats);
410
411        Ok(())
412    }
413
414    // Print out the per-cell stats
415    fn update_and_log_cell_queue_stats(
416        &mut self,
417        global_queue_decisions: u64,
418        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
419    ) -> Result<()> {
420        for cell in 0..MAX_CELLS {
421            let cell_queue_decisions = QUEUE_STATS_IDX
422                .iter()
423                .map(|&stat| cell_stats_delta[cell][stat as usize])
424                .sum::<u64>();
425
426            // FIXME: This should really query if the cell is enabled or not.
427            if cell_queue_decisions == 0 {
428                continue;
429            }
430
431            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
432            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
433                queue_counts[i] = cell_stats_delta[cell][stat as usize];
434            }
435
436            const MIN_CELL_WIDTH: usize = 2;
437            let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);
438
439            let prefix = format!("        Cell {:width$}:", cell, width = cell_width);
440
441            // Sum affinity violations for this cell
442            let scope_affn_viols: u64 =
443                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];
444
445            // Steals for this cell
446            let scope_steals: u64 =
447                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_STEAL as usize];
448
449            let stats = self.calculate_distribution_stats(
450                &queue_counts,
451                global_queue_decisions,
452                cell_queue_decisions,
453                scope_affn_viols,
454                scope_steals,
455            )?;
456
457            self.metrics
458                .cells
459                .entry(cell as u32)
460                .or_default()
461                .update(&stats);
462
463            trace!("{} {}", prefix, stats);
464        }
465        Ok(())
466    }
467
468    fn log_all_queue_stats(
469        &mut self,
470        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
471    ) -> Result<()> {
472        // Get total decisions
473        let global_queue_decisions: u64 = cell_stats_delta
474            .iter()
475            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
476            .sum();
477
478        // We don't want to divide by zero later, but this is never expected.
479        if global_queue_decisions == 0 {
480            bail!("Error: No queueing decisions made globally");
481        }
482
483        self.update_and_log_global_queue_stats(global_queue_decisions, &cell_stats_delta)?;
484
485        self.update_and_log_cell_queue_stats(global_queue_decisions, &cell_stats_delta)?;
486
487        Ok(())
488    }
489
490    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
491        let mut cell_stats_delta = [[0 as u64; NR_CSTATS]; MAX_CELLS];
492
493        // Read CPU contexts
494        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
495
496        // Loop over cells and stats first, then CPU contexts
497        // TODO: We should loop over the in_use cells only.
498        for cell in 0..MAX_CELLS {
499            for stat in 0..NR_CSTATS {
500                let mut cur_cell_stat = 0;
501
502                // Accumulate stats from all CPUs
503                for cpu_ctx in cpu_ctxs.iter() {
504                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
505                }
506
507                // Calculate delta and update previous stat
508                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
509                self.prev_cell_stats[cell][stat] = cur_cell_stat;
510            }
511        }
512        Ok(cell_stats_delta)
513    }
514
515    /// Collect metrics and out various debugging data like per cell stats, per-cpu stats, etc.
516    fn collect_metrics(&mut self) -> Result<()> {
517        let cell_stats_delta = self.calculate_cell_stat_delta()?;
518
519        self.log_all_queue_stats(&cell_stats_delta)?;
520
521        for (cell_id, cell) in &self.cells {
522            trace!("CELL[{}]: {}", cell_id, cell.cpus);
523        }
524
525        for (cell_id, cell) in self.cells.iter() {
526            // Assume we have a CellMetrics entry if we have a known cell
527            self.metrics
528                .cells
529                .entry(*cell_id)
530                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
531        }
532        self.metrics.num_cells = self.cells.len() as u32;
533
534        Ok(())
535    }
536
    /// Rebuild the userspace cell map (cell id -> Cpumask) from per-CPU BPF
    /// state. No-op when the BPF-side applied configuration sequence has not
    /// changed since the last refresh.
    fn refresh_bpf_cells(&mut self) -> Result<()> {
        // SAFETY: `applied_configuration_seq` is a u32 in the skeleton's bss
        // section that the BPF side updates concurrently. Reinterpreting the
        // pointer as AtomicU32 for a single Acquire load relies on u32 and
        // AtomicU32 having identical size/alignment; assumes the BPF writer
        // pairs this with a Release store — TODO confirm against the BPF source.
        let applied_configuration = unsafe {
            let ptr = &self
                .skel
                .maps
                .bss_data
                .as_ref()
                .unwrap()
                .applied_configuration_seq as *const u32;
            (ptr as *const std::sync::atomic::AtomicU32)
                .as_ref()
                .unwrap()
                .load(std::sync::atomic::Ordering::Acquire)
        };
        // Nothing changed since last refresh: keep the current view.
        if self
            .last_configuration_seq
            .is_some_and(|seq| applied_configuration == seq)
        {
            return Ok(());
        }
        // collect all cpus per cell.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .or_insert_with(|| Cpumask::new())
                .set_cpu(i)
                .expect("set cpu in existing mask");
        }

        // Create cells we don't have yet, drop cells that are no longer in use.
        // If we continue to drop cell metrics once a cell is removed, we'll need to make sure we
        // flush metrics for a cell before we remove it completely.
        //
        // IMPORTANT: We determine which cells exist based on CPU assignments (which are
        // synchronized by applied_configuration_seq), NOT by reading the in_use field
        // separately. This avoids a TOCTOU race where a cell's in_use is set before
        // CPUs are assigned.

        // Cell 0 (root cell) always exists even if it has no CPUs temporarily
        let cells_with_cpus: HashSet<u32> = cell_to_cpus.keys().copied().collect();
        let mut active_cells = cells_with_cpus.clone();
        active_cells.insert(0);

        for cell_idx in &active_cells {
            let cpus = cell_to_cpus
                .get(cell_idx)
                .cloned()
                .unwrap_or_else(|| Cpumask::new());
            self.cells
                .entry(*cell_idx)
                .or_insert_with(|| Cell {
                    cpus: Cpumask::new(),
                })
                .cpus = cpus;
            // Fresh metrics entry for every active cell on each reconfiguration.
            self.metrics.cells.insert(*cell_idx, CellMetrics::default());
        }

        // Remove cells that no longer have CPUs assigned
        self.cells.retain(|&k, _| active_cells.contains(&k));
        self.metrics.cells.retain(|&k, _| active_cells.contains(&k));

        self.last_configuration_seq = Some(applied_configuration);

        Ok(())
    }
604}
605
606fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
607    let mut cpu_ctxs = vec![];
608    let cpu_ctxs_vec = skel
609        .maps
610        .cpu_ctxs
611        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
612        .context("Failed to lookup cpu_ctx")?
613        .unwrap();
614    if cpu_ctxs_vec.len() < *NR_CPUS_POSSIBLE {
615        bail!(
616            "Percpu map returned {} entries but expected {}",
617            cpu_ctxs_vec.len(),
618            *NR_CPUS_POSSIBLE
619        );
620    }
621    for cpu in 0..*NR_CPUS_POSSIBLE {
622        cpu_ctxs.push(*unsafe {
623            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
624        });
625    }
626    Ok(cpu_ctxs)
627}
628
629#[clap_main::clap_main]
630fn main(opts: Opts) -> Result<()> {
631    if opts.version {
632        println!(
633            "scx_mitosis {}",
634            build_id::full_version(env!("CARGO_PKG_VERSION"))
635        );
636        return Ok(());
637    }
638
639    let env_filter = EnvFilter::try_from_default_env()
640        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
641            Ok(filter) => Ok(filter),
642            Err(e) => {
643                eprintln!(
644                    "invalid log envvar: {}, using info, err is: {}",
645                    opts.log_level, e
646                );
647                EnvFilter::try_new("info")
648            }
649        })
650        .unwrap_or_else(|_| EnvFilter::new("info"));
651
652    match tracing_subscriber::fmt()
653        .with_env_filter(env_filter)
654        .with_target(true)
655        .with_thread_ids(true)
656        .with_file(true)
657        .with_line_number(true)
658        .try_init()
659    {
660        Ok(()) => {}
661        Err(e) => eprintln!("failed to init logger: {}", e),
662    }
663
664    if opts.verbose > 0 {
665        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
666    }
667
668    debug!("opts={:?}", &opts);
669
670    if let Some(run_id) = opts.run_id {
671        info!("scx_mitosis run_id: {}", run_id);
672    }
673
674    let shutdown = Arc::new(AtomicBool::new(false));
675    let shutdown_clone = shutdown.clone();
676    ctrlc::set_handler(move || {
677        shutdown_clone.store(true, Ordering::Relaxed);
678    })
679    .context("Error setting Ctrl-C handler")?;
680
681    if let Some(intv) = opts.monitor {
682        let shutdown_clone = shutdown.clone();
683        let jh = std::thread::spawn(move || {
684            match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
685                Ok(_) => {
686                    debug!("stats monitor thread finished successfully")
687                }
688                Err(error_object) => {
689                    warn!(
690                        "stats monitor thread finished because of an error {}",
691                        error_object
692                    )
693                }
694            }
695        });
696        if opts.monitor.is_some() {
697            let _ = jh.join();
698            return Ok(());
699        }
700    }
701
702    let mut open_object = MaybeUninit::uninit();
703    loop {
704        let mut sched = Scheduler::init(&opts, &mut open_object)?;
705        if !sched.run(shutdown.clone())?.should_restart() {
706            break;
707        }
708    }
709
710    Ok(())
711}