mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

mod domain;
use domain::DomainGroup;

pub mod tuner;
use tuner::Tuner;

pub mod load_balance;
use load_balance::LoadBalancer;

mod stats;
use std::collections::BTreeMap;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use stats::ClusterStats;
use stats::NodeStats;

#[macro_use]
extern crate static_assertions;

use ::fb_procfs as procfs;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::info;
use scx_stats::prelude::*;
use scx_utils::Cpumask;
use scx_utils::NR_CPU_IDS;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;

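// Upper bounds compiled into the BPF side; the userspace side must stay
// within them.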
const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;

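/// scx_rusty: A multi-domain, BPF / userspace hybrid scheduler.
///
/// The BPF part does simple vtime or FIFO scheduling within each domain
/// while tracking load. The userspace part periodically computes the load
/// of each domain, balances load across domains, and tunes the BPF side's
/// behavior.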
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration, in microseconds, used while the system is
    /// under-utilized.
    #[clap(short = 'u', long, default_value = "20000")]
    slice_us_underutil: u64,

    /// Scheduling slice duration, in microseconds, used while the system is
    /// over-utilized.
    #[clap(short = 'o', long, default_value = "1000")]
    slice_us_overutil: u64,

    /// Load balancing interval in seconds.
    #[clap(short = 'i', long, default_value = "2.0")]
    interval: f64,

    /// Tuner interval in seconds. The tuner runs more frequently than the
    /// load balancer so it can react quickly to utilization changes.
    #[clap(short = 'I', long, default_value = "0.1")]
    tune_interval: f64,

    /// Half-life, in seconds, of the load running averages.
    #[clap(short = 'l', long, default_value = "1.0")]
    load_half_life: f64,

    /// Build scheduling domains from how CPUs share caches at this cache
    /// level.
    #[clap(short = 'c', long, default_value = "3")]
    cache_level: u32,

    /// Manually specify one cpumask per domain instead of deriving domains
    /// from cache topology.
    #[clap(short = 'C', long, num_args = 1.., conflicts_with = "cache_level")]
    cpumasks: Vec<String>,

    /// Threshold of queued tasks at which idle CPUs in other domains of the
    /// same NUMA node may greedily steal tasks.
    #[clap(short = 'g', long, default_value = "1")]
    greedy_threshold: u32,

    /// Threshold for greedy task stealing across NUMA nodes. 0 disables
    /// cross-NUMA stealing.
    #[clap(long, default_value = "0")]
    greedy_threshold_x_numa: u32,

    /// Disable userspace load balancing.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_load_balance: bool,

    /// Put per-cpu kthreads directly into their local DSQs.
    #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
    kthreads_local: bool,

    /// Include kworkers in load balancing decisions.
    #[clap(short = 'b', long, action = clap::ArgAction::SetTrue)]
    balanced_kworkers: bool,

    /// Use FIFO scheduling within each domain instead of weighted vtime
    /// scheduling.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    fifo_sched: bool,

    /// Enable direct greedy dispatch to idle CPUs of foreign domains only
    /// while CPU utilization is below this percentage.
    #[clap(short = 'D', long, default_value = "90.0")]
    direct_greedy_under: f64,

    /// Kick idle CPUs of foreign domains to pull queued tasks only while
    /// CPU utilization is below this percentage.
    #[clap(short = 'K', long, default_value = "100.0")]
    kick_greedy_under: f64,

    /// Allow direct greedy dispatch to idle CPUs on remote NUMA nodes.
    #[clap(short = 'r', long, action = clap::ArgAction::SetTrue)]
    direct_greedy_numa: bool,

    /// Only schedule tasks which have their scheduling policy set to
    /// SCHED_EXT instead of switching all tasks.
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Bias domain selection towards the NUMA nodes indicated by a task's
    /// memory policy.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    mempolicy_affinity: bool,

    /// Print scheduler statistics at this interval in seconds.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode at this interval in seconds; the
    /// scheduler itself is not loaded.
    #[clap(long)]
    monitor: Option<f64>,

    /// Exit debug dump buffer length. 0 indicates the default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output, including libbpf details. Repeat to increase
    /// verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Print version and exit.
    #[clap(long)]
    version: bool,

    /// Describe the reported statistics and exit.
    #[clap(long)]
    help_stats: bool,

    /// CPU performance target in [0, 1024] passed to the BPF side; 0 keeps
    /// the default.
    #[clap(long, default_value = "0")]
    perf: u32,
}

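// Read /proc/stat and reduce the per-field CPU times into aggregate busy
// and total microsecond counts.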
fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
    let cs = reader
        .read_stat()
        .context("Failed to read procfs")?
        .total_cpu
        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;

    Ok(match cs {
        procfs::CpuStat {
            user_usec: Some(user),
            nice_usec: Some(nice),
            system_usec: Some(system),
            idle_usec: Some(idle),
            iowait_usec: Some(iowait),
            irq_usec: Some(irq),
            softirq_usec: Some(softirq),
            stolen_usec: Some(stolen),
            guest_usec: _,
            guest_nice_usec: _,
        } => {
            let busy = user + system + nice + irq + softirq + stolen;
            let total = busy + idle + iowait;
            (busy, total)
        }
        _ => bail!("Some procfs stats are not populated!"),
    })
}

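/// Saturating counter delta: returns `curr - prev`, or 0 if the counter went
/// backwards (e.g. after a reset) instead of underflowing.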
pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
    curr.checked_sub(*prev).unwrap_or(0u64)
}

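// Snapshot of the cumulative counters used for stats reporting. Two
// snapshots are diffed to produce the rates for one reporting period.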
#[derive(Clone, Debug)]
struct StatsCtx {
    cpu_busy: u64,
    cpu_total: u64,
    bpf_stats: Vec<u64>,
    time_used: Duration,
}

impl StatsCtx {
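    // Sum each per-CPU BPF stat counter across all CPUs.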
    fn read_bpf_stats(skel: &BpfSkel) -> Result<Vec<u64>> {
        let stats_map = &skel.maps.stats;
        let mut stats: Vec<u64> = Vec::new();

        for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
            let cpu_stat_vec = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .with_context(|| format!("Failed to lookup stat {}", stat))?
                .expect("per-cpu stat should exist");
            let sum = cpu_stat_vec
                .iter()
                .map(|val| {
                    u64::from_ne_bytes(
                        val.as_slice()
                            .try_into()
                            .expect("Invalid value length in stat map"),
                    )
                })
                .sum();
            stats.push(sum);
        }
        Ok(stats)
    }

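    // An all-zero snapshot, usable as the baseline for a first delta.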
    fn blank() -> Self {
        Self {
            cpu_busy: 0,
            cpu_total: 0,
            bpf_stats: vec![0u64; bpf_intf::stat_idx_RUSTY_NR_STATS as usize],
            time_used: Duration::default(),
        }
    }

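    // Capture the current counter values from procfs and the BPF stats map.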
    fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, time_used: Duration) -> Result<Self> {
        let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;

        Ok(Self {
            cpu_busy,
            cpu_total,
            bpf_stats: Self::read_bpf_stats(skel)?,
            time_used,
        })
    }

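    // Counter-wise difference of two snapshots; counters saturate at zero.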
    fn delta(&self, rhs: &Self) -> Self {
        Self {
            cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
            cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
            bpf_stats: self
                .bpf_stats
                .iter()
                .zip(rhs.bpf_stats.iter())
                .map(|(lhs, rhs)| sub_or_zero(lhs, rhs))
                .collect(),
            time_used: self.time_used - rhs.time_used,
        }
    }
}

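// Userspace side of the scheduler: owns the BPF skeleton, the periodic load
// balancer and tuner, and the stats reporting plumbing.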
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,

    sched_interval: Duration,
    tune_interval: Duration,
    balance_load: bool,
    balanced_kworkers: bool,

    dom_group: Arc<DomainGroup>,

    proc_reader: procfs::ProcReader,

    lb_at: SystemTime,
    lb_stats: BTreeMap<usize, NodeStats>,
    time_used: Duration,

    tuner: Tuner,
    stats_server: StatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
}

impl<'a> Scheduler<'a> {
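    // Open the BPF skeleton, populate its read-only data from the host
    // topology and command-line options, then load and attach it.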
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 0);
        init_libbpf_logging(None);
        info!(
            "Running scx_rusty (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let mut skel = scx_ops_open!(skel_builder, open_object, rusty)?;

        let domains = Arc::new(DomainGroup::new(&Topology::new()?, &opts.cpumasks)?);

        if *NR_CPU_IDS > MAX_CPUS {
            bail!(
                "Num possible CPU IDs ({}) exceeds maximum of ({})",
                *NR_CPU_IDS,
                MAX_CPUS
            );
        }

        if domains.nr_doms() > MAX_DOMS {
            bail!(
                "nr_doms ({}) is greater than MAX_DOMS ({})",
                domains.nr_doms(),
                MAX_DOMS
            );
        }

        skel.maps.bss_data.slice_ns = scx_enums.SCX_SLICE_DFL;

        skel.maps.rodata_data.nr_nodes = domains.nr_nodes() as u32;
        skel.maps.rodata_data.nr_doms = domains.nr_doms() as u32;
        skel.maps.rodata_data.nr_cpu_ids = *NR_CPU_IDS as u32;

        // CPUs not belonging to any domain are marked with u32::MAX.
        for cpu in 0..*NR_CPU_IDS {
            skel.maps.rodata_data.cpu_dom_id_map[cpu] = u32::MAX;
        }

        for (id, dom) in domains.doms().iter() {
            for cpu in dom.mask().iter() {
                skel.maps.rodata_data.cpu_dom_id_map[cpu] = *id as u32;
            }
        }

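        // Assemble the per-NUMA-node and per-domain cpumasks and copy them
        // into the skeleton's read-only data.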
        for numa in 0..domains.nr_nodes() {
            let mut numa_mask = Cpumask::new();
            let node_domains = domains.numa_doms(&numa);
            for dom in node_domains.iter() {
                let dom_mask = dom.mask();
                numa_mask = numa_mask.or(&dom_mask);
            }

            let raw_numa_slice = numa_mask.as_raw_slice();
            let node_cpumask_slice = &mut skel.maps.rodata_data.numa_cpumasks[numa];
            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
            left.clone_from_slice(raw_numa_slice);
            info!("NODE[{:02}] mask= {}", numa, numa_mask);

            for dom in node_domains.iter() {
                let raw_dom_slice = dom.mask_slice();
                let dom_cpumask_slice = &mut skel.maps.rodata_data.dom_cpumasks[dom.id()];
                let (left, _) = dom_cpumask_slice.split_at_mut(raw_dom_slice.len());
                left.clone_from_slice(raw_dom_slice);
                skel.maps.rodata_data.dom_numa_id_map[dom.id()] =
                    numa.try_into().expect("NUMA ID could not fit into 32 bits");

                info!("  DOM[{:02}] mask= {}", dom.id(), dom.mask());
            }
        }

        if opts.partial {
            skel.struct_ops.rusty_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.rusty_mut().exit_dump_len = opts.exit_dump_len;

        skel.maps.rodata_data.load_half_life = (opts.load_half_life * 1000000000.0) as u32;
        skel.maps.rodata_data.kthreads_local = opts.kthreads_local;
        skel.maps.rodata_data.fifo_sched = opts.fifo_sched;
        skel.maps.rodata_data.greedy_threshold = opts.greedy_threshold;
        skel.maps.rodata_data.greedy_threshold_x_numa = opts.greedy_threshold_x_numa;
        skel.maps.rodata_data.direct_greedy_numa = opts.direct_greedy_numa;
        skel.maps.rodata_data.mempolicy_affinity = opts.mempolicy_affinity;
        skel.maps.rodata_data.debug = opts.verbose as u32;
        skel.maps.rodata_data.rusty_perf_mode = opts.perf;

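        // Read-only data is final; load and attach the BPF scheduler and
        // bring up the stats server.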
        let mut skel = scx_ops_load!(skel, rusty, uei)?;
        let struct_ops = Some(scx_ops_attach!(skel, rusty)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        for (id, dom) in domains.doms().iter() {
            let mut ctx = dom.ctx.lock().unwrap();

            *ctx = Some(skel.maps.bss_data.dom_ctxs[*id]);
        }

        info!("Rusty scheduler started! Run `scx_rusty --monitor` for metrics.");

        let proc_reader = procfs::ProcReader::new();

        Ok(Self {
            skel,
            struct_ops,
            sched_interval: Duration::from_secs_f64(opts.interval),
            tune_interval: Duration::from_secs_f64(opts.tune_interval),
            balance_load: !opts.no_load_balance,
            balanced_kworkers: opts.balanced_kworkers,

            dom_group: domains.clone(),
            proc_reader,

            lb_at: SystemTime::now(),
            lb_stats: BTreeMap::new(),
            time_used: Duration::default(),

            tuner: Tuner::new(
                domains,
                opts.direct_greedy_under,
                opts.kick_greedy_under,
                opts.slice_us_underutil * 1000,
                opts.slice_us_overutil * 1000,
            )?,
            stats_server,
        })
    }

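    // Convert one StatsCtx delta plus the latest per-node load balancer
    // stats into the ClusterStats payload served to stats clients.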
    fn cluster_stats(&self, sc: &StatsCtx, node_stats: BTreeMap<usize, NodeStats>) -> ClusterStats {
        let stat = |idx| sc.bpf_stats[idx as usize];
        let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PINNED)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
        let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0;

        let cpu_busy = if sc.cpu_total != 0 {
            (sc.cpu_busy as f64 / sc.cpu_total as f64) * 100.0
        } else {
            0.0
        };

        ClusterStats {
            at_us: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            lb_at_us: self
                .lb_at
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            total,
            slice_us: self.tuner.slice_ns / 1000,

            cpu_busy,
            load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
            nr_migrations: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],

            task_get_err: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
            time_used: sc.time_used.as_secs_f64(),

            sync_prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE),
            wake_sync: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC),
            prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE),
            greedy_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE),
            pinned: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PINNED),
            direct: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH),
            greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY),
            greedy_far: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR),
            dsq_dispatch: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH),
            greedy_local: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL),
            greedy_xnuma: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA),
            kick_greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_KICK_GREEDY),
            repatriate: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_REPATRIATE),
            dl_clamp: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_CLAMP),
            dl_preset: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_PRESET),

            direct_greedy_cpus: self.tuner.direct_greedy_mask.as_raw_slice().to_owned(),
            kick_greedy_cpus: self.tuner.kick_greedy_mask.as_raw_slice().to_owned(),

            nodes: node_stats,
        }
    }

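    // Run one load balancing pass and record when it happened and what it
    // observed.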
    fn lb_step(&mut self) -> Result<()> {
        let mut lb = LoadBalancer::new(
            &mut self.skel,
            self.dom_group.clone(),
            self.balanced_kworkers,
            self.tuner.fully_utilized,
            self.balance_load,
        );

        lb.load_balance()?;

        self.lb_at = SystemTime::now();
        self.lb_stats = lb.get_stats();
        Ok(())
    }

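    // Main loop: run the tuner and load balancer on their respective
    // intervals and answer stats requests in between, until shutdown is
    // requested or the BPF scheduler exits.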
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let now = Instant::now();
        let mut next_tune_at = now + self.tune_interval;
        let mut next_sched_at = now + self.sched_interval;

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

            if now >= next_tune_at {
                self.tuner.step(&mut self.skel)?;
                next_tune_at += self.tune_interval;
                if next_tune_at < now {
                    next_tune_at = now + self.tune_interval;
                }
            }

            if now >= next_sched_at {
                self.lb_step()?;
                next_sched_at += self.sched_interval;
                if next_sched_at < now {
                    next_sched_at = now + self.sched_interval;
                }
            }

            self.time_used += Instant::now().duration_since(now);

            match req_ch.recv_deadline(next_sched_at.min(next_tune_at)) {
                Ok(prev_sc) => {
                    let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.time_used)?;
                    let delta_sc = cur_sc.delta(&prev_sc);
                    let cstats = self.cluster_stats(&delta_sc, self.lb_stats.clone());
                    res_ch.send((cur_sc, cstats))?;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

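// Dropping the struct_ops link detaches the BPF scheduler if it is still
// attached when the Scheduler goes away.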
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_rusty: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // In monitor mode only the stats client runs; with --stats the stats
    // thread runs alongside the scheduler.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }
    Ok(())
}
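
// A minimal sanity check for the delta helpers above; an illustrative
// sketch rather than exhaustive coverage.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sub_or_zero_saturates() {
        assert_eq!(sub_or_zero(&10, &4), 6);
        // A counter that moved backwards (e.g. after a reset) yields 0
        // instead of panicking on underflow.
        assert_eq!(sub_or_zero(&4, &10), 0);
    }

    #[test]
    fn stats_ctx_delta_of_blanks_is_zero() {
        let blank = StatsCtx::blank();
        let delta = blank.delta(&StatsCtx::blank());
        assert_eq!(delta.cpu_busy, 0);
        assert_eq!(delta.cpu_total, 0);
        assert!(delta.bpf_stats.iter().all(|&v| v == 0));
    }
}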