mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
mod stats;

use std::cmp::max;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Display;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::debug;
use log::info;
use log::trace;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;

use stats::CellMetrics;
use stats::Metrics;

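// Upper bound on the number of cells and the number of per-cell stat
// counters, mirroring the definitions on the BPF side in bpf_intf.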
const MAX_CELLS: usize = bpf_intf::consts_MAX_CELLS as usize;
const NR_CSTATS: usize = bpf_intf::cell_stat_idx_NR_CSTATS as usize;

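/// Command-line options for scx_mitosis, a cell-based scheduler: tasks are
/// grouped into cells, each of which owns a set of CPUs, and per-cell
/// scheduling statistics are reported through the stats server.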
#[derive(Debug, Parser)]
struct Opts {
    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Interval in seconds at which cell reconfiguration is considered.
    #[clap(long, default_value = "10")]
    reconfiguration_interval_s: u64,

    /// Interval in seconds at which rebalancing CPUs between cells is
    /// considered.
    #[clap(long, default_value = "5")]
    rebalance_cpus_interval_s: u64,

    /// Interval in seconds at which monitoring metrics are collected.
    #[clap(long, default_value = "1")]
    monitor_interval_s: u64,

    /// Run in stats monitoring mode with the specified interval; the
    /// scheduler itself is not loaded.
    #[clap(long)]
    monitor: Option<f64>,
}

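// The stat indices that represent queueing decisions: a task went to the
// local queue, a per-CPU DSQ, or the cell-wide DSQ. Summed together they
// give the total number of scheduling decisions in a scope.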
const QUEUE_STATS_IDX: [bpf_intf::cell_stat_idx; 3] = [
    bpf_intf::cell_stat_idx_CSTAT_LOCAL,
    bpf_intf::cell_stat_idx_CSTAT_CPU_DSQ,
    bpf_intf::cell_stat_idx_CSTAT_CELL_DSQ,
];

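/// Userspace view of a cell: the set of CPUs it currently owns.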
#[derive(Debug)]
struct Cell {
    cpus: Cpumask,
}

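/// Userspace scheduler state: the loaded BPF skeleton, the current cell
/// layout, and the metrics reported through the stats server.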
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    monitor_interval: Duration,
    cells: HashMap<u32, Cell>,
    // Previous per-cell stat counter values, used to compute deltas between
    // monitoring intervals.
    prev_cell_stats: [[u64; NR_CSTATS]; MAX_CELLS],
    metrics: Metrics,
    stats_server: StatsServer<(), Metrics>,
}

struct DistributionStats {
    total_decisions: u64,
    share_of_decisions_pct: f64,
    local_q_pct: f64,
    cpu_q_pct: f64,
    cell_q_pct: f64,
    affn_viol_pct: f64,

    // Total number of queueing decisions across all cells, used to size the
    // decision-count column consistently across rows.
    global_queue_decisions: u64,
}

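// Render one fixed-width summary line so that the global row and the
// per-cell rows line up when logged together.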
impl Display for DistributionStats {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Size the decision-count column to fit the largest (global) count.
        const MIN_DECISIONS_WIDTH: usize = 5;
        let decisions_width = max(
            MIN_DECISIONS_WIDTH,
            (self.global_queue_decisions as f64).log10().ceil() as usize,
        );
        write!(
            f,
            "{:width$} {:5.1}% | Local:{:4.1}% From: CPU:{:4.1}% Cell:{:4.1}% | V:{:4.1}%",
            self.total_decisions,
            self.share_of_decisions_pct,
            self.local_q_pct,
            self.cpu_q_pct,
            self.cell_q_pct,
            self.affn_viol_pct,
            width = decisions_width,
        )
    }
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let topology = Topology::new()?;

        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 1);
        init_libbpf_logging(None);
        let mut skel = scx_ops_open!(skel_builder, open_object, mitosis)?;

        skel.struct_ops.mitosis_mut().exit_dump_len = opts.exit_dump_len;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = scx_enums.SCX_SLICE_DFL;

        // Pass the set of possible CPUs to the BPF side as a bitmask, one
        // bit per CPU, eight CPUs per byte.
        rodata.nr_possible_cpus = *NR_CPUS_POSSIBLE as u32;
        for cpu in topology.all_cpus.keys() {
            rodata.all_cpus[cpu / 8] |= 1 << (cpu % 8);
        }

        let skel = scx_ops_load!(skel, mitosis, uei)?;

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            monitor_interval: Duration::from_secs(opts.monitor_interval_s),
            cells: HashMap::new(),
            prev_cell_stats: [[0; NR_CSTATS]; MAX_CELLS],
            metrics: Metrics::default(),
            stats_server,
        })
    }

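    /// Attach the BPF scheduler and service stats requests until shutdown is
    /// requested or the BPF side exits.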
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let struct_ops = scx_ops_attach!(self.skel, mitosis)?;

        info!("Mitosis Scheduler Attached. Run `scx_mitosis --monitor` for metrics.");

        let (res_ch, req_ch) = self.stats_server.channels();

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            self.refresh_bpf_cells()?;
            self.collect_metrics()?;

            // Serve a stats request if one arrives before the monitoring
            // interval elapses; a timeout just means another refresh cycle.
            match req_ch.recv_timeout(self.monitor_interval) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }
        drop(struct_ops);
        uei_report!(&self.skel, uei)
    }

    fn get_metrics(&self) -> Metrics {
        self.metrics.clone()
    }

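    /// Compute the distribution of queueing decisions for one scope (the
    /// whole machine or a single cell) as percentages of that scope's
    /// decisions, plus the scope's share of all decisions globally.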
    fn calculate_distribution_stats(
        &self,
        queue_counts: &[u64; QUEUE_STATS_IDX.len()],
        global_queue_decisions: u64,
        scope_queue_decisions: u64,
        scope_affn_viols: u64,
    ) -> Result<DistributionStats> {
        let share_of_global =
            100.0 * (scope_queue_decisions as f64) / (global_queue_decisions as f64);

        let queue_pct = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out queue distribution");
            [0.0; QUEUE_STATS_IDX.len()]
        } else {
            core::array::from_fn(|i| {
                100.0 * (queue_counts[i] as f64) / (scope_queue_decisions as f64)
            })
        };

        let affinity_violations_percent = if scope_queue_decisions == 0 {
            debug!("No queue decisions in scope, zeroing out affinity violations");
            0.0
        } else {
            100.0 * (scope_affn_viols as f64) / (scope_queue_decisions as f64)
        };

        // The percentages are mapped positionally into the named fields
        // below; guard against QUEUE_STATS_IDX changing size.
        const EXPECTED_QUEUES: usize = 3;
        if queue_pct.len() != EXPECTED_QUEUES {
            bail!(
                "Expected {} queues, got {}",
                EXPECTED_QUEUES,
                queue_pct.len()
            );
        }

        Ok(DistributionStats {
            total_decisions: scope_queue_decisions,
            share_of_decisions_pct: share_of_global,
            local_q_pct: queue_pct[0],
            cpu_q_pct: queue_pct[1],
            cell_q_pct: queue_pct[2],
            affn_viol_pct: affinity_violations_percent,
            global_queue_decisions,
        })
    }

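    /// Aggregate queueing stats across all cells, update the global metrics,
    /// and log a machine-wide summary line.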
    fn update_and_log_global_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
        for cell in 0..MAX_CELLS {
            for (i, stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] += cell_stats_delta[cell][*stat as usize];
            }
        }

        let prefix = "Total Decisions:";

        let scope_affn_viols: u64 = cell_stats_delta
            .iter()
            .map(|&cell| cell[bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize])
            .sum();

        // Globally, the scope of decisions is all decisions.
        let stats = self.calculate_distribution_stats(
            &queue_counts,
            global_queue_decisions,
            global_queue_decisions,
            scope_affn_viols,
        )?;

        self.metrics.update(&stats);

        trace!("{} {}", prefix, stats);

        Ok(())
    }

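    /// Update per-cell metrics and log one summary line per cell, skipping
    /// cells that made no queueing decisions this interval.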
    fn update_and_log_cell_queue_stats(
        &mut self,
        global_queue_decisions: u64,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        // Size the cell-number column to fit the largest possible cell index.
        const MIN_CELL_WIDTH: usize = 2;
        let cell_width: usize = max(MIN_CELL_WIDTH, (MAX_CELLS as f64).log10().ceil() as usize);

        for cell in 0..MAX_CELLS {
            let cell_queue_decisions = QUEUE_STATS_IDX
                .iter()
                .map(|&stat| cell_stats_delta[cell][stat as usize])
                .sum::<u64>();

            // Cells with no decisions this interval are not reported.
            if cell_queue_decisions == 0 {
                continue;
            }

            let mut queue_counts = [0; QUEUE_STATS_IDX.len()];
            for (i, &stat) in QUEUE_STATS_IDX.iter().enumerate() {
                queue_counts[i] = cell_stats_delta[cell][stat as usize];
            }

            let prefix = format!(" Cell {:width$}:", cell, width = cell_width);

            let scope_affn_viols: u64 =
                cell_stats_delta[cell][bpf_intf::cell_stat_idx_CSTAT_AFFN_VIOL as usize];

            let stats = self.calculate_distribution_stats(
                &queue_counts,
                global_queue_decisions,
                cell_queue_decisions,
                scope_affn_viols,
            )?;

            self.metrics
                .cells
                .entry(cell as u32)
                .or_default()
                .update(&stats);

            trace!("{} {}", prefix, stats);
        }
        Ok(())
    }

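    /// Compute the global decision count and emit both the machine-wide and
    /// the per-cell stat lines.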
    fn log_all_queue_stats(
        &mut self,
        cell_stats_delta: &[[u64; NR_CSTATS]; MAX_CELLS],
    ) -> Result<()> {
        let global_queue_decisions: u64 = cell_stats_delta
            .iter()
            .flat_map(|cell| QUEUE_STATS_IDX.iter().map(|&idx| cell[idx as usize]))
            .sum();

        if global_queue_decisions == 0 {
            bail!("No queueing decisions made globally");
        }

        self.update_and_log_global_queue_stats(global_queue_decisions, cell_stats_delta)?;

        self.update_and_log_cell_queue_stats(global_queue_decisions, cell_stats_delta)?;

        Ok(())
    }

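    /// Read the per-CPU stat counters, sum them per cell, and return the
    /// change since the previous reading; the counters only ever increase,
    /// so the delta is the activity within the last monitoring interval.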
    fn calculate_cell_stat_delta(&mut self) -> Result<[[u64; NR_CSTATS]; MAX_CELLS]> {
        let mut cell_stats_delta = [[0u64; NR_CSTATS]; MAX_CELLS];

        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;

        for cell in 0..MAX_CELLS {
            for stat in 0..NR_CSTATS {
                let mut cur_cell_stat = 0;

                // Stats are tracked per CPU; sum them for the cell-wide total.
                for cpu_ctx in cpu_ctxs.iter() {
                    cur_cell_stat += cpu_ctx.cstats[cell][stat];
                }

                cell_stats_delta[cell][stat] = cur_cell_stat - self.prev_cell_stats[cell][stat];
                self.prev_cell_stats[cell][stat] = cur_cell_stat;
            }
        }
        Ok(cell_stats_delta)
    }

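    /// Refresh all metrics reported through the stats server: queue
    /// distributions, per-cell CPU counts, and the number of active cells.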
    fn collect_metrics(&mut self) -> Result<()> {
        let cell_stats_delta = self.calculate_cell_stat_delta()?;

        self.log_all_queue_stats(&cell_stats_delta)?;

        for (cell_id, cell) in &self.cells {
            trace!("CELL[{}]: {}", cell_id, cell.cpus);
            self.metrics
                .cells
                .entry(*cell_id)
                .and_modify(|cell_metrics| cell_metrics.num_cpus = cell.cpus.weight() as u32);
        }
        self.metrics.num_cells = self.cells.len() as u32;

        Ok(())
    }

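    /// Mirror the BPF side's view of cells into userspace: rebuild each
    /// cell's cpumask from the per-CPU contexts and create or drop `Cell`
    /// entries to match the in-use flags.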
    fn refresh_bpf_cells(&mut self) -> Result<()> {
        // Collect each cell's CPUs from the per-CPU contexts.
        let mut cell_to_cpus: HashMap<u32, Cpumask> = HashMap::new();
        let cpu_ctxs = read_cpu_ctxs(&self.skel)?;
        for (i, cpu_ctx) in cpu_ctxs.iter().enumerate() {
            cell_to_cpus
                .entry(cpu_ctx.cell)
                .and_modify(|mask| mask.set_cpu(i).expect("set cpu in existing mask"))
                .or_insert_with(|| {
                    let mut mask = Cpumask::new();
                    mask.set_cpu(i).expect("set cpu in new mask");
                    mask
                });
        }

        let cells = &self.skel.maps.bss_data.as_ref().unwrap().cells;
        for i in 0..MAX_CELLS {
            let cell_idx = i as u32;
            let bpf_cell = cells[i];
            if bpf_cell.in_use > 0 {
                self.cells.entry(cell_idx).or_insert(Cell {
                    cpus: cell_to_cpus
                        .get(&cell_idx)
                        .expect("missing cell in cpu map")
                        .clone(),
                });
                self.metrics.cells.insert(cell_idx, CellMetrics::default());
            } else {
                self.cells.remove(&cell_idx);
                self.metrics.cells.remove(&cell_idx);
            }
        }

        Ok(())
    }
}

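/// Read one `cpu_ctx` per possible CPU from the per-CPU BPF map. Each value
/// is copied out of the raw bytes libbpf returns, which assumes the layout
/// matches the BPF side's `cpu_ctx` definition.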
fn read_cpu_ctxs(skel: &BpfSkel) -> Result<Vec<bpf_intf::cpu_ctx>> {
    let mut cpu_ctxs = vec![];
    let cpu_ctxs_vec = skel
        .maps
        .cpu_ctxs
        .lookup_percpu(&0u32.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
        .context("Failed to lookup cpu_ctx")?
        .unwrap();
    for cpu in 0..*NR_CPUS_POSSIBLE {
        cpu_ctxs.push(*unsafe {
            &*(cpu_ctxs_vec[cpu].as_slice().as_ptr() as *const bpf_intf::cpu_ctx)
        });
    }
    Ok(cpu_ctxs)
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    debug!("opts={:?}", &opts);

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // In monitor mode, run only the stats client and exit when it finishes;
    // the scheduler itself is not loaded.
    if let Some(intv) = opts.monitor {
        let shutdown_clone = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_clone) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        let _ = jh.join();
        return Ok(());
    }

    // Re-initialize and restart the scheduler whenever it requests a restart.
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}