// scx_mitosis/main.rs
1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6pub use bpf_skel::*;
7pub mod bpf_intf;
8mod cell_manager;
9mod mitosis_topology_utils;
10mod stats;
11
12use cell_manager::{CellManager, CpuAssignment};
13
14use std::cmp::max;
15use std::collections::{HashMap, HashSet};
16use std::fmt;
17use std::fmt::Display;
18use std::mem::MaybeUninit;
19use std::os::fd::AsFd;
20use std::sync::atomic::AtomicBool;
21use std::sync::atomic::AtomicU32;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::time::Duration;
25use std::time::Instant;
26
27use anyhow::bail;
28use anyhow::Context;
29use anyhow::Result;
30use clap::Parser;
31use libbpf_rs::MapCore as _;
32use libbpf_rs::OpenObject;
33use libbpf_rs::ProgramInput;
34use nix::sys::epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags, EpollTimeout};
35use nix::sys::eventfd::EventFd;
36use scx_stats::prelude::*;
37use scx_utils::build_id;
38use scx_utils::compat;
39use scx_utils::init_libbpf_logging;
40use scx_utils::libbpf_clap_opts::LibbpfOpts;
41use scx_utils::scx_enums;
42use scx_utils::scx_ops_attach;
43use scx_utils::scx_ops_load;
44use scx_utils::scx_ops_open;
45use scx_utils::uei_exited;
46use scx_utils::uei_report;
47use scx_utils::Cpumask;
48use scx_utils::Topology;
49use scx_utils::UserExitInfo;
50use scx_utils::NR_CPUS_POSSIBLE;
51use tracing::{debug, info, trace, warn};
52use tracing_subscriber::filter::EnvFilter;
53
54use stats::CellMetrics;
55use stats::Metrics;
56
/// Scheduler name used in log messages (e.g. the unregister message in `run`).
const SCHEDULER_NAME: &str = "scx_mitosis";
/// Maximum number of cells, mirrored from the BPF-side constant.
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
/// Number of per-cell stat counters, mirrored from the BPF-side enum.
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;
/// Epoll token for inotify events (cgroup creation/destruction)
const INOTIFY_TOKEN: u64 = 1;
/// Epoll token for stats request wakeups
const STATS_TOKEN: u64 = 2;
64
/// Clap value parser for EWMA smoothing factors.
///
/// Accepts any float in the inclusive range [0.0, 1.0]; anything else
/// (including unparsable input and NaN, which fails the range check) is
/// rejected with a human-readable message.
fn parse_ewma_factor(s: &str) -> Result<f64, String> {
    let parsed: f64 = s.parse().map_err(|err: std::num::ParseFloatError| err.to_string())?;
    // RangeInclusive::contains rejects NaN as well as out-of-range values.
    if (0.0..=1.0).contains(&parsed) {
        Ok(parsed)
    } else {
        Err(format!("value {parsed} not in range 0.0..=1.0"))
    }
}
72
// NOTE(review): the `///` comments on this struct and its fields double as
// clap help text, so they are user-visible strings; edit with care.
/// scx_mitosis: A dynamic affinity scheduler
///
/// Cgroups are assigned to a dynamic number of Cells which are assigned to a
/// dynamic set of CPUs. The BPF part does simple vtime scheduling for each cell.
///
/// Userspace makes the dynamic decisions of which Cells should be merged or
/// split and which CPUs they should be assigned to.
#[derive(Debug, Parser)]
struct Opts {
    /// Deprecated, noop, use RUST_LOG or --log-level instead.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Specify the logging level. Accepts rust's envfilter syntax for modular
    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
    #[clap(long, default_value = "info")]
    log_level: String,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval to consider reconfiguring the Cells (e.g. merge or split)
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval to consider rebalancing CPUs to Cells
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval to report monitoring information
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Optional run ID for tracking scheduler instances.
    #[clap(long)]
    run_id: Option<u64>,

    /// Enable debug event tracking for cgroup_init, init_task, and cgroup_exit.
    /// Events are recorded in a ring buffer and output in dump().
    #[clap(long, action = clap::ArgAction::SetTrue)]
    debug_events: bool,

    /// Enable workaround for exiting tasks with offline cgroups during scheduler load.
    /// This works around a kernel bug where tasks can be initialized with cgroups that
    /// were never initialized. Disable this once the kernel bug is fixed.
    // NOTE(review): ArgAction::Set with default "true" means this flag takes an
    // explicit value (--exiting-task-workaround=false) rather than acting as a switch.
    #[clap(long, default_value = "true", action = clap::ArgAction::Set)]
    exiting_task_workaround: bool,

    /// Disable SCX cgroup callbacks (for when CPU cgroup controller is disabled).
    /// Uses tracepoints and cgroup iteration instead.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    cpu_controller_disabled: bool,

    /// Reject tasks with multi-CPU pinning that doesn't cover the entire cell.
    /// By default, these tasks are allowed but may have degraded performance.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    reject_multicpu_pinning: bool,

    /// Enable LLC-awareness. This will populate the scheduler's LLC maps and cause it
    /// to use LLC-aware scheduling.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_llc_awareness: bool,

    /// Enable work stealing. This is only relevant when LLC-awareness is enabled.
    // Enforced by Scheduler::validate_args.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_work_stealing: bool,

    /// Parent cgroup path whose direct children become cells.
    /// When specified, cells are created for each direct child cgroup of this parent,
    /// with CPUs divided equally among cells. Example: --cell-parent-cgroup /workloads
    #[clap(long)]
    cell_parent_cgroup: Option<String>,

    /// Exact directory name of a direct child cgroup to exclude from cell creation
    /// (excluded cgroups remain in cell 0). Matched against the directory basename,
    /// not the full path. Can be specified multiple times. Requires --cell-parent-cgroup.
    /// Example: --cell-exclude systemd-workaround.service
    #[clap(long)]
    cell_exclude: Vec<String>,

    /// Enable CPU borrowing: cells can use idle CPUs from other cells.
    /// Only meaningful with --cell-parent-cgroup and multiple cells.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_borrowing: bool,

    /// Use lockless scx_bpf_dsq_peek() instead of the default iterator-based peek.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    use_lockless_peek: bool,

    /// Enable demand-based CPU rebalancing between cells.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_rebalancing: bool,

    /// Utilization spread (max - min) that triggers rebalancing (default: 20%)
    #[clap(long, default_value = "20.0")]
    rebalance_threshold: f64,

    /// Minimum seconds between rebalancing events (default: 5)
    #[clap(long, default_value = "5")]
    rebalance_cooldown_s: u64,

    /// EWMA smoothing factor for demand tracking. Higher = more responsive (default: 0.3)
    #[clap(long, default_value = "0.3", value_parser = parse_ewma_factor)]
    demand_smoothing: f64,

    /// Dynamically reassign multi-CPU affinitized tasks on each wake: prefer an
    /// idle CPU within the mask, fall back to random. Redistribute at enqueue if
    /// the target CPU already has queued work.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    dynamic_affinity_cpu_selection: bool,

    /// Enable slice shrinking for CPU-pinned tasks. Uses per-task EWMA
    /// runtime to shrink the running task's slice when pinned waiters are queued.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    enable_slice_shrinking: bool,

    /// Upper bound for shrink limit (us). Used when the proportional
    /// value (avg_runtime * K) exceeds it.
    #[clap(long, default_value = "4000")]
    slice_shrink_max_us: u64,

    /// Minimum shrink limit (us). Slices are never shrunk below this value.
    /// In practice, the resolution here is determined by the kernel's
    /// tick period.
    // Must be strictly less than slice_shrink_max_us; checked in init().
    #[clap(long, default_value = "500")]
    slice_shrink_min_us: u64,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
213
// The subset of cstats we care about.
// Local + CPU DSQ + Cell DSQ + Borrowed together make up the total
// queueing decisions.
// Affinity violations are not queue decisions, but
// will be calculated separately and reported as a percent of the total
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 4] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_BORROWED,
];
224
// Per cell book-keeping
#[derive(Debug)]
struct Cell {
    /// CPUs currently assigned to this cell. Compared against
    /// `CpuAssignment::primary` in `maybe_rebalance` to detect whether a
    /// recomputed assignment would actually change anything.
    cpus: Cpumask,
}
230
/// Userspace state for the mitosis scheduler: the loaded BPF skeleton plus
/// all the book-keeping the main loop in `run()` needs.
struct Scheduler<'a> {
    /// Loaded BPF skeleton (maps, progs, struct_ops).
    skel: BpfSkel<'a>,
    /// Upper bound on how long the main loop blocks in epoll; also the cadence
    /// of the periodic work (metrics collection, cpuset checks, rebalancing).
    monitor_interval: Duration,
    /// Cells currently known to userspace, keyed by cell ID.
    cells: HashMap<u32, Cell>,
    // These are the per-cell cstats.
    // Note these are accumulated across all CPUs.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    // Per-cell running_ns tracking for demand metrics
    prev_cell_running_ns: [u64; MAX_CELLS],
    prev_cell_own_ns: [u64; MAX_CELLS],
    prev_cell_lent_ns: [u64; MAX_CELLS],
    /// Most recently collected metrics, cloned out to stats clients.
    metrics: Metrics,
    /// Stats server. Held in an `Option` so `run()` can drop it to close the
    /// request channel and let the stats bridge thread exit.
    stats_server: Option<StatsServer<(), Metrics>>,
    /// Last seen cell-configuration sequence number; `None` before the first
    /// observation.
    last_configuration_seq: Option<u32>,
    /// Last observed cpuset_seq for cpuset change detection
    last_cpuset_seq: u32,
    /// Optional cell manager for --cell-parent-cgroup mode
    cell_manager: Option<CellManager>,
    /// Whether CPU borrowing is enabled
    enable_borrowing: bool,
    /// Whether demand-based rebalancing is enabled
    enable_rebalancing: bool,
    /// Utilization spread threshold for triggering rebalancing
    rebalance_threshold: f64,
    /// Minimum duration between rebalancing events
    rebalance_cooldown: Duration,
    /// EWMA smoothing factor for demand tracking
    demand_smoothing: f64,
    /// EWMA-smoothed utilization per cell
    smoothed_util: [f64; MAX_CELLS],
    /// Last time rebalancing was performed
    last_rebalance: Instant,
    /// Number of rebalancing events
    rebalance_count: u64,
    /// Epoll instance for waiting on multiple fds (inotify, stats wakeup)
    epoll: Epoll,
    /// EventFd to wake up main loop when stats are requested
    stats_waker: EventFd,
}
270
/// Snapshot of the queueing-decision distribution for one scope (global or a
/// single cell), mostly as percentages. Rendered as a single log line by the
/// `Display` impl.
struct DistributionStats {
    /// Total queueing decisions in this scope.
    total_decisions: u64,
    /// This scope's share of all queueing decisions.
    share_of_decisions_pct: f64,
    /// Percent dispatched locally (printed as "Local:").
    local_q_pct: f64,
    /// Percent from the per-CPU DSQ (printed as "CPU:").
    cpu_q_pct: f64,
    /// Percent from the per-cell DSQ (printed as "Cell:").
    cell_q_pct: f64,
    /// Percent on borrowed CPUs (printed as "Borrow:").
    borrowed_pct: f64,
    /// Affinity violations as a percent of total decisions (printed as "V:").
    affn_viol_pct: f64,
    /// Steal percentage (printed as "S:").
    steal_pct: f64,
    /// Pin-skip percentage (printed as "PS:").
    pin_skip_pct: f64,

    // for formatting
    /// Global decision count, used only to size the count column in Display.
    global_queue_decisions: u64,
}
285
286impl Display for DistributionStats {
287    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
288        // This makes the output easier to read by improving column alignment. First, it guarantees that within a
289        // given logging interval, the global and cell queueing decision counts print at the same width.
290        // Second, it reduces variance in column width between logging intervals. 5 is simply a heuristic.
291        const MIN_DECISIONS_WIDTH: usize = 5;
292        let descisions_width = if self.global_queue_decisions > 0 {
293            max(
294                MIN_DECISIONS_WIDTH,
295                (self.global_queue_decisions as f64).log10().ceil() as usize,
296            )
297        } else {
298            MIN_DECISIONS_WIDTH
299        };
300        write!(
301            f,
302            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% Borrow:{:4.1}% | V:{:4.1}% S:{:4.1}% PS:{:4.1}%",
303            self.total_decisions,
304            self.share_of_decisions_pct,
305            self.local_q_pct,
306            self.cpu_q_pct,
307            self.cell_q_pct,
308            self.borrowed_pct,
309            self.affn_viol_pct,
310            self.steal_pct,
311            self.pin_skip_pct,
312            width = descisions_width,
313        )
314    }
315}
316
317impl<'a> Scheduler<'a> {
318    fn validate_args(opts: &Opts) -> Result<()> {
319        if opts.enable_work_stealing && !opts.enable_llc_awareness {
320            bail!("Work stealing requires LLC-aware mode (--enable-llc-awareness)");
321        }
322
323        Ok(())
324    }
325
    /// Open, configure, and load the BPF scheduler skeleton, then assemble the
    /// userspace `Scheduler` state around it.
    ///
    /// Validates CLI options, discovers CPU/LLC topology, writes all read-only
    /// BPF configuration (which must happen before `scx_ops_load!`), launches
    /// the stats server, optionally creates a `CellManager`, and wires up the
    /// epoll/eventfd machinery consumed by the main loop in `run()`.
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        Self::validate_args(opts)?;

        let topology = Topology::new()?;

        // Clamp to at least 1 so LLC-indexed configuration is never empty.
        let nr_llc = topology.all_llcs.len().max(1);

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder
            .obj_builder
            .debug(opts.log_level.contains("trace"));
        init_libbpf_logging(None);
        info!(
            "Running scx_mitosis (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );

        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis, open_opts)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        // rodata is only writable between open and load; every read-only BPF
        // knob below must be set before scx_ops_load! is invoked.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();

        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;
        rodata.debug_events_enabled = opts.debug_events;
        rodata.exiting_task_workaround_enabled = opts.exiting_task_workaround;
        rodata.cpu_controller_disabled = opts.cpu_controller_disabled;
        rodata.dynamic_affinity_cpu_selection = opts.dynamic_affinity_cpu_selection;

        // Slice shrinking configuration
        if opts.slice_shrink_min_us >= opts.slice_shrink_max_us {
            bail!(
                "--slice-shrink-min-us ({}) must be less than --slice-shrink-max-us ({})",
                opts.slice_shrink_min_us,
                opts.slice_shrink_max_us
            );
        }
        rodata.enable_slice_shrinking = opts.enable_slice_shrinking;
        rodata.slice_shrink_max_ns = opts.slice_shrink_max_us * 1_000;
        // K=2: in the proportional region, a pinned task waits at most 2x its historical runtime
        rodata.slice_shrink_multiplier = 2;
        rodata.slice_shrink_min_ns = opts.slice_shrink_min_us * 1_000;

        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        // Build the all-CPUs bitmask one bit per possible CPU, 8 bits per byte.
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        rodata.reject_multicpu_pinning = opts.reject_multicpu_pinning;

        // Set nr_llc in rodata
        rodata.nr_llc = nr_llc as u32;
        rodata.enable_llc_awareness = opts.enable_llc_awareness;
        rodata.enable_work_stealing = opts.enable_work_stealing;

        // Cells are userspace-managed iff --cell-parent-cgroup was given.
        rodata.userspace_managed_cell_mode = opts.cell_parent_cgroup.is_some();

        rodata.enable_borrowing = opts.enable_borrowing;
        rodata.use_lockless_peek = opts.use_lockless_peek;

        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => skel.struct_ops.mitosis_mut().flags |= v,
        }

        // Populate LLC topology arrays before load (data section is only writable before load)
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::CpuToLLC,
            None,
        )?;
        mitosis_topology_utils::populate_topology_maps(
            &mut skel,
            mitosis_topology_utils::MapKind::LLCToCpus,
            None,
        )?;

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        // Initialize CellManager if --cell-parent-cgroup is specified
        if !opts.cell_exclude.is_empty() && opts.cell_parent_cgroup.is_none() {
            bail!("--cell-exclude requires --cell-parent-cgroup");
        }
        let cell_manager = if let Some(ref parent_cgroup) = opts.cell_parent_cgroup {
            let exclude: HashSet<String> = opts.cell_exclude.iter().cloned().collect();
            Some(CellManager::new(
                parent_cgroup,
                MAX_CELLS as u32,
                topology.span.clone(),
                exclude,
            )?)
        } else {
            None
        };

        // Create epoll instance for event-driven main loop
        let epoll = Epoll::new(EpollCreateFlags::empty())?;

        // Create eventfd for stats wakeup (non-blocking, semaphore mode)
        let stats_waker = EventFd::from_value_and_flags(
            0,
            nix::sys::eventfd::EfdFlags::EFD_NONBLOCK | nix::sys::eventfd::EfdFlags::EFD_SEMAPHORE,
        )?;

        // Register stats_waker with epoll
        epoll.add(
            &stats_waker,
            EpollEvent::new(EpollFlags::EPOLLIN, STATS_TOKEN),
        )?;

        // Register inotify fd if cell_manager exists
        if let Some(ref cell_manager) = cell_manager {
            epoll.add(
                cell_manager,
                EpollEvent::new(EpollFlags::EPOLLIN, INOTIFY_TOKEN),
            )?;
        }

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            prev_cell_running_ns: [0; MAX_CELLS],
            prev_cell_own_ns: [0; MAX_CELLS],
            prev_cell_lent_ns: [0; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server: Some(stats_server),
            last_configuration_seq: None,
            last_cpuset_seq: 0,
            cell_manager,
            enable_borrowing: opts.enable_borrowing,
            enable_rebalancing: opts.enable_rebalancing,
            rebalance_threshold: opts.rebalance_threshold,
            rebalance_cooldown: Duration::from_secs(opts.rebalance_cooldown_s),
            demand_smoothing: opts.demand_smoothing,
            smoothed_util: [0.0; MAX_CELLS],
            last_rebalance: Instant::now(),
            rebalance_count: 0,
            epoll,
            stats_waker,
        })
    }
472
    /// Attach the scheduler and drive the event loop until shutdown is
    /// requested or the BPF side exits.
    ///
    /// The loop blocks in epoll on two event sources — cgroup inotify events
    /// (`INOTIFY_TOKEN`) and stats-request wakeups (`STATS_TOKEN`) — using
    /// `monitor_interval` as the timeout, so the periodic work at the bottom
    /// of the loop runs at least once per interval.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        // Apply initial cell configuration if CellManager is active
        self.apply_initial_cells()?;

        let (res_ch, req_ch) = self.stats_server.as_ref().unwrap().channels();

        // Spawn thread to bridge stats requests to eventfd.
        // The thread exits when the channel closes (stats_server dropped).
        // Clone the eventfd so the thread owns its own handle to the same kernel object.
        let stats_waker_fd = self.stats_waker.as_fd().try_clone_to_owned()?;
        // SAFETY: the owned fd was just cloned from an EventFd, so it really
        // is an eventfd and from_owned_fd's precondition holds.
        let stats_waker = unsafe { EventFd::from_owned_fd(stats_waker_fd) };
        let stats_bridge = std::thread::spawn(move || {
            while req_ch.recv().is_ok() {
                // Wake up main loop via eventfd
                let _ = stats_waker.write(1);
            }
        });

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            // One event per wait is enough: anything not picked up now is
            // still pending and will wake the next epoll_wait immediately.
            let mut events = [EpollEvent::empty(); 1];
            let timeout = EpollTimeout::try_from(self.monitor_interval).with_context(|| {
                format!(
                    "monitor_interval {:?} exceeds maximum epoll timeout",
                    self.monitor_interval,
                )
            })?;

            match self.epoll.wait(&mut events, timeout) {
                Ok(n) => {
                    for event in &events[..n] {
                        match event.data() {
                            INOTIFY_TOKEN => {
                                // Cgroup event - process immediately
                                self.process_cell_events()?;
                            }
                            STATS_TOKEN => {
                                // Stats request - drain eventfd and send metrics
                                let _ = self.stats_waker.read();
                                res_ch.send(self.get_metrics())?;
                            }
                            _ => {}
                        }
                    }
                }
                // Interrupted by a signal; just retry the wait.
                Err(nix::errno::Errno::EINTR) => continue,
                Err(e) => return Err(e.into()),
            }

            // Periodic work on every iteration
            self.refresh_bpf_cells()?;
            self.check_cpuset_changes()?;
            self.collect_metrics()?;

            if self.enable_rebalancing && self.cell_manager.is_some() {
                self.maybe_rebalance()?;
            }
        }

        // Detach the scheduler before reporting exit info.
        drop(struct_ops);
        // Drop stats_server to close the channel, allowing stats_bridge to exit
        drop(self.stats_server.take());
        let _ = stats_bridge.join();
        info!("Unregister {SCHEDULER_NAME} scheduler");
        uei_report!(&self.skel, uei)
    }
542
543    /// Apply initial cell assignments discovered at startup
544    fn apply_initial_cells(&mut self) -> Result<()> {
545        if self.cell_manager.is_none() {
546            return Ok(());
547        }
548
549        let cpu_assignments = self.compute_and_apply_cell_config(&[])?;
550
551        let cell_manager = self.cell_manager.as_ref().unwrap();
552        info!(
553            "Applied initial cell configuration: {}",
554            cell_manager.format_cell_config(&cpu_assignments)
555        );
556
557        Ok(())
558    }
559
    /// Process cell manager events (new/destroyed cgroups).
    ///
    /// Drains pending events from the `CellManager`, clears stale per-cell
    /// utilization for destroyed cells, then recomputes and applies the cell
    /// configuration. No-op if there is no `CellManager` or nothing changed.
    fn process_cell_events(&mut self) -> Result<()> {
        // Scope the mutable borrow of self.cell_manager so that
        // self.smoothed_util and compute_and_apply_cell_config (which borrow
        // self again) can be used after this block.
        let (num_new, num_destroyed, new_cell_ids, destroyed_cell_ids) = {
            let Some(ref mut cell_manager) = self.cell_manager else {
                return Ok(());
            };

            let (new_cells, destroyed_cells) = cell_manager.process_events()?;

            if new_cells.is_empty() && destroyed_cells.is_empty() {
                return Ok(());
            }

            // Keep just the cell IDs; cgroup/cell pairs are re-fetched later
            // via get_cell_assignments().
            let new_ids: Vec<u32> = new_cells.iter().map(|(_, cell_id)| *cell_id).collect();
            (
                new_cells.len(),
                destroyed_cells.len(),
                new_ids,
                destroyed_cells,
            )
        };

        // Clear smoothed_util for destroyed cells so stale data doesn't
        // leak if the cell ID is reused later.
        for &cell_id in &destroyed_cell_ids {
            self.smoothed_util[cell_id as usize] = 0.0;
        }

        let cpu_assignments = self.compute_and_apply_cell_config(&new_cell_ids)?;

        let cell_manager = self.cell_manager.as_ref().unwrap();
        info!(
            "Cell config updated ({} new, {} destroyed): {}",
            num_new,
            num_destroyed,
            cell_manager.format_cell_config(&cpu_assignments)
        );

        Ok(())
    }
600
    /// Compute cell configuration from CellManager and apply it to BPF.
    ///
    /// When rebalancing is enabled and there is existing utilization data,
    /// uses demand-weighted CPU assignment instead of equal-weight. New cells
    /// (listed in `new_cell_ids`) are seeded to the average smoothed_util of
    /// existing cells so they start with a fair share rather than zero.
    ///
    /// Returns the CPU assignments for use with `format_cell_config`.
    ///
    /// Precondition: `self.cell_manager` is `Some` (callers check before
    /// calling; the `unwrap` below relies on it).
    fn compute_and_apply_cell_config(
        &mut self,
        new_cell_ids: &[u32],
    ) -> Result<Vec<CpuAssignment>> {
        // Scope the shared borrow of cell_manager so apply_cell_config
        // (which needs &mut self) can run afterwards.
        let (cell_assignments, cpu_assignments) = {
            let cell_manager = self.cell_manager.as_ref().unwrap();
            let active_cell_ids: Vec<u32> = cell_manager
                .get_cell_assignments()
                .iter()
                .map(|(_, cell_id)| *cell_id)
                .collect();
            // Cell 0 is always active
            let all_cell_ids: Vec<u32> = std::iter::once(0)
                .chain(active_cell_ids.iter().copied())
                .collect();

            let cpu_assignments = if self.enable_rebalancing {
                // Check if any existing (non-new) cell has utilization data
                let new_set: HashSet<u32> = new_cell_ids.iter().copied().collect();
                let existing_utils: Vec<f64> = all_cell_ids
                    .iter()
                    .filter(|id| !new_set.contains(id))
                    .map(|&id| self.smoothed_util[id as usize])
                    .collect();

                let has_data = existing_utils.iter().any(|&u| u > 0.0);

                if has_data {
                    // Seed new cells to the average utilization of existing cells
                    // (.max(1) guards the division when every cell is new).
                    let avg_util: f64 =
                        existing_utils.iter().sum::<f64>() / existing_utils.len().max(1) as f64;
                    for &id in new_cell_ids {
                        self.smoothed_util[id as usize] = avg_util;
                        info!(
                            "Seeded new cell {} smoothed_util to average {:.1}%",
                            id, avg_util
                        );
                    }

                    // Build demand map from smoothed_util for all active cells
                    let cell_demands: HashMap<u32, f64> = all_cell_ids
                        .iter()
                        .map(|&id| (id, self.smoothed_util[id as usize]))
                        .collect();

                    cell_manager
                        .compute_demand_cpu_assignments(&cell_demands, self.enable_borrowing)?
                } else {
                    // No utilization data yet (e.g., initial startup) — equal weight
                    cell_manager.compute_cpu_assignments(self.enable_borrowing)?
                }
            } else {
                cell_manager.compute_cpu_assignments(self.enable_borrowing)?
            };

            (cell_manager.get_cell_assignments(), cpu_assignments)
        };

        self.apply_cell_config(&cell_assignments, &cpu_assignments)?;

        Ok(cpu_assignments)
    }
671
672    /// Check if rebalancing should be triggered and apply demand-weighted CPU assignments.
673    fn maybe_rebalance(&mut self) -> Result<()> {
674        // Check cooldown
675        if self.last_rebalance.elapsed() < self.rebalance_cooldown {
676            return Ok(());
677        }
678
679        // Compute min/max smoothed utilization across active cells
680        let active_cells: Vec<u32> = self.cells.keys().copied().collect();
681        if active_cells.len() < 2 {
682            return Ok(());
683        }
684
685        let mut min_util = f64::MAX;
686        let mut max_util = f64::MIN;
687        for &cell_id in &active_cells {
688            let util = self.smoothed_util[cell_id as usize];
689            if util < min_util {
690                min_util = util;
691            }
692            if util > max_util {
693                max_util = util;
694            }
695        }
696
697        let spread = max_util - min_util;
698        if spread < self.rebalance_threshold {
699            return Ok(());
700        }
701
702        // Build demand map from smoothed utilization
703        let cell_demands: HashMap<u32, f64> = active_cells
704            .iter()
705            .map(|&cell_id| (cell_id, self.smoothed_util[cell_id as usize]))
706            .collect();
707
708        // Compute new assignments and check if they differ from current
709        let (cell_assignments, cpu_assignments) = {
710            let cell_manager = self.cell_manager.as_ref().unwrap();
711            let cpu_assignments = cell_manager
712                .compute_demand_cpu_assignments(&cell_demands, self.enable_borrowing)?;
713
714            let changed = cpu_assignments.iter().any(|a| {
715                self.cells
716                    .get(&a.cell_id)
717                    .map_or(true, |cell| cell.cpus != a.primary)
718            });
719
720            if !changed {
721                return Ok(());
722            }
723
724            (cell_manager.get_cell_assignments(), cpu_assignments)
725        };
726
727        self.apply_cell_config(&cell_assignments, &cpu_assignments)?;
728
729        self.last_rebalance = Instant::now();
730        self.rebalance_count += 1;
731        self.metrics.rebalance_count = self.rebalance_count;
732
733        let cell_manager = self.cell_manager.as_ref().unwrap();
734        info!(
735            "Rebalanced CPUs (spread={:.1}%, count={}): {}",
736            spread,
737            self.rebalance_count,
738            cell_manager.format_cell_config(&cpu_assignments)
739        );
740
741        Ok(())
742    }
743
    /// Apply cell configuration to BPF.
    ///
    /// Writes the cell and CPU assignments to the BPF config struct and triggers
    /// the BPF program to apply the configuration.
    ///
    /// Errors if there are more assignments/cells than MAX_CELLS, or if the
    /// `apply_cell_config` BPF program reports a non-zero return value.
    fn apply_cell_config(
        &mut self,
        cell_assignments: &[(u64, u32)],
        cpu_assignments: &[CpuAssignment],
    ) -> Result<()> {
        let bss_data = self
            .skel
            .maps
            .bss_data
            .as_mut()
            .expect("bss_data must be available after scheduler load");

        let config = &mut bss_data.cell_config;

        // Zero out the config struct. This is necessary because:
        // 1. Cell IDs can be sparse (e.g., cells 0, 2, 3 if cell 1 was destroyed)
        // 2. We only write cpumasks for active cells, leaving gaps unwritten
        // 3. BPF iterates 0..num_cells and applies each cpumask
        // 4. Without zeroing, a gap (e.g., cell 1) would have a stale cpumask,
        //    causing CPUs to be assigned to an unused cell
        // SAFETY: cell_config is a plain data struct with no Drop impl, and
        // write_bytes covers exactly size_of_val(config) bytes of it.
        unsafe {
            std::ptr::write_bytes(
                config as *mut _ as *mut u8,
                0,
                std::mem::size_of_val(config),
            );
        }

        if cell_assignments.len() > bpf_intf::consts_MAX_CELLS as usize {
            bail!(
                "Too many cell assignments: {} > MAX_CELLS ({})",
                cell_assignments.len(),
                bpf_intf::consts_MAX_CELLS
            );
        }
        config.num_cell_assignments = cell_assignments.len() as u32;

        // Copy (cgroup id, cell id) pairs into the fixed-size BPF array.
        for (i, (cgid, cell_id)) in cell_assignments.iter().enumerate() {
            config.assignments[i].cgid = *cgid;
            config.assignments[i].cell_id = *cell_id;
        }

        // Set cell cpumasks and borrowable cpumasks
        let mut max_cell_id: u32 = 0;
        for a in cpu_assignments {
            if a.cell_id >= bpf_intf::consts_MAX_CELLS {
                bail!(
                    "Cell ID {} exceeds MAX_CELLS ({})",
                    a.cell_id,
                    bpf_intf::consts_MAX_CELLS
                );
            }
            // num_cells must cover the highest cell ID seen (IDs may be sparse).
            max_cell_id = max_cell_id.max(a.cell_id + 1);

            write_cpumask_to_config(&a.primary, &mut config.cpumasks[a.cell_id as usize].mask);

            if let Some(ref borrowable) = a.borrowable {
                write_cpumask_to_config(
                    borrowable,
                    &mut config.borrowable_cpumasks[a.cell_id as usize].mask,
                );
            }
        }
        config.num_cells = max_cell_id;

        // Trigger the BPF program to apply the configuration
        let prog = &mut self.skel.progs.apply_cell_config;
        let out = prog
            .test_run(ProgramInput::default())
            .context("Failed to run apply_cell_config BPF program")?;
        if out.return_value != 0 {
            bail!(
                "apply_cell_config BPF program returned error {} (num_assignments={}, num_cells={})",
                out.return_value as i32,
                cell_assignments.len(),
                cpu_assignments.len()
            );
        }

        Ok(())
    }
830
831    fn get_metrics(&self) -> Metrics {
832        self.metrics.clone()
833    }
834
835    fn calculate_distribution_stats(
836        &self,
837        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
838        global_queue_decisions: u64,
839        scope_queue_decisions: u64,
840        scope_affn_viols: u64,
841        scope_steals: u64,
842        scope_pin_skips: u64,
843    ) -> Result<DistributionStats> {
844        // First % on the line: share of global work
845        // We know global_queue_decisions is non-zero.
846        let share_of_global =
847            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);
848
849        // Each queue's % of the scope total
850        let queue_pct = if scope_queue_decisions == 0 {
851            debug!("No queue decisions in scope, zeroing out queue distribution");
852            [0.0; QUEUE_STATS_IDX.len()]
853        } else {
854            core::array::from_fn(|i| {
855                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
856            })
857        };
858
859        // These are summed differently for the global and per-cell totals.
860        let affinity_violations_percent = if scope_queue_decisions == 0 {
861            debug!("No queue decisions in scope, zeroing out affinity violations");
862            0.0
863        } else {
864            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
865        };
866
867        let steal_pct = if scope_queue_decisions == 0 {
868            0.0
869        } else {
870            100.0 * (scope_steals as f64) / (scope_queue_decisions as f64)
871        };
872
873        let pin_skip_pct = if scope_queue_decisions == 0 {
874            0.0
875        } else {
876            100.0 * (scope_pin_skips as f64) / (scope_queue_decisions as f64)
877        };
878
879        const EXPECTED_QUEUES: usize = 4;
880        if queue_pct.len() != EXPECTED_QUEUES {
881            bail!(
882                "Expected {} queues, got {}",
883                EXPECTED_QUEUES,
884                queue_pct.len()
885            );
886        }
887
888        return Ok(DistributionStats {
889            total_decisions: scope_queue_decisions,
890            share_of_decisions_pct: share_of_global,
891            local_q_pct: queue_pct[0],
892            cpu_q_pct: queue_pct[1],
893            cell_q_pct: queue_pct[2],
894            borrowed_pct: queue_pct[3],
895            affn_viol_pct: affinity_violations_percent,
896            steal_pct,
897            pin_skip_pct,
898            global_queue_decisions,
899        });
900    }
901
902    // Queue stats for the whole node
903    fn update_and_log_global_queue_stats(
904        &mut self,
905        global_queue_decisions: u64,
906        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
907    ) -> Result<()> {
908        // Get total of each queue summed over all cells
909        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
910        for cells in 0..MAX_CELLS {
911            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
912                queue_counts[i] += cell_stats_delta[cells][*stat as usize];
913            }
914        }
915
916        let prefix = "Total Decisions:";
917
918        // Here we want to sum the affinity violations over all cells.
919        let scope_affn_viols: u64 = cell_stats_delta
920            .iter()
921            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
922            .sum::<u64>();
923
924        // Sum steals over all cells
925        let scope_steals: u64 = cell_stats_delta
926            .iter()
927            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_STEAL as usize])
928            .sum::<u64>();
929
930        // Sum pin skips over all cells
931        let scope_pin_skips: u64 = cell_stats_delta
932            .iter()
933            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_PIN_SKIP as usize])
934            .sum::<u64>();
935
936        // Special case where the number of scope decisions == number global decisions
937        let stats = self.calculate_distribution_stats(
938            &queue_counts,
939            global_queue_decisions,
940            global_queue_decisions,
941            scope_affn_viols,
942            scope_steals,
943            scope_pin_skips,
944        )?;
945
946        self.metrics.update(&stats);
947
948        // Slice shrink stats bypass DistributionStats — they're raw event counts
949        let sum = |idx: usize| -> u64 { cell_stats_delta.iter().map(|c| c[idx]).sum() };
950        self.metrics.slice_shrink_max =
951            sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MAX as usize);
952        self.metrics.slice_shrink_proportional =
953            sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_PROPORTIONAL as usize);
954        self.metrics.slice_shrink_min =
955            sum(bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MIN as usize);
956        self.metrics.slice_shrink = self.metrics.slice_shrink_max
957            + self.metrics.slice_shrink_proportional
958            + self.metrics.slice_shrink_min;
959
960        trace!("{} {}", prefix, stats);
961
962        Ok(())
963    }
964
965    // Print out the per-cell stats
966    fn update_and_log_cell_queue_stats(
967        &mut self,
968        global_queue_decisions: u64,
969        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
970    ) -> Result<()> {
971        for cell in 0..MAX_CELLS {
972            let cell_queue_decisions = QUEUE_STATS_IDX
973                .iter()
974                .map(|&stat| cell_stats_delta[cell][stat as usize])
975                .sum::<u64>();
976
977            // FIXME: This should really query if the cell is enabled or not.
978            if cell_queue_decisions == 0 {
979                continue;
980            }
981
982            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
983            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
984                queue_counts[i] = cell_stats_delta[cell][stat as usize];
985            }
986
987            const MIN_CELL_WIDTH: usize = 2;
988            let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);
989
990            let prefix = format!("        Cell {:width$}:", cell, width = cell_width);
991
992            // Sum affinity violations for this cell
993            let scope_affn_viols: u64 =
994                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];
995
996            // Steals for this cell
997            let scope_steals: u64 =
998                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_STEAL as usize];
999
1000            // Pin skips for this cell
1001            let scope_pin_skips: u64 =
1002                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_PIN_SKIP as usize];
1003
1004            let stats = self.calculate_distribution_stats(
1005                &queue_counts,
1006                global_queue_decisions,
1007                cell_queue_decisions,
1008                scope_affn_viols,
1009                scope_steals,
1010                scope_pin_skips,
1011            )?;
1012
1013            let cell_metrics = self.metrics.cells.entry(cell as u32).or_default();
1014            cell_metrics.update(&stats);
1015
1016            // Slice shrink stats bypass DistributionStats
1017            cell_metrics.slice_shrink_max =
1018                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MAX as usize];
1019            cell_metrics.slice_shrink_proportional = cell_stats_delta[cell]
1020                [bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_PROPORTIONAL as usize];
1021            cell_metrics.slice_shrink_min =
1022                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_SLICE_SHRINK_MIN as usize];
1023            cell_metrics.slice_shrink = cell_metrics.slice_shrink_max
1024                + cell_metrics.slice_shrink_proportional
1025                + cell_metrics.slice_shrink_min;
1026
1027            trace!("{} {}", prefix, stats);
1028        }
1029        Ok(())
1030    }
1031
1032    fn log_all_queue_stats(
1033        &mut self,
1034        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
1035    ) -> Result<()> {
1036        // Get total decisions
1037        let global_queue_decisions: u64 = cell_stats_delta
1038            .iter()
1039            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
1040            .sum();
1041
1042        if global_queue_decisions == 0 {
1043            warn!("No queueing decisions made globally");
1044            return Ok(());
1045        }
1046
1047        self.update_and_log_global_queue_stats(global_queue_decisions, &cell_stats_delta)?;
1048
1049        self.update_and_log_cell_queue_stats(global_queue_decisions, &cell_stats_delta)?;
1050
1051        Ok(())
1052    }
1053
1054    fn calculate_cell_stat_delta(
1055        &mut self,
1056        cpu_ctxs: &[bpf_intf::cpu_ctx],
1057    ) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
1058        let mut cell_stats_delta = [[0 as u64; NR_CSTATS]; MAX_CELLS];
1059
1060        // Loop over cells and stats first, then CPU contexts
1061        // TODO: We should loop over the in_use cells only.
1062        for cell in 0..MAX_CELLS {
1063            for stat in 0..NR_CSTATS {
1064                let mut cur_cell_stat = 0;
1065
1066                // Accumulate stats from all CPUs
1067                for cpu_ctx in cpu_ctxs.iter() {
1068                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
1069                }
1070
1071                // Calculate delta and update previous stat
1072                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
1073                self.prev_cell_stats[cell][stat] = cur_cell_stat;
1074            }
1075        }
1076        Ok(cell_stats_delta)
1077    }
1078
1079    /// Collect metrics and out various debugging data like per cell stats, per-cpu stats, etc.
1080    fn collect_metrics(&mut self) -> Result<()> {
1081        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
1082
1083        let cell_stats_delta = self.calculate_cell_stat_delta(&cpu_ctxs)?;
1084
1085        self.log_all_queue_stats(&cell_stats_delta)?;
1086
1087        if self.cell_manager.is_some() {
1088            self.collect_demand_metrics(&cpu_ctxs)?;
1089        }
1090
1091        for (cell_id, cell) in &self.cells {
1092            trace!("CELL[{}]: {}", cell_id, cell.cpus);
1093        }
1094
1095        for (cell_id, cell) in self.cells.iter() {
1096            // Assume we have a CellMetrics entry if we have a known cell
1097            self.metrics
1098                .cells
1099                .entry(*cell_id)
1100                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
1101        }
1102        self.metrics.num_cells = self.cells.len() as u32;
1103
1104        Ok(())
1105    }
1106
1107    /// Compute per-cell demand metrics (utilization, borrowed, lent) from BPF running_ns counters.
1108    fn collect_demand_metrics(&mut self, cpu_ctxs: &[bpf_intf::cpu_ctx]) -> Result<()> {
1109        // Per-cell cumulative counters derived from BPF per-CPU running_ns:
1110        //   total_running_ns[c] = total time tasks in cell c ran (on any CPU)
1111        //   on_own_ns[c]        = time tasks in cell c ran on CPUs owned by cell c
1112        //   lent_ns[c]          = time foreign tasks ran on CPUs owned by cell c
1113        let mut total_running_ns = [0u64; MAX_CELLS];
1114        let mut on_own_ns = [0u64; MAX_CELLS];
1115        let mut lent_ns = [0u64; MAX_CELLS];
1116
1117        for cpu_ctx in cpu_ctxs.iter() {
1118            let owner = cpu_ctx.cell as usize;
1119            for cell in 0..MAX_CELLS {
1120                let ns = cpu_ctx.running_ns[cell];
1121                total_running_ns[cell] += ns;
1122                if owner == cell {
1123                    on_own_ns[cell] += ns;
1124                }
1125            }
1126            if owner >= MAX_CELLS {
1127                bail!(
1128                    "CPU has invalid cell assignment {} (MAX_CELLS={})",
1129                    owner,
1130                    MAX_CELLS
1131                );
1132            }
1133            // Lent time: non-owner cell tasks running on this CPU
1134            let total_on_cpu: u64 = cpu_ctx.running_ns.iter().sum();
1135            let owner_on_cpu = cpu_ctx.running_ns[owner];
1136            lent_ns[owner] += total_on_cpu.saturating_sub(owner_on_cpu);
1137        }
1138
1139        // Compute deltas since last collection interval
1140        let interval_ns = self.monitor_interval.as_nanos() as u64;
1141
1142        let mut global_running_delta = 0u64;
1143        let mut global_borrowed_delta = 0u64;
1144        let mut global_lent_delta = 0u64;
1145        let mut global_capacity = 0u64;
1146
1147        for cell in 0..MAX_CELLS {
1148            let delta_running =
1149                total_running_ns[cell].saturating_sub(self.prev_cell_running_ns[cell]);
1150            let delta_on_own = on_own_ns[cell].saturating_sub(self.prev_cell_own_ns[cell]);
1151            let delta_lent = lent_ns[cell].saturating_sub(self.prev_cell_lent_ns[cell]);
1152
1153            self.prev_cell_running_ns[cell] = total_running_ns[cell];
1154            self.prev_cell_own_ns[cell] = on_own_ns[cell];
1155            self.prev_cell_lent_ns[cell] = lent_ns[cell];
1156
1157            if delta_running == 0 && delta_lent == 0 {
1158                continue;
1159            }
1160
1161            // Borrowed = ran somewhere other than own CPUs
1162            let delta_borrowed = delta_running.saturating_sub(delta_on_own);
1163
1164            // After a cell is destroyed, BPF may still report residual
1165            // running_ns from the previous interval. Skip stale cells.
1166            let Some(cell_info) = self.cells.get(&(cell as u32)) else {
1167                continue;
1168            };
1169
1170            let nr_cpus = cell_info.cpus.weight() as u64;
1171            if nr_cpus == 0 {
1172                bail!("Cell {} has 0 CPUs assigned", cell);
1173            }
1174
1175            // capacity = total available CPU-time this interval
1176            let capacity = nr_cpus * interval_ns;
1177            // util: fraction of own capacity consumed by own tasks
1178            let util_pct = 100.0 * (delta_running as f64) / (capacity as f64);
1179            // demand_borrow: fraction of running time that was borrowed from other cells
1180            let demand_borrow_pct = if delta_running > 0 {
1181                100.0 * (delta_borrowed as f64) / (delta_running as f64)
1182            } else {
1183                0.0
1184            };
1185            // lent: fraction of own capacity used by foreign tasks
1186            let lent_pct = 100.0 * (delta_lent as f64) / (capacity as f64);
1187
1188            // Update EWMA-smoothed utilization
1189            if self.enable_rebalancing {
1190                self.smoothed_util[cell] = self.demand_smoothing * util_pct
1191                    + (1.0 - self.demand_smoothing) * self.smoothed_util[cell];
1192            }
1193
1194            self.metrics
1195                .cells
1196                .entry(cell as u32)
1197                .or_default()
1198                .update_demand(util_pct, demand_borrow_pct, lent_pct);
1199
1200            // Update smoothed_util_pct in metrics
1201            if self.enable_rebalancing {
1202                self.metrics
1203                    .cells
1204                    .entry(cell as u32)
1205                    .or_default()
1206                    .smoothed_util_pct = self.smoothed_util[cell];
1207            }
1208
1209            global_running_delta = global_running_delta.saturating_add(delta_running);
1210            global_borrowed_delta = global_borrowed_delta.saturating_add(delta_borrowed);
1211            global_lent_delta = global_lent_delta.saturating_add(delta_lent);
1212            global_capacity = global_capacity.saturating_add(capacity);
1213        }
1214
1215        let global_util_pct = if global_capacity > 0 {
1216            100.0 * (global_running_delta as f64) / (global_capacity as f64)
1217        } else {
1218            0.0
1219        };
1220        let global_borrow_pct = if global_running_delta > 0 {
1221            100.0 * (global_borrowed_delta as f64) / (global_running_delta as f64)
1222        } else {
1223            0.0
1224        };
1225        let global_lent_pct = if global_capacity > 0 {
1226            100.0 * (global_lent_delta as f64) / (global_capacity as f64)
1227        } else {
1228            0.0
1229        };
1230
1231        self.metrics
1232            .update_demand(global_util_pct, global_borrow_pct, global_lent_pct);
1233
1234        Ok(())
1235    }
1236
    /// Write applied_cpuset_seq to BSS, closing the rejection-skip window.
    ///
    /// Publishes the last cpuset sequence number userspace has acted on.
    /// Panics if called before the scheduler is loaded (bss_data is None).
    fn update_applied_cpuset_seq(&mut self) {
        // SAFETY: the pointer is derived from a live &mut borrow of the mapped
        // BSS field, so it is valid, aligned, and exclusively held for the
        // duration of the write. A volatile write is used, presumably because
        // the BPF side reads this field concurrently — confirm against the
        // BPF-side reader.
        unsafe {
            let ptr = &mut self.skel.maps.bss_data.as_mut().unwrap().applied_cpuset_seq as *mut u32;
            std::ptr::write_volatile(ptr, self.last_cpuset_seq);
        }
    }
1244
    /// Check if any cell's cpuset was modified and recompute if so.
    ///
    /// Compares the BPF-side cpuset_seq against the last sequence number we
    /// processed; on change, refreshes the cell manager's cpuset view and, if
    /// any of our cells were affected, recomputes and applies the cell
    /// configuration. No-op when no cell manager is configured.
    fn check_cpuset_changes(&mut self) -> Result<()> {
        let Some(ref mut cm) = self.cell_manager else {
            return Ok(());
        };

        // SAFETY: the pointer comes from a live borrow of the mapped BSS
        // field, so it is valid and aligned, and AtomicU32 has the same
        // in-memory representation as u32. The acquire load is used because
        // the BPF side updates this counter concurrently.
        let current_seq = unsafe {
            let ptr = &self.skel.maps.bss_data.as_ref().unwrap().cpuset_seq as *const u32;
            (ptr as *const AtomicU32)
                .as_ref()
                .unwrap()
                .load(Ordering::Acquire)
        };

        // Nothing changed since the last check.
        if current_seq == self.last_cpuset_seq {
            return Ok(());
        }
        self.last_cpuset_seq = current_seq;

        if !cm.refresh_cpusets()? {
            // seq changed but no cpusets on our cells changed
            self.update_applied_cpuset_seq();
            return Ok(());
        }

        // A relevant cpuset changed: recompute the assignments, then
        // acknowledge the sequence number we acted on.
        let cpu_assignments = self.compute_and_apply_cell_config(&[])?;
        self.update_applied_cpuset_seq();
        let cell_manager = self.cell_manager.as_ref().unwrap();
        info!(
            "Cpuset change detected, recomputed config: {}",
            cell_manager.format_cell_config(&cpu_assignments)
        );
        Ok(())
    }
1279
1280    fn refresh_bpf_cells(&mut self) -> Result<()> {
1281        let applied_configuration = unsafe {
1282            let ptr = &self
1283                .skel
1284                .maps
1285                .bss_data
1286                .as_ref()
1287                .unwrap()
1288                .applied_configuration_seq as *const u32;
1289            (ptr as *const std::sync::atomic::AtomicU32)
1290                .as_ref()
1291                .unwrap()
1292                .load(std::sync::atomic::Ordering::Acquire)
1293        };
1294        if self
1295            .last_configuration_seq
1296            .is_some_and(|seq| applied_configuration == seq)
1297        {
1298            return Ok(());
1299        }
1300        // collect all cpus per cell.
1301        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
1302        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
1303        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
1304            cell_to_cpus
1305                .entry(cpu_ctx.cell)
1306                .or_insert_with(|| Cpumask::new())
1307                .set_cpu(i)
1308                .expect("set cpu in existing mask");
1309        }
1310
1311        // Create cells we don't have yet, drop cells that are no longer in use.
1312        // If we continue to drop cell metrics once a cell is removed, we'll need to make sure we
1313        // flush metrics for a cell before we remove it completely.
1314        //
1315        // IMPORTANT: We determine which cells exist based on CPU assignments (which are
1316        // synchronized by applied_configuration_seq), NOT by reading the in_use field
1317        // separately. This avoids a TOCTOU race where a cell's in_use is set before
1318        // CPUs are assigned.
1319
1320        // Cell 0 (root cell) always exists even if it has no CPUs temporarily
1321        let cells_with_cpus: HashSet<u32> = cell_to_cpus.keys().copied().collect();
1322        let mut active_cells = cells_with_cpus.clone();
1323        active_cells.insert(0);
1324
1325        for cell_idx in &active_cells {
1326            let cpus = cell_to_cpus
1327                .get(cell_idx)
1328                .cloned()
1329                .unwrap_or_else(|| Cpumask::new());
1330            self.cells
1331                .entry(*cell_idx)
1332                .or_insert_with(|| Cell {
1333                    cpus: Cpumask::new(),
1334                })
1335                .cpus = cpus;
1336            self.metrics.cells.insert(*cell_idx, CellMetrics::default());
1337        }
1338
1339        // Remove cells that no longer have CPUs assigned
1340        self.cells.retain(|&k, _| active_cells.contains(&k));
1341        self.metrics.cells.retain(|&k, _| active_cells.contains(&k));
1342
1343        self.last_configuration_seq = Some(applied_configuration);
1344
1345        Ok(())
1346    }
1347}
1348
1349fn write_cpumask_to_config(cpumask: &Cpumask, dest: &mut [u8]) {
1350    let raw_slice = cpumask.as_raw_slice();
1351    for (word_idx, word) in raw_slice.iter().enumerate() {
1352        let byte_start = word_idx * 8;
1353        let bytes = word.to_le_bytes();
1354        for (j, byte) in bytes.iter().enumerate() {
1355            let idx = byte_start + j;
1356            if idx < dest.len() {
1357                dest[idx] = *byte;
1358            }
1359        }
1360    }
1361}
1362
1363fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
1364    let mut cpu_ctxs = vec![];
1365    let cpu_ctxs_vec = skel
1366        .maps
1367        .cpu_ctxs
1368        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
1369        .context("Failed to lookup cpu_ctx")?
1370        .unwrap();
1371    if cpu_ctxs_vec.len() < *NR_CPUS_POSSIBLE {
1372        bail!(
1373            "Percpu map returned {} entries but expected {}",
1374            cpu_ctxs_vec.len(),
1375            *NR_CPUS_POSSIBLE
1376        );
1377    }
1378    for cpu in 0..*NR_CPUS_POSSIBLE {
1379        cpu_ctxs.push(*unsafe {
1380            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
1381        });
1382    }
1383    Ok(cpu_ctxs)
1384}
1385
1386#[clap_main::clap_main]
1387fn main(opts: Opts) -> Result<()> {
1388    if opts.version {
1389        println!(
1390            "scx_mitosis {}",
1391            build_id::full_version(env!("CARGO_PKG_VERSION"))
1392        );
1393        return Ok(());
1394    }
1395
1396    let env_filter = EnvFilter::try_from_default_env()
1397        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
1398            Ok(filter) => Ok(filter),
1399            Err(e) => {
1400                eprintln!(
1401                    "invalid log envvar: {}, using info, err is: {}",
1402                    opts.log_level, e
1403                );
1404                EnvFilter::try_new("info")
1405            }
1406        })
1407        .unwrap_or_else(|_| EnvFilter::new("info"));
1408
1409    match tracing_subscriber::fmt()
1410        .with_env_filter(env_filter)
1411        .with_target(true)
1412        .with_thread_ids(true)
1413        .with_file(true)
1414        .with_line_number(true)
1415        .try_init()
1416    {
1417        Ok(()) => {}
1418        Err(e) => eprintln!("failed to init logger: {}", e),
1419    }
1420
1421    if opts.verbose > 0 {
1422        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
1423    }
1424
1425    debug!("opts={:?}", &opts);
1426
1427    if let Some(run_id) = opts.run_id {
1428        info!("scx_mitosis run_id: {}", run_id);
1429    }
1430
1431    let shutdown = Arc::new(AtomicBool::new(false));
1432    let shutdown_clone = shutdown.clone();
1433    ctrlc::set_handler(move || {
1434        shutdown_clone.store(true, Ordering::Relaxed);
1435    })
1436    .context("Error setting Ctrl-C handler")?;
1437
1438    if let Some(intv) = opts.monitor {
1439        let shutdown_clone = shutdown.clone();
1440        let jh = std::thread::spawn(move || {
1441            match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
1442                Ok(_) => {
1443                    debug!("stats monitor thread finished successfully")
1444                }
1445                Err(error_object) => {
1446                    warn!(
1447                        "stats monitor thread finished because of an error {}",
1448                        error_object
1449                    )
1450                }
1451            }
1452        });
1453        if opts.monitor.is_some() {
1454            let _ = jh.join();
1455            return Ok(());
1456        }
1457    }
1458
1459    let mut open_object = MaybeUninit::uninit();
1460    loop {
1461        let mut sched = Scheduler::init(&opts, &mut open_object)?;
1462        if !sched.run(shutdown.clone())?.should_restart() {
1463            break;
1464        }
1465    }
1466
1467    Ok(())
1468}