mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

mod domain;
use domain::DomainGroup;

pub mod tuner;
use tuner::Tuner;

pub mod load_balance;
use load_balance::LoadBalancer;

mod stats;
use std::collections::BTreeMap;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use stats::ClusterStats;
use stats::NodeStats;

#[macro_use]
extern crate static_assertions;

use ::fb_procfs as procfs;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::skel::Skel;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::info;
use scx_arena::ArenaLib;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;

use crate::types::dom_ctx;

const SCHEDULER_NAME: &str = "scx_wd40";
const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;

// Size of a BPF-side scx_bitmap, read from the loaded skeleton's mask_size in
// Scheduler::init() and used whenever a cpumask is written into BPF memory.
static mut MASK_LEN: usize = 0;

#[derive(Debug, Parser)]
struct Opts {
    #[clap(short = 'u', long, default_value = "20000")]
    slice_us_underutil: u64,

    #[clap(short = 'o', long, default_value = "1000")]
    slice_us_overutil: u64,

    #[clap(short = 'i', long, default_value = "2.0")]
    interval: f64,

    #[clap(short = 'I', long, default_value = "0.1")]
    tune_interval: f64,

    #[clap(short = 'l', long, default_value = "1.0")]
    load_half_life: f64,

    #[clap(short = 'c', long, default_value = "3")]
    cache_level: u32,

    #[clap(short = 'g', long, default_value = "1")]
    greedy_threshold: u32,

    #[clap(long, default_value = "0")]
    greedy_threshold_x_numa: u32,

    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_load_balance: bool,

    #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
    kthreads_local: bool,

    #[clap(short = 'b', long, action = clap::ArgAction::SetTrue)]
    balanced_kworkers: bool,

    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    fifo_sched: bool,

    #[clap(short = 'D', long, default_value = "90.0")]
    direct_greedy_under: f64,

    #[clap(short = 'K', long, default_value = "100.0")]
    kick_greedy_under: f64,

    #[clap(short = 'r', long, action = clap::ArgAction::SetTrue)]
    direct_greedy_numa: bool,

    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    #[clap(long, action = clap::ArgAction::SetTrue)]
    mempolicy_affinity: bool,

    #[clap(long)]
    stats: Option<f64>,

    #[clap(long)]
    monitor: Option<f64>,

    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    #[clap(long)]
    version: bool,

    #[clap(long)]
    help_stats: bool,

    #[clap(long, default_value = "0")]
    perf: u32,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

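/// Read aggregate CPU time from /proc/stat and return (busy, total) in
/// microseconds, where busy excludes idle and iowait time.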
fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
    let cs = reader
        .read_stat()
        .context("Failed to read procfs")?
        .total_cpu
        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;

    Ok(match cs {
        procfs::CpuStat {
            user_usec: Some(user),
            nice_usec: Some(nice),
            system_usec: Some(system),
            idle_usec: Some(idle),
            iowait_usec: Some(iowait),
            irq_usec: Some(irq),
            softirq_usec: Some(softirq),
            stolen_usec: Some(stolen),
            guest_usec: _,
            guest_nice_usec: _,
        } => {
            let busy = user + system + nice + irq + softirq + stolen;
            let total = busy + idle + iowait;
            (busy, total)
        }
        _ => bail!("Some procfs stats are not populated!"),
    })
}

pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
    curr.checked_sub(*prev).unwrap_or(0u64)
}

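/// Copy a userspace Cpumask into a BPF-side scx_bitmap. MASK_LEN must already
/// have been initialized from the loaded skeleton (see Scheduler::init).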
pub fn update_bpf_mask(bpfptr: *mut types::scx_bitmap, cpumask: &Cpumask) -> Result<()> {
    let bpfmask = unsafe { &mut *bpfptr };

    unsafe { cpumask.write_to_ptr(&raw mut bpfmask.bits as *mut u64, MASK_LEN)? };

    Ok(())
}

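/// A point-in-time snapshot of scheduler statistics: procfs CPU busy/total
/// counters, the per-CPU BPF stat counters summed across CPUs, and the
/// userspace time consumed so far.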
#[derive(Clone, Debug)]
struct StatsCtx {
    cpu_busy: u64,
    cpu_total: u64,
    bpf_stats: Vec<u64>,
    time_used: Duration,
}

impl StatsCtx {
    fn read_bpf_stats(skel: &BpfSkel) -> Result<Vec<u64>> {
        let stats_map = &skel.maps.stats;
        let mut stats: Vec<u64> = Vec::new();

        for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
            let cpu_stat_vec = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .with_context(|| format!("Failed to lookup stat {}", stat))?
                .expect("per-cpu stat should exist");
            let sum = cpu_stat_vec
                .iter()
                .map(|val| {
                    u64::from_ne_bytes(
                        val.as_slice()
                            .try_into()
                            .expect("Invalid value length in stat map"),
                    )
                })
                .sum();
            stats.push(sum);
        }
        Ok(stats)
    }

    fn blank() -> Self {
        Self {
            cpu_busy: 0,
            cpu_total: 0,
            bpf_stats: vec![0u64; bpf_intf::stat_idx_RUSTY_NR_STATS as usize],
            time_used: Duration::default(),
        }
    }

    fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, time_used: Duration) -> Result<Self> {
        let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;

        Ok(Self {
            cpu_busy,
            cpu_total,
            bpf_stats: Self::read_bpf_stats(skel)?,
            time_used,
        })
    }

    fn delta(&self, rhs: &Self) -> Self {
        Self {
            cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
            cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
            bpf_stats: self
                .bpf_stats
                .iter()
                .zip(rhs.bpf_stats.iter())
                .map(|(lhs, rhs)| sub_or_zero(&lhs, &rhs))
                .collect(),
            time_used: self.time_used - rhs.time_used,
        }
    }
}

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,

    sched_interval: Duration,
    tune_interval: Duration,
    balance_load: bool,
    balanced_kworkers: bool,

    dom_group: Arc<DomainGroup>,

    proc_reader: procfs::ProcReader,

    lb_at: SystemTime,
    lb_stats: BTreeMap<usize, NodeStats>,
    time_used: Duration,

    tuner: Tuner,
    stats_server: StatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
}

impl<'a> Scheduler<'a> {
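    /// Run the `wd40_setup` BPF program once via test_run to initialize the
    /// scheduler's arena-backed state before the struct_ops is attached.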
    fn setup_wd40(skel: &mut BpfSkel<'a>) -> Result<()> {
        let input = ProgramInput {
            ..Default::default()
        };
        let output = skel.progs.wd40_setup.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize WD40 arenas, wd40_setup returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

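    /// Initialize the shared arena allocator via ArenaLib, passing the
    /// task_ctx size and the number of possible CPU IDs, then run the
    /// scheduler's own arena setup program.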
    fn setup_arenas(skel: &mut BpfSkel<'a>) -> Result<()> {
        let task_size = std::mem::size_of::<types::task_ctx>();
        let arenalib = ArenaLib::init(skel.object_mut(), task_size, *NR_CPU_IDS)?;
        arenalib.setup()?;

        Self::setup_wd40(skel)?;

        Ok(())
    }

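    /// Open, configure, and load the BPF skeleton, set up the arenas,
    /// populate the per-NUMA-node and per-domain cpumasks, attach the
    /// struct_ops scheduler, and construct the userspace Scheduler state.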
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 0);
        init_libbpf_logging(None);
        info!(
            "Running scx_wd40 (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, wd40, open_opts).unwrap();

        let domains = Arc::new(DomainGroup::new(&Topology::new()?)?);

        if *NR_CPU_IDS > MAX_CPUS {
            bail!(
                "Num possible CPU IDs ({}) exceeds maximum of ({})",
                *NR_CPU_IDS,
                MAX_CPUS
            );
        }

        if domains.nr_doms() > MAX_DOMS {
            bail!(
                "nr_doms ({}) is greater than MAX_DOMS ({})",
                domains.nr_doms(),
                MAX_DOMS
            );
        }

        skel.maps.bss_data.as_mut().unwrap().slice_ns = scx_enums.SCX_SLICE_DFL;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.nr_nodes = domains.nr_nodes() as u32;
        rodata.nr_doms = domains.nr_doms() as u32;
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;

        if opts.partial {
            skel.struct_ops.wd40_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.wd40_mut().exit_dump_len = opts.exit_dump_len;

        rodata.load_half_life = (opts.load_half_life * 1000000000.0) as u32;
        rodata.kthreads_local = opts.kthreads_local;
        rodata.fifo_sched = opts.fifo_sched;
        rodata.greedy_threshold = opts.greedy_threshold;
        rodata.greedy_threshold_x_numa = opts.greedy_threshold_x_numa;
        rodata.direct_greedy_numa = opts.direct_greedy_numa;
        rodata.mempolicy_affinity = opts.mempolicy_affinity;
        rodata.debug = opts.verbose as u32;
        rodata.wd40_perf_mode = opts.perf;

        let mut skel = scx_ops_load!(skel, wd40, uei)?;

        Self::setup_arenas(&mut skel)?;

        let bss_data = skel.maps.bss_data.as_mut().unwrap();
        info!(
            "Mask length {}, number of possible CPUs {}",
            bss_data.mask_size,
            skel.maps.rodata_data.as_mut().unwrap().nr_cpu_ids
        );
        unsafe { MASK_LEN = bss_data.mask_size as usize };

        // Extract the numeric index of the LLC topology level; topo_nodes is
        // indexed by level.
        let types::topo_level(index) = types::topo_level::TOPO_LLC;

        for numa in 0..domains.nr_nodes() {
            let mut numa_mask = Cpumask::new();
            let node_domains = domains.numa_doms(&numa);
            for dom in node_domains.iter() {
                let dom_mask = dom.mask();
                numa_mask = numa_mask.or(&dom_mask);
            }

            update_bpf_mask(bss_data.node_data[numa], &numa_mask)?;
            info!("NODE[{:02}] mask= {}", numa, numa_mask);

            for dom in node_domains.iter() {
                // topo_nodes[] exposes per-domain context addresses from the
                // BPF side; convert each raw address back into a &mut dom_ctx.
                let ptr = bss_data.topo_nodes[index as usize][dom.id()];
                let domc = unsafe {
                    &mut *std::ptr::with_exposed_provenance_mut::<dom_ctx>(ptr.try_into().unwrap())
                };
                update_bpf_mask(domc.cpumask, &dom.mask())?;

                bss_data.dom_numa_id_map[dom.id()] =
                    numa.try_into().expect("NUMA ID could not fit into 32 bits");

                info!(" DOM[{:02}] mask= {}", dom.id(), dom.mask());
            }
        }

        let struct_ops = Some(scx_ops_attach!(skel, wd40)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        for (id, dom) in domains.doms().iter() {
            let mut ctx = dom.ctx.lock().unwrap();

            let ptr = skel.maps.bss_data.as_mut().unwrap().topo_nodes[index as usize][*id];
            let domc = unsafe {
                &mut *std::ptr::with_exposed_provenance_mut::<dom_ctx>(ptr.try_into().unwrap())
            };
            *ctx = Some(domc);
        }

        info!("WD40 scheduler started! Run `scx_wd40 --monitor` for metrics.");

        let proc_reader = procfs::ProcReader::new();

        Ok(Self {
            skel,
            struct_ops,
            sched_interval: Duration::from_secs_f64(opts.interval),
            tune_interval: Duration::from_secs_f64(opts.tune_interval),
            balance_load: !opts.no_load_balance,
            balanced_kworkers: opts.balanced_kworkers,

            dom_group: domains.clone(),
            proc_reader,

            lb_at: SystemTime::now(),
            lb_stats: BTreeMap::new(),
            time_used: Duration::default(),

            tuner: Tuner::new(
                domains,
                opts.direct_greedy_under,
                opts.kick_greedy_under,
                opts.slice_us_underutil * 1000,
                opts.slice_us_overutil * 1000,
            )?,
            stats_server,
        })
    }

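    /// Turn a StatsCtx delta into a ClusterStats report. Dispatch-path
    /// counters are reported as percentages of the summed dispatch counters
    /// for the period.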
    fn cluster_stats(&self, sc: &StatsCtx, node_stats: BTreeMap<usize, NodeStats>) -> ClusterStats {
        let stat = |idx| sc.bpf_stats[idx as usize];
        let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PINNED)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
        let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0;

        let cpu_busy = if sc.cpu_total != 0 {
            (sc.cpu_busy as f64 / sc.cpu_total as f64) * 100.0
        } else {
            0.0
        };

        ClusterStats {
            at_us: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            lb_at_us: self
                .lb_at
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            total,
            slice_us: self.tuner.slice_ns / 1000,

            cpu_busy,
            load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
            nr_migrations: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],

            task_get_err: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
            time_used: sc.time_used.as_secs_f64(),

            sync_prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE),
            wake_sync: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC),
            prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE),
            greedy_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE),
            pinned: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PINNED),
            direct: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH),
            greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY),
            greedy_far: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR),
            dsq_dispatch: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH),
            greedy_local: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL),
            greedy_xnuma: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA),
            kick_greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_KICK_GREEDY),
            repatriate: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_REPATRIATE),
            dl_clamp: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_CLAMP),
            dl_preset: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_PRESET),

            direct_greedy_cpus: self.tuner.direct_greedy_mask.as_raw_slice().to_owned(),
            kick_greedy_cpus: self.tuner.kick_greedy_mask.as_raw_slice().to_owned(),

            nodes: node_stats,
        }
    }

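    /// Run one load-balancing pass and record its timestamp and per-node
    /// statistics.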
    fn lb_step(&mut self) -> Result<()> {
        let mut lb = LoadBalancer::new(
            &mut self.skel,
            self.dom_group.clone(),
            self.balanced_kworkers,
            self.tuner.fully_utilized,
            self.balance_load,
        );

        lb.load_balance()?;

        self.lb_at = SystemTime::now();
        self.lb_stats = lb.get_stats();
        Ok(())
    }

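    /// Main loop: periodically step the tuner and the load balancer, and
    /// answer stats requests until shutdown is requested or the BPF side
    /// exits.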
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let now = Instant::now();
        let mut next_tune_at = now + self.tune_interval;
        let mut next_sched_at = now + self.sched_interval;

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

            if now >= next_tune_at {
                self.tuner.step(&mut self.skel)?;
                next_tune_at += self.tune_interval;
                if next_tune_at < now {
                    next_tune_at = now + self.tune_interval;
                }
            }

            if now >= next_sched_at {
                self.lb_step()?;
                next_sched_at += self.sched_interval;
                if next_sched_at < now {
                    next_sched_at = now + self.sched_interval;
                }
            }

            self.time_used += Instant::now().duration_since(now);

            match req_ch.recv_deadline(next_sched_at.min(next_tune_at)) {
                Ok(prev_sc) => {
                    let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.time_used)?;
                    let delta_sc = cur_sc.delta(&prev_sc);
                    let cstats = self.cluster_stats(&delta_sc, self.lb_stats.clone());
                    res_ch.send((cur_sc, cstats))?;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");

        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_wd40: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // With --stats the monitor runs alongside the scheduler; with --monitor it
    // is the only thing we run.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    let mut open_object = MaybeUninit::uninit();
    // Keep re-initializing and restarting the scheduler for as long as the
    // exit info asks for a restart.
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }
    Ok(())
}