mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

mod domain;
use domain::DomainGroup;

pub mod tuner;
use tuner::Tuner;

pub mod load_balance;
use load_balance::LoadBalancer;

mod stats;
use std::collections::BTreeMap;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use stats::ClusterStats;
use stats::NodeStats;

#[macro_use]
extern crate static_assertions;

use ::fb_procfs as procfs;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::info;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;

const SCHEDULER_NAME: &str = "scx_rusty";
const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;

/// scx_rusty: A multi-domain, BPF / userspace hybrid scheduler.
///
/// The BPF part does simple vtime or round-robin scheduling within each
/// domain while tracking average load of each domain and duty cycle of
/// each task. The userspace part makes higher-frequency tuning decisions
/// and periodically load-balances across domains.
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration, in microseconds, while the system is
    /// under-utilized.
    #[clap(short = 'u', long, default_value = "20000")]
    slice_us_underutil: u64,

    /// Scheduling slice duration, in microseconds, while the system is
    /// over-utilized.
    #[clap(short = 'o', long, default_value = "1000")]
    slice_us_overutil: u64,

    /// Load balancing interval in seconds.
    #[clap(short = 'i', long, default_value = "2.0")]
    interval: f64,

    /// Tuning interval in seconds. The tuner runs more often than the
    /// load balancer so it can react quickly to utilization changes.
    #[clap(short = 'I', long, default_value = "0.1")]
    tune_interval: f64,

    /// Half-life, in seconds, of the task and domain load running averages.
    #[clap(short = 'l', long, default_value = "1.0")]
    load_half_life: f64,

    /// Build scheduling domains from groups of CPUs sharing this cache
    /// level.
    #[clap(short = 'c', long, default_value = "3")]
    cache_level: u32,

    /// Instead of deriving domains from cache topology, specify one cpumask
    /// per domain manually. Each CPU must belong to exactly one domain.
    #[clap(short = 'C', long, num_args = 1.., conflicts_with = "cache_level")]
    cpumasks: Vec<String>,

    /// When non-zero, enable greedy task stealing: an idle CPU may pull
    /// tasks from a foreign domain with at least this many tasks queued.
    #[clap(short = 'g', long, default_value = "1")]
    greedy_threshold: u32,

    /// Greedy stealing threshold applied across NUMA nodes. 0 disables
    /// cross-NUMA stealing.
    #[clap(long, default_value = "0")]
    greedy_threshold_x_numa: u32,

    /// Disable the userspace load balancer.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_load_balance: bool,

    /// Put per-cpu kthreads directly into their local DSQs.
    #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
    kthreads_local: bool,

    /// Include kworker tasks in load balancing decisions.
    #[clap(short = 'b', long, action = clap::ArgAction::SetTrue)]
    balanced_kworkers: bool,

    /// Use FIFO ordering within each domain instead of weighted vtime
    /// scheduling.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    fifo_sched: bool,

    /// While overall utilization is below this percentage, tasks may be
    /// pushed directly to idle CPUs in foreign domains. 0 disables, 100
    /// always enables.
    #[clap(short = 'D', long, default_value = "90.0")]
    direct_greedy_under: f64,

    /// While overall utilization is below this percentage, idle CPUs in
    /// foreign domains may be kicked to pull queued work. 0 disables, 100
    /// always enables.
    #[clap(short = 'K', long, default_value = "100.0")]
    kick_greedy_under: f64,

    /// Allow direct greedy dispatch to idle CPUs on remote NUMA nodes.
    #[clap(short = 'r', long, action = clap::ArgAction::SetTrue)]
    direct_greedy_numa: bool,

    /// Only switch tasks that have set their scheduling policy to SCHED_EXT;
    /// by default, all tasks are switched.
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Enable soft NUMA affinity for tasks that use set_mempolicy().
    #[clap(long, action = clap::ArgAction::SetTrue)]
    mempolicy_affinity: bool,

    /// Enable stats reporting with the given interval in seconds.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the given interval in seconds;
    /// the scheduler itself is not loaded.
    #[clap(long)]
    monitor: Option<f64>,

    /// Exit debug dump buffer length. 0 uses the default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Print version and exit.
    #[clap(long)]
    version: bool,

    /// Print descriptions of the reported statistics and exit.
    #[clap(long)]
    help_stats: bool,

    /// Value written to the BPF side's rusty_perf_mode knob; 0 disables it.
    #[clap(long, default_value = "0")]
    perf: u32,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

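/// Read aggregate CPU busy and total time from /proc/stat, in microseconds.
/// Guest time is left out of busy because /proc/stat already accounts it
/// under user/nice time.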
fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
    let cs = reader
        .read_stat()
        .context("Failed to read procfs")?
        .total_cpu
        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;

    Ok(match cs {
        procfs::CpuStat {
            user_usec: Some(user),
            nice_usec: Some(nice),
            system_usec: Some(system),
            idle_usec: Some(idle),
            iowait_usec: Some(iowait),
            irq_usec: Some(irq),
            softirq_usec: Some(softirq),
            stolen_usec: Some(stolen),
            guest_usec: _,
            guest_nice_usec: _,
        } => {
            let busy = user + system + nice + irq + softirq + stolen;
            let total = busy + idle + iowait;
            (busy, total)
        }
        _ => bail!("Some procfs stats are not populated!"),
    })
}

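/// Saturating counter delta: returns `curr - prev`, or 0 if the counter
/// moved backwards.
///
/// ```ignore
/// assert_eq!(sub_or_zero(&5, &3), 2);
/// assert_eq!(sub_or_zero(&3, &5), 0);
/// ```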
pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
    curr.checked_sub(*prev).unwrap_or(0)
}

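/// A point-in-time snapshot of everything the stats reporting needs: procfs
/// CPU usage, the per-stat BPF counters, and how much wall time the
/// userspace component itself has consumed.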
#[derive(Clone, Debug)]
struct StatsCtx {
    cpu_busy: u64,
    cpu_total: u64,
    bpf_stats: Vec<u64>,
    time_used: Duration,
}

impl StatsCtx {
    fn read_bpf_stats(skel: &BpfSkel) -> Result<Vec<u64>> {
        let stats_map = &skel.maps.stats;
        let mut stats: Vec<u64> = Vec::new();

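        // Each stat is a per-CPU counter in the BPF stats map; lookup_percpu()
        // returns one value per possible CPU, which we sum into a single
        // global count.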
        for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
            let cpu_stat_vec = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .with_context(|| format!("Failed to lookup stat {}", stat))?
                .expect("per-cpu stat should exist");
            let sum = cpu_stat_vec
                .iter()
                .map(|val| {
                    u64::from_ne_bytes(
                        val.as_slice()
                            .try_into()
                            .expect("Invalid value length in stat map"),
                    )
                })
                .sum();
            stats.push(sum);
        }
        Ok(stats)
    }

    fn blank() -> Self {
        Self {
            cpu_busy: 0,
            cpu_total: 0,
            bpf_stats: vec![0u64; bpf_intf::stat_idx_RUSTY_NR_STATS as usize],
            time_used: Duration::default(),
        }
    }

    fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, time_used: Duration) -> Result<Self> {
        let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;

        Ok(Self {
            cpu_busy,
            cpu_total,
            bpf_stats: Self::read_bpf_stats(skel)?,
            time_used,
        })
    }

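    /// Element-wise difference between two snapshots. Deltas are clamped at
    /// zero so a counter that moved backwards can't produce a huge bogus
    /// value.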
    fn delta(&self, rhs: &Self) -> Self {
        Self {
            cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
            cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
            bpf_stats: self
                .bpf_stats
                .iter()
                .zip(rhs.bpf_stats.iter())
                .map(|(lhs, rhs)| sub_or_zero(lhs, rhs))
                .collect(),
            time_used: self.time_used - rhs.time_used,
        }
    }
}

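/// Userspace half of the scheduler. The BPF side makes per-wakeup placement
/// decisions; this side periodically re-tunes knobs (Tuner), migrates load
/// between domains (LoadBalancer), and serves the stats interface.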
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,

    sched_interval: Duration,
    tune_interval: Duration,
    balance_load: bool,
    balanced_kworkers: bool,

    dom_group: Arc<DomainGroup>,

    proc_reader: procfs::ProcReader,

    lb_at: SystemTime,
    lb_stats: BTreeMap<usize, NodeStats>,
    time_used: Duration,

    tuner: Tuner,
    stats_server: StatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 0);
        init_libbpf_logging(None);
        info!(
            "Running scx_rusty (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, rusty, open_opts)?;

        let domains = Arc::new(DomainGroup::new(&Topology::new()?, &opts.cpumasks)?);

        if *NR_CPU_IDS > MAX_CPUS {
            bail!(
                "Num possible CPU IDs ({}) exceeds maximum of ({})",
                *NR_CPU_IDS,
                MAX_CPUS
            );
        }

        if domains.nr_doms() > MAX_DOMS {
            bail!(
                "nr_doms ({}) is greater than MAX_DOMS ({})",
                domains.nr_doms(),
                MAX_DOMS
            );
        }

        skel.maps.bss_data.as_mut().unwrap().slice_ns = scx_enums.SCX_SLICE_DFL;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.nr_nodes = domains.nr_nodes() as u32;
        rodata.nr_doms = domains.nr_doms() as u32;
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;

        // u32::MAX marks CPUs that don't belong to any domain.
        for cpu in 0..*NR_CPU_IDS {
            rodata.cpu_dom_id_map[cpu] = u32::MAX;
        }

        for (id, dom) in domains.doms().iter() {
            for cpu in dom.mask().iter() {
                rodata.cpu_dom_id_map[cpu] = *id as u32;
            }
        }

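        // Flatten each NUMA node's cpumask (the union of its domains' masks)
        // and each domain's cpumask into the fixed-size arrays the BPF side
        // reads out of rodata.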
        for numa in 0..domains.nr_nodes() {
            let mut numa_mask = Cpumask::new();
            let node_domains = domains.numa_doms(&numa);
            for dom in node_domains.iter() {
                let dom_mask = dom.mask();
                numa_mask = numa_mask.or(&dom_mask);
            }

            let raw_numa_slice = numa_mask.as_raw_slice();
            let node_cpumask_slice = &mut rodata.numa_cpumasks[numa];
            let (left, _) = node_cpumask_slice.split_at_mut(raw_numa_slice.len());
            left.clone_from_slice(raw_numa_slice);
            info!("NODE[{:02}] mask= {}", numa, numa_mask);

            for dom in node_domains.iter() {
                let raw_dom_slice = dom.mask_slice();
                let dom_cpumask_slice = &mut rodata.dom_cpumasks[dom.id()];
                let (left, _) = dom_cpumask_slice.split_at_mut(raw_dom_slice.len());
                left.clone_from_slice(raw_dom_slice);
                rodata.dom_numa_id_map[dom.id()] =
                    numa.try_into().expect("NUMA ID could not fit into 32 bits");

                info!("  DOM[{:02}] mask= {}", dom.id(), dom.mask());
            }
        }

        if opts.partial {
            skel.struct_ops.rusty_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.rusty_mut().exit_dump_len = opts.exit_dump_len;

        rodata.load_half_life = (opts.load_half_life * 1_000_000_000.0) as u32;
        rodata.kthreads_local = opts.kthreads_local;
        rodata.fifo_sched = opts.fifo_sched;
        rodata.greedy_threshold = opts.greedy_threshold;
        rodata.greedy_threshold_x_numa = opts.greedy_threshold_x_numa;
        rodata.direct_greedy_numa = opts.direct_greedy_numa;
        rodata.mempolicy_affinity = opts.mempolicy_affinity;
        rodata.debug = opts.verbose as u32;
        rodata.rusty_perf_mode = opts.perf;

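        // rodata is only writable while the BPF object is open. Loading
        // verifies the programs and freezes rodata; attaching registers the
        // struct_ops, at which point the BPF scheduler takes over.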
        let mut skel = scx_ops_load!(skel, rusty, uei)?;
        let struct_ops = Some(scx_ops_attach!(skel, rusty)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        for (id, dom) in domains.doms().iter() {
            let mut ctx = dom.ctx.lock().unwrap();

            *ctx = Some(skel.maps.bss_data.as_ref().unwrap().dom_ctxs[*id]);
        }

        info!("Rusty scheduler started! Run `scx_rusty --monitor` for metrics.");

        let proc_reader = procfs::ProcReader::new();

        Ok(Self {
            skel,
            struct_ops,
            sched_interval: Duration::from_secs_f64(opts.interval),
            tune_interval: Duration::from_secs_f64(opts.tune_interval),
            balance_load: !opts.no_load_balance,
            balanced_kworkers: opts.balanced_kworkers,

            dom_group: domains.clone(),
            proc_reader,

            lb_at: SystemTime::now(),
            lb_stats: BTreeMap::new(),
            time_used: Duration::default(),

            tuner: Tuner::new(
                domains,
                opts.direct_greedy_under,
                opts.kick_greedy_under,
                opts.slice_us_underutil * 1000,
                opts.slice_us_overutil * 1000,
            )?,
            stats_server,
        })
    }

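    /// Turn a delta `StatsCtx` plus the latest per-node load balancer stats
    /// into the `ClusterStats` snapshot served to `scx_rusty --monitor`.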
    fn cluster_stats(&self, sc: &StatsCtx, node_stats: BTreeMap<usize, NodeStats>) -> ClusterStats {
        let stat = |idx| sc.bpf_stats[idx as usize];
        // These counters cover the mutually exclusive task-placement paths;
        // their sum is the denominator for the percentages reported below.
        let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PINNED)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
        let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0;

        let cpu_busy = if sc.cpu_total != 0 {
            (sc.cpu_busy as f64 / sc.cpu_total as f64) * 100.0
        } else {
            0.0
        };

        ClusterStats {
            at_us: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            lb_at_us: self
                .lb_at
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            total,
            slice_us: self.tuner.slice_ns / 1000,

            cpu_busy,
            load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
            nr_migrations: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],

            task_get_err: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
            time_used: sc.time_used.as_secs_f64(),

            sync_prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE),
            wake_sync: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC),
            prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE),
            greedy_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE),
            pinned: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PINNED),
            direct: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH),
            greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY),
            greedy_far: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR),
            dsq_dispatch: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH),
            greedy_local: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL),
            greedy_xnuma: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA),
            kick_greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_KICK_GREEDY),
            repatriate: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_REPATRIATE),
            dl_clamp: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_CLAMP),
            dl_preset: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_PRESET),

            direct_greedy_cpus: self.tuner.direct_greedy_mask.as_raw_slice().to_owned(),
            kick_greedy_cpus: self.tuner.kick_greedy_mask.as_raw_slice().to_owned(),

            nodes: node_stats,
        }
    }

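    /// Run one load balancing pass, recording when it ran and its per-node
    /// statistics for the next stats report.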
    fn lb_step(&mut self) -> Result<()> {
        let mut lb = LoadBalancer::new(
            &mut self.skel,
            self.dom_group.clone(),
            self.balanced_kworkers,
            self.tuner.fully_utilized,
            self.balance_load,
        );

        lb.load_balance()?;

        self.lb_at = SystemTime::now();
        self.lb_stats = lb.get_stats();
        Ok(())
    }

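    /// Main loop: fire the tuner and the load balancer whenever their
    /// intervals expire, and service stats requests while waiting in between.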
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let now = Instant::now();
        let mut next_tune_at = now + self.tune_interval;
        let mut next_sched_at = now + self.sched_interval;

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

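            // If we overslept past a whole period, re-anchor the next
            // deadline to now instead of firing repeatedly to catch up.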
            if now >= next_tune_at {
                self.tuner.step(&mut self.skel)?;
                next_tune_at += self.tune_interval;
                if next_tune_at < now {
                    next_tune_at = now + self.tune_interval;
                }
            }

            if now >= next_sched_at {
                self.lb_step()?;
                next_sched_at += self.sched_interval;
                if next_sched_at < now {
                    next_sched_at = now + self.sched_interval;
                }
            }

            self.time_used += Instant::now().duration_since(now);

            // Sleep until the next deadline while serving stats requests: a
            // request carries the client's previous snapshot, and the reply
            // is the current snapshot plus the derived cluster stats.
            match req_ch.recv_deadline(next_sched_at.min(next_tune_at)) {
                Ok(prev_sc) => {
                    let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.time_used)?;
                    let delta_sc = cur_sc.delta(&prev_sc);
                    let cstats = self.cluster_stats(&delta_sc, self.lb_stats.clone());
                    res_ch.send((cur_sc, cstats))?;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");

        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_rusty: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // --monitor runs standalone (the scheduler is never loaded), while
    // --stats runs the monitor thread alongside the scheduler below.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

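    // The scheduler can request a restart on exit (e.g. after a kernel-side
    // event such as CPU hotplug); keep re-initializing until a clean exit.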
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }
    Ok(())
}