mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

mod domain;
use domain::DomainGroup;

pub mod tuner;
use tuner::Tuner;

pub mod load_balance;
use load_balance::LoadBalancer;

mod stats;
use std::collections::BTreeMap;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

use stats::ClusterStats;
use stats::NodeStats;

use std::ffi::c_ulong;

#[macro_use]
extern crate static_assertions;

use ::fb_procfs as procfs;
use anyhow::anyhow;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::info;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Core;
use scx_utils::Cpumask;
use scx_utils::Llc;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;

const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;

/// Length of a BPF cpumask in u64 words. Set once in Scheduler::init from the
/// loaded skeleton's mask_size and read by update_bpf_mask().
static mut MASK_LEN: usize = 0;

/// Command line options for the scx_wd40 scheduler.
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration, in microseconds, while the system is
    /// under-utilized.
    #[clap(short = 'u', long, default_value = "20000")]
    slice_us_underutil: u64,

    /// Scheduling slice duration, in microseconds, while the system is
    /// over-utilized.
    #[clap(short = 'o', long, default_value = "1000")]
    slice_us_overutil: u64,

    /// Load balancing interval in seconds.
    #[clap(short = 'i', long, default_value = "2.0")]
    interval: f64,

    /// Tuner run interval in seconds.
    #[clap(short = 'I', long, default_value = "0.1")]
    tune_interval: f64,

    /// Load decay half-life in seconds.
    #[clap(short = 'l', long, default_value = "1.0")]
    load_half_life: f64,

    /// Cache level at which to group CPUs into scheduling domains.
    #[clap(short = 'c', long, default_value = "3")]
    cache_level: u32,

    /// Minimum number of queued tasks a domain must have before an idle CPU
    /// in another domain may greedily pull work from it.
    #[clap(short = 'g', long, default_value = "1")]
    greedy_threshold: u32,

    /// Same as greedy_threshold, but for pulls across NUMA nodes. 0 disables
    /// cross-NUMA greedy execution.
    #[clap(long, default_value = "0")]
    greedy_threshold_x_numa: u32,

    /// Disable load balancing.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_load_balance: bool,

    /// Dispatch per-CPU kthreads directly to their local DSQ.
    #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
    kthreads_local: bool,

    /// Also load balance kworker threads.
    #[clap(short = 'b', long, action = clap::ArgAction::SetTrue)]
    balanced_kworkers: bool,

    /// Schedule tasks in FIFO order within each domain instead of by
    /// deadline.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    fifo_sched: bool,

    /// Only allow direct greedy dispatch while utilization is below this
    /// percentage.
    #[clap(short = 'D', long, default_value = "90.0")]
    direct_greedy_under: f64,

    /// Only allow greedy kicks while utilization is below this percentage.
    #[clap(short = 'K', long, default_value = "100.0")]
    kick_greedy_under: f64,

    /// Allow direct greedy dispatch across NUMA node boundaries.
    #[clap(short = 'r', long, action = clap::ArgAction::SetTrue)]
    direct_greedy_numa: bool,

    /// Only switch tasks that have explicitly opted into SCHED_EXT
    /// (SCX_OPS_SWITCH_PARTIAL).
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Factor the task's memory policy into domain selection.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    mempolicy_affinity: bool,

    /// Report stats at this interval, in seconds.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode at this interval, in seconds, without
    /// loading the scheduler.
    #[clap(long)]
    monitor: Option<f64>,

    /// Exit debug dump buffer length. 0 indicates the default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output. Specify multiple times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Print version and exit.
    #[clap(long)]
    version: bool,

    /// Show descriptions for all reported stats and exit.
    #[clap(long)]
    help_stats: bool,

    /// Performance mode value forwarded to the BPF side (wd40_perf_mode).
    #[clap(long, default_value = "0")]
    perf: u32,
}

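/// Read the aggregate busy and total CPU times (in microseconds) from
/// /proc/stat. "Busy" excludes idle and iowait time.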
fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
    let cs = reader
        .read_stat()
        .context("Failed to read procfs")?
        .total_cpu
        .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;

    Ok(match cs {
        procfs::CpuStat {
            user_usec: Some(user),
            nice_usec: Some(nice),
            system_usec: Some(system),
            idle_usec: Some(idle),
            iowait_usec: Some(iowait),
            irq_usec: Some(irq),
            softirq_usec: Some(softirq),
            stolen_usec: Some(stolen),
            guest_usec: _,
            guest_nice_usec: _,
        } => {
            let busy = user + system + nice + irq + softirq + stolen;
            let total = busy + idle + iowait;
            (busy, total)
        }
        _ => bail!("Some procfs stats are not populated!"),
    })
}

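/// Saturating counter subtraction: returns 0 instead of underflowing when a
/// counter appears to move backwards.
///
/// ```ignore
/// assert_eq!(sub_or_zero(&5, &7), 0);
/// assert_eq!(sub_or_zero(&7, &5), 2);
/// ```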
pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
    curr.checked_sub(*prev).unwrap_or(0u64)
}

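/// Copy a userspace Cpumask into a BPF-arena scx_bitmap. MASK_LEN must have
/// been initialized (see Scheduler::init) before this is called.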
pub fn update_bpf_mask(bpfptr: *mut types::scx_bitmap, cpumask: &Cpumask) -> Result<()> {
    let bpfmask = unsafe { &mut *bpfptr };

    unsafe { cpumask.write_to_ptr(&raw mut bpfmask.bits as *mut u64, MASK_LEN)? };

    Ok(())
}

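/// A point-in-time snapshot of procfs CPU usage and the BPF-side stat
/// counters; stats reports are computed from deltas between two snapshots.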
#[derive(Clone, Debug)]
struct StatsCtx {
    cpu_busy: u64,
    cpu_total: u64,
    bpf_stats: Vec<u64>,
    time_used: Duration,
}

impl StatsCtx {
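    /// Sum each per-CPU RUSTY_* counter into a single vector indexed by stat
    /// id.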
    fn read_bpf_stats(skel: &BpfSkel) -> Result<Vec<u64>> {
        let stats_map = &skel.maps.stats;
        let mut stats: Vec<u64> = Vec::new();

        for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
            let cpu_stat_vec = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .with_context(|| format!("Failed to lookup stat {}", stat))?
                .expect("per-cpu stat should exist");
            let sum = cpu_stat_vec
                .iter()
                .map(|val| {
                    u64::from_ne_bytes(
                        val.as_slice()
                            .try_into()
                            .expect("Invalid value length in stat map"),
                    )
                })
                .sum();
            stats.push(sum);
        }
        Ok(stats)
    }

    /// A zeroed snapshot, used as the baseline for the first delta.
    fn blank() -> Self {
        Self {
            cpu_busy: 0,
            cpu_total: 0,
            bpf_stats: vec![0u64; bpf_intf::stat_idx_RUSTY_NR_STATS as usize],
            time_used: Duration::default(),
        }
    }

    /// Take a fresh snapshot of procfs CPU usage and the BPF counters.
    fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, time_used: Duration) -> Result<Self> {
        let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;

        Ok(Self {
            cpu_busy,
            cpu_total,
            bpf_stats: Self::read_bpf_stats(skel)?,
            time_used,
        })
    }

    /// Counter-wise difference `self - rhs`, saturating at zero.
    fn delta(&self, rhs: &Self) -> Self {
        Self {
            cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
            cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
            bpf_stats: self
                .bpf_stats
                .iter()
                .zip(rhs.bpf_stats.iter())
                .map(|(lhs, rhs)| sub_or_zero(&lhs, &rhs))
                .collect(),
            time_used: self.time_used - rhs.time_used,
        }
    }
}

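/// Userspace half of the scheduler: owns the loaded BPF skeleton, the domain
/// topology, the tuner and load balancer state, and the stats server.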
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,

    sched_interval: Duration,
    tune_interval: Duration,
    balance_load: bool,
    balanced_kworkers: bool,

    dom_group: Arc<DomainGroup>,

    proc_reader: procfs::ProcReader,

    lb_at: SystemTime,
    lb_stats: BTreeMap<usize, NodeStats>,
    time_used: Duration,

    tuner: Tuner,
    stats_server: StatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
}

impl<'a> Scheduler<'a> {
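    /// Run the arena_init BPF program to set up the arena allocators with the
    /// static pool granularity and the per-task context size.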
    fn setup_allocators(skel: &mut BpfSkel<'a>) -> Result<()> {
        let mut args = types::arena_init_args {
            static_pages: bpf_intf::consts_STATIC_ALLOC_PAGES_GRANULARITY as c_ulong,
            task_ctx_size: std::mem::size_of::<types::task_ctx>() as c_ulong,
        };

        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };

        let output = skel.progs.arena_init.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize arenas, arena_init returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

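    /// Allocate an arena bitmap via the arena_alloc_mask BPF program, copy
    /// `mask` into it, then initialize the corresponding topology node via
    /// arena_topology_node_init.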
    fn setup_topology_node(
        skel: &mut BpfSkel<'a>,
        mask: &[u64],
        data_size: usize,
        id: usize,
    ) -> Result<()> {
        let mut args = types::arena_alloc_mask_args {
            bitmap: 0 as c_ulong,
        };

        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };

        let output = skel.progs.arena_alloc_mask.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not allocate topology bitmap, arena_alloc_mask returned {}",
                output.return_value as i32
            );
        }

        // arena_alloc_mask writes the arena address of the new bitmap back
        // into args.bitmap; reinterpret it as a mutable word array.
        let ptr = unsafe { std::mem::transmute::<u64, &mut [u64; 10]>(args.bitmap) };

        let (valid_mask, _) = ptr.split_at_mut(mask.len());
        valid_mask.clone_from_slice(mask);

        let mut args = types::arena_topology_node_init_args {
            bitmap: args.bitmap as c_ulong,
            data_size: data_size as c_ulong,
            id: id as c_ulong,
        };

        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let output = skel.progs.arena_topology_node_init.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "arena_topology_node_init returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

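    /// Mirror the host topology into the BPF arena: the whole-system span,
    /// then each NUMA node, LLC, core, and CPU.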
    fn setup_topology(skel: &mut BpfSkel<'a>) -> Result<()> {
        let topo = Topology::new().expect("Failed to build host topology");

        // Root node covering the entire system span.
        Self::setup_topology_node(skel, topo.span.as_raw_slice(), 0, 0)?;

        for (id, (_, node)) in topo.nodes.into_iter().enumerate() {
            Self::setup_topology_node(skel, node.span.as_raw_slice(), 0, id)?;
        }

        for (id, (_, llc)) in topo.all_llcs.into_iter().enumerate() {
            Self::setup_topology_node(
                skel,
                Arc::<Llc>::into_inner(llc)
                    .expect("missing llc")
                    .span
                    .as_raw_slice(),
                0,
                id,
            )?;
        }
        for (id, (_, core)) in topo.all_cores.into_iter().enumerate() {
            Self::setup_topology_node(
                skel,
                Arc::<Core>::into_inner(core)
                    .expect("missing core")
                    .span
                    .as_raw_slice(),
                0,
                id,
            )?;
        }
        for (id, (_, cpu)) in topo.all_cpus.into_iter().enumerate() {
            // Build a single-CPU mask: set bit (id % 64) of word (id / 64).
            let mut mask = [0; 9];
            mask[cpu.id / 64] |= 1 << (cpu.id % 64);
            Self::setup_topology_node(skel, &mask, 0, id)?;
        }

        Ok(())
    }

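    /// Run the wd40_setup BPF program to complete scheduler-side setup once
    /// the allocators and topology nodes exist.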
    fn setup_wd40(skel: &mut BpfSkel<'a>) -> Result<()> {
        let input = ProgramInput {
            ..Default::default()
        };
        let output = skel.progs.wd40_setup.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize WD40 arenas, wd40_setup returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

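    /// One-shot arena initialization: allocators first, then topology nodes,
    /// then the wd40-specific setup.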
    fn setup_arenas(skel: &mut BpfSkel<'a>) -> Result<()> {
        Self::setup_allocators(skel)?;
        Self::setup_topology(skel)?;
        Self::setup_wd40(skel)?;

        Ok(())
    }

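    /// Open, configure, load, and attach the BPF scheduler, then wire up the
    /// domain cpumasks and the stats server.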
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose > 0);
        init_libbpf_logging(None);
        info!(
            "Running scx_wd40 (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let mut skel = scx_ops_open!(skel_builder, open_object, wd40)?;

        let domains = Arc::new(DomainGroup::new(&Topology::new()?)?);

        if *NR_CPU_IDS > MAX_CPUS {
            bail!(
                "Num possible CPU IDs ({}) exceeds maximum of ({})",
                *NR_CPU_IDS,
                MAX_CPUS
            );
        }

        if domains.nr_doms() > MAX_DOMS {
            bail!(
                "nr_doms ({}) is greater than MAX_DOMS ({})",
                domains.nr_doms(),
                MAX_DOMS
            );
        }

        skel.maps.bss_data.as_mut().unwrap().slice_ns = scx_enums.SCX_SLICE_DFL;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.nr_nodes = domains.nr_nodes() as u32;
        rodata.nr_doms = domains.nr_doms() as u32;
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;

        if opts.partial {
            skel.struct_ops.wd40_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.wd40_mut().exit_dump_len = opts.exit_dump_len;

        rodata.load_half_life = (opts.load_half_life * 1_000_000_000.0) as u32;
        rodata.kthreads_local = opts.kthreads_local;
        rodata.fifo_sched = opts.fifo_sched;
        rodata.greedy_threshold = opts.greedy_threshold;
        rodata.greedy_threshold_x_numa = opts.greedy_threshold_x_numa;
        rodata.direct_greedy_numa = opts.direct_greedy_numa;
        rodata.mempolicy_affinity = opts.mempolicy_affinity;
        rodata.debug = opts.verbose as u32;
        rodata.wd40_perf_mode = opts.perf;

        let mut skel = scx_ops_load!(skel, wd40, uei)?;

        Self::setup_arenas(&mut skel)?;

        let bss_data = skel.maps.bss_data.as_mut().unwrap();
        info!(
            "Mask length {}, number of possible CPUs {}",
            bss_data.mask_size,
            skel.maps.rodata_data.as_mut().unwrap().nr_cpu_ids
        );
        // Record the kernel-reported cpumask length for update_bpf_mask().
        unsafe { MASK_LEN = bss_data.mask_size as usize };

        // Extract the array index of the LLC topology level.
        let types::topo_level(index) = types::topo_level::TOPO_LLC;

        for numa in 0..domains.nr_nodes() {
            let mut numa_mask = Cpumask::new();
            let node_domains = domains.numa_doms(&numa);
            for dom in node_domains.iter() {
                let dom_mask = dom.mask();
                numa_mask = numa_mask.or(&dom_mask);
            }

            update_bpf_mask(bss_data.node_data[numa], &numa_mask)?;
            info!("NODE[{:02}] mask= {}", numa, numa_mask);

            for dom in node_domains.iter() {
                let ptr = bss_data.topo_nodes[index as usize][dom.id()];
                // The BPF side exposes dom_ctx objects as arena pointers.
                let domc = unsafe { std::mem::transmute::<u64, &mut types::dom_ctx>(ptr) };
                update_bpf_mask(domc.cpumask, &dom.mask())?;

                bss_data.dom_numa_id_map[dom.id()] =
                    numa.try_into().expect("NUMA ID could not fit into 32 bits");

                info!(" DOM[{:02}] mask= {}", dom.id(), dom.mask());
            }
        }

        let struct_ops = Some(scx_ops_attach!(skel, wd40)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        for (id, dom) in domains.doms().iter() {
            let mut ctx = dom.ctx.lock().unwrap();

            let ptr = skel.maps.bss_data.as_mut().unwrap().topo_nodes[index as usize][*id];
            let domc = unsafe { std::mem::transmute::<u64, &mut types::dom_ctx>(ptr) };
            *ctx = Some(domc);
        }

        info!("WD40 scheduler started! Run `scx_wd40 --monitor` for metrics.");

        let proc_reader = procfs::ProcReader::new();

        Ok(Self {
            skel,
            struct_ops,
            sched_interval: Duration::from_secs_f64(opts.interval),
            tune_interval: Duration::from_secs_f64(opts.tune_interval),
            balance_load: !opts.no_load_balance,
            balanced_kworkers: opts.balanced_kworkers,

            dom_group: domains.clone(),
            proc_reader,

            lb_at: SystemTime::now(),
            lb_stats: BTreeMap::new(),
            time_used: Duration::default(),

            tuner: Tuner::new(
                domains,
                opts.direct_greedy_under,
                opts.kick_greedy_under,
                opts.slice_us_underutil * 1000,
                opts.slice_us_overutil * 1000,
            )?,
            stats_server,
        })
    }

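    /// Turn a stats delta into a ClusterStats report. Percentages are
    /// computed against the total number of dispatch events in the interval.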
    fn cluster_stats(&self, sc: &StatsCtx, node_stats: BTreeMap<usize, NodeStats>) -> ClusterStats {
        let stat = |idx| sc.bpf_stats[idx as usize];
        let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_PINNED)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL)
            + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
        let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0;

        let cpu_busy = if sc.cpu_total != 0 {
            (sc.cpu_busy as f64 / sc.cpu_total as f64) * 100.0
        } else {
            0.0
        };

        ClusterStats {
            at_us: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            lb_at_us: self
                .lb_at
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_micros()
                .try_into()
                .unwrap(),
            total,
            slice_us: self.tuner.slice_ns / 1000,

            cpu_busy,
            load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
            nr_migrations: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],

            task_get_err: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
            time_used: sc.time_used.as_secs_f64(),

            sync_prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE),
            wake_sync: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC),
            prev_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE),
            greedy_idle: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_IDLE),
            pinned: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_PINNED),
            direct: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_DISPATCH),
            greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY),
            greedy_far: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DIRECT_GREEDY_FAR),
            dsq_dispatch: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH),
            greedy_local: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL),
            greedy_xnuma: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA),
            kick_greedy: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_KICK_GREEDY),
            repatriate: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_REPATRIATE),
            dl_clamp: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_CLAMP),
            dl_preset: stat_pct(bpf_intf::stat_idx_RUSTY_STAT_DL_PRESET),

            direct_greedy_cpus: self.tuner.direct_greedy_mask.as_raw_slice().to_owned(),
            kick_greedy_cpus: self.tuner.kick_greedy_mask.as_raw_slice().to_owned(),

            nodes: node_stats,
        }
    }

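    /// Run one load balancing pass and record when it finished along with the
    /// per-node stats it produced.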
    fn lb_step(&mut self) -> Result<()> {
        let mut lb = LoadBalancer::new(
            &mut self.skel,
            self.dom_group.clone(),
            self.balanced_kworkers,
            self.tuner.fully_utilized,
            self.balance_load,
        );

        lb.load_balance()?;

        self.lb_at = SystemTime::now();
        self.lb_stats = lb.get_stats();
        Ok(())
    }

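    /// Main loop: periodically run the tuner and load balancer and answer
    /// stats requests until shutdown is requested or the BPF side exits.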
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        let now = Instant::now();
        let mut next_tune_at = now + self.tune_interval;
        let mut next_sched_at = now + self.sched_interval;

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            let now = Instant::now();

            if now >= next_tune_at {
                self.tuner.step(&mut self.skel)?;
                next_tune_at += self.tune_interval;
                if next_tune_at < now {
                    next_tune_at = now + self.tune_interval;
                }
            }

            if now >= next_sched_at {
                self.lb_step()?;
                next_sched_at += self.sched_interval;
                if next_sched_at < now {
                    next_sched_at = now + self.sched_interval;
                }
            }

            self.time_used += Instant::now().duration_since(now);

            match req_ch.recv_deadline(next_sched_at.min(next_tune_at)) {
                Ok(prev_sc) => {
                    let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.time_used)?;
                    let delta_sc = cur_sc.delta(&prev_sc);
                    let cstats = self.cluster_stats(&delta_sc, self.lb_stats.clone());
                    res_ch.send((cur_sc, cstats))?;
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
    }
}

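/// Parse options, set up logging and the Ctrl-C handler, then run the
/// scheduler, restarting it whenever the BPF side requests a restart.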
fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "scx_wd40: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }
    Ok(())
}