1mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::HashSet;
15use std::ffi::{c_int, c_ulong};
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::mem::MaybeUninit;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use anyhow::bail;
25use anyhow::Context;
26use anyhow::Result;
27use clap::Parser;
28use crossbeam::channel::RecvTimeoutError;
29use libbpf_rs::MapCore;
30use libbpf_rs::MapFlags;
31use libbpf_rs::OpenObject;
32use libbpf_rs::ProgramInput;
33use log::{debug, info, warn};
34use scx_stats::prelude::*;
35use scx_utils::build_id;
36use scx_utils::compat;
37use scx_utils::libbpf_clap_opts::LibbpfOpts;
38use scx_utils::scx_ops_attach;
39use scx_utils::scx_ops_load;
40use scx_utils::scx_ops_open;
41use scx_utils::try_set_rlimit_infinity;
42use scx_utils::uei_exited;
43use scx_utils::uei_report;
44use scx_utils::CoreType;
45use scx_utils::Topology;
46use scx_utils::UserExitInfo;
47use scx_utils::NR_CPU_IDS;
48use stats::Metrics;
49
/// Canonical scheduler name, used in log output and `--version`.
const SCHEDULER_NAME: &str = "scx_cosmos";
51
/// Parse a hexadecimal string of the form `0x...` (or `0X...`) into a `u64`.
///
/// Used as a clap value parser for the raw PMU event options; returns a
/// human-readable error message when the prefix is missing or the digits
/// are not valid hexadecimal.
fn parse_hex(s: &str) -> Result<u64, String> {
    let digits = s
        .strip_prefix("0x")
        .or_else(|| s.strip_prefix("0X"))
        .ok_or_else(|| "Hexadecimal value must start with '0x' prefix (e.g., 0x2)".to_string())?;
    u64::from_str_radix(digits, 16).map_err(|e| format!("Invalid hexadecimal value: {}", e))
}
60
/// Key stride that partitions the perf-event fd map into per-counter banks:
/// counter `N` for CPU `C` is stored at key `C + N * PERF_MAP_STRIDE`
/// (see setup_perf_events()).
const PERF_MAP_STRIDE: u32 = 4096;
63
64fn setup_perf_events(
67 skel: &mut BpfSkel,
68 cpu: i32,
69 perf_config: u64,
70 counter_idx: u32,
71) -> Result<()> {
72 use perf_event_open_sys as sys;
73
74 let map = &skel.maps.scx_pmu_map;
75
76 let mut attrs = sys::bindings::perf_event_attr::default();
77 attrs.type_ = sys::bindings::PERF_TYPE_RAW;
78 attrs.config = perf_config;
79 attrs.size = std::mem::size_of::<sys::bindings::perf_event_attr>() as u32;
80 attrs.set_disabled(0);
81 attrs.set_inherit(0);
82
83 let fd = unsafe { sys::perf_event_open(&mut attrs, -1, cpu, -1, 0) };
84
85 if fd < 0 {
86 let err = std::io::Error::last_os_error();
87 return Err(anyhow::anyhow!(
88 "Failed to open perf event 0x{:x} on CPU {}: {}",
89 perf_config,
90 cpu,
91 err
92 ));
93 }
94
95 let key = cpu as u32 + counter_idx * PERF_MAP_STRIDE;
96
97 map.update(
98 &key.to_ne_bytes(),
99 &fd.to_ne_bytes(),
100 libbpf_rs::MapFlags::ANY,
101 )
102 .with_context(|| "Failed to update perf_events map")?;
103
104 Ok(())
105}
106
// Command-line options. NOTE: plain `//` comments are used on purpose —
// clap turns `///` doc comments into --help text, which would change the
// CLI output.
#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_cosmos",
    version,
    disable_version_flag = true,
    about = "Lightweight scheduler optimized for preserving task-to-CPU locality."
)]
struct Opts {
    // Exit debug dump buffer length; forwarded verbatim to
    // cosmos_ops.exit_dump_len in init() (0 = default).
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    // Base time slice in microseconds; written to rodata.slice_ns as ns.
    #[clap(short = 's', long, default_value = "10")]
    slice_us: u64,

    // Maximum slice lag in microseconds; written to rodata.slice_lag as ns.
    #[clap(short = 'l', long, default_value = "20000")]
    slice_lag_us: u64,

    // CPU busy threshold as a percentage; converted to 0..1024 fixed point
    // before being written to rodata.busy_threshold.
    #[clap(short = 'c', long, default_value = "75")]
    cpu_busy_thresh: u64,

    // Interval in ms between /proc/stat samples in the run() loop
    // (capped there at 1 second).
    #[clap(short = 'p', long, default_value = "250")]
    polling_ms: u64,

    // Primary scheduling domain: a CPU list like "0-3,8" or one of the
    // symbolic modes accepted by parse_cpu_list() ("powersave",
    // "performance", "turbo", "all"). Unset = all CPUs.
    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    // Raw PMU event config (hex, PERF_TYPE_RAW) monitored on every CPU.
    // 0x0 disables perf monitoring.
    #[clap(short = 'e', long, default_value = "0x0", value_parser = parse_hex)]
    perf_config: u64,

    // Threshold for the event above; 0 enables dynamic adjustment starting
    // from DYNAMIC_THRESHOLD_INIT_VALUE.
    #[clap(short = 'E', default_value = "0", long)]
    perf_threshold: u64,

    // Second raw PMU event config (hex) used for sticky dispatches.
    #[clap(short = 'y', long, default_value = "0x0", value_parser = parse_hex)]
    perf_sticky: u64,

    // Threshold for the sticky event; 0 enables dynamic adjustment.
    #[clap(short = 'Y', default_value = "0", long)]
    perf_sticky_threshold: u64,

    // Disable NUMA optimizations (they are only active with >1 populated node).
    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
    disable_numa: bool,

    // Clears rodata.cpufreq_enabled.
    #[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
    disable_cpufreq: bool,

    // Sets rodata.flat_idle_scan.
    #[arg(short = 'i', long, action = clap::ArgAction::SetTrue)]
    flat_idle_scan: bool,

    // Sets rodata.preferred_idle_scan; init() also logs the capacity-ordered
    // preferred CPU list when this is enabled.
    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    // Treat the system as non-SMT even when the topology reports SMT.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    disable_smt: bool,

    // Sets rodata.avoid_smt.
    #[clap(short = 'S', long, action = clap::ArgAction::SetTrue)]
    avoid_smt: bool,

    // Sets rodata.no_wake_sync.
    #[clap(short = 'w', long, action = clap::ArgAction::SetTrue)]
    no_wake_sync: bool,

    // Clears rodata.deferred_wakeups.
    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
    no_deferred_wakeup: bool,

    // Clears rodata.tick_preempt.
    #[clap(long, action = clap::ArgAction::SetTrue)]
    no_tick_preempt: bool,

    // Sets rodata.mm_affinity.
    #[clap(short = 'a', long, action = clap::ArgAction::SetTrue)]
    mm_affinity: bool,

    // Stats reporting interval in seconds (spawns the stats monitor thread).
    #[clap(long)]
    stats: Option<f64>,

    // Monitor-only mode: report stats at this interval without starting
    // the scheduler (see main()).
    #[clap(long)]
    monitor: Option<f64>,

    // Verbose output (also enables libbpf object debug output).
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    // Print version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    // Describe the stats metrics metadata and exit.
    #[clap(long)]
    help_stats: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
283
/// Symbolic CPU-selection modes usable for `--primary-domain`
/// (see get_primary_cpus() for the topology-based mapping).
#[derive(PartialEq)]
enum Powermode {
    Turbo,       // only big cores with turbo capability
    Performance, // all big cores
    Powersave,   // only little cores
    Any,         // every CPU
}
291
292fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
297 let cpus: Vec<usize> = Topology::new()
298 .unwrap()
299 .all_cores
300 .values()
301 .flat_map(|core| &core.cpus)
302 .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
303 (Powermode::Turbo, CoreType::Big { turbo: true }) |
305 (Powermode::Performance, CoreType::Big { .. }) |
307 (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
309 (Powermode::Any, ..) => Some(*cpu_id),
310 _ => None,
311 })
312 .collect();
313
314 Ok(cpus)
315}
316
317pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
318 let mut cpus = Vec::new();
319 let mut seen = HashSet::new();
320
321 if let Some(mode) = match optarg {
323 "powersave" => Some(Powermode::Powersave),
324 "performance" => Some(Powermode::Performance),
325 "turbo" => Some(Powermode::Turbo),
326 "all" => Some(Powermode::Any),
327 _ => None,
328 } {
329 return get_primary_cpus(mode).map_err(|e| e.to_string());
330 }
331
332 if optarg
334 .chars()
335 .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
336 {
337 return Err("Invalid character in CPU list".to_string());
338 }
339
340 let cleaned = optarg.replace(' ', "\t");
342
343 for token in cleaned.split(',') {
344 let token = token.trim_matches(|c: char| c.is_whitespace());
345
346 if token.is_empty() {
347 continue;
348 }
349
350 if let Some((start_str, end_str)) = token.split_once('-') {
351 let start = start_str
352 .trim()
353 .parse::<usize>()
354 .map_err(|_| "Invalid range start")?;
355 let end = end_str
356 .trim()
357 .parse::<usize>()
358 .map_err(|_| "Invalid range end")?;
359
360 if start > end {
361 return Err(format!("Invalid CPU range: {}-{}", start, end));
362 }
363
364 for i in start..=end {
365 if cpus.len() >= *NR_CPU_IDS {
366 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
367 }
368 if seen.insert(i) {
369 cpus.push(i);
370 }
371 }
372 } else {
373 let cpu = token
374 .parse::<usize>()
375 .map_err(|_| format!("Invalid CPU: {}", token))?;
376 if cpus.len() >= *NR_CPU_IDS {
377 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
378 }
379 if seen.insert(cpu) {
380 cpus.push(cpu);
381 }
382 }
383 }
384
385 Ok(cpus)
386}
387
/// Initial PMU threshold used when dynamic adjustment is enabled
/// (i.e. the user left the corresponding threshold option at 0).
const DYNAMIC_THRESHOLD_INIT_VALUE: u64 = 1000;

/// Dispatch rate (events/sec) above which the threshold is raised.
const DYNAMIC_THRESHOLD_RATE_HIGH: f64 = 4000.0;

/// Dispatch rate (events/sec) below which the threshold is lowered.
const DYNAMIC_THRESHOLD_RATE_LOW: f64 = 2000.0;

/// Smallest per-step scaling factor applied to the threshold.
const DYNAMIC_THRESHOLD_SCALE_MIN: f64 = 0.0001;

/// Largest per-step scaling factor applied to the threshold.
const DYNAMIC_THRESHOLD_SCALE_MAX: f64 = 1000.0;

/// Slope of the scaling curve when the rate is above the high watermark.
const DYNAMIC_THRESHOLD_SLOPE_HIGH: f64 = 0.35;

/// Slope of the scaling curve when the rate is below the low watermark.
const DYNAMIC_THRESHOLD_SLOPE_LOW: f64 = 0.58;

/// Compute the relative adjustment step for a dynamic threshold given the
/// observed per-second event rate and the direction of the correction.
fn dynamic_threshold_scale(rate_per_sec: f64, too_high: bool) -> f64 {
    if too_high {
        // Proportional overshoot above the high watermark, capped at 4x so a
        // single step stays bounded.
        let excess = ((rate_per_sec / DYNAMIC_THRESHOLD_RATE_HIGH) - 1.0).max(0.0);
        let scale = DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_HIGH * excess.min(4.0);
        scale.min(DYNAMIC_THRESHOLD_SCALE_MAX)
    } else if rate_per_sec <= 0.0 {
        // No activity at all: collapse the threshold as fast as possible.
        DYNAMIC_THRESHOLD_SCALE_MAX
    } else {
        // Proportional deficit below the low watermark, clamped to [0, 1].
        let deficit = (DYNAMIC_THRESHOLD_RATE_LOW - rate_per_sec) / DYNAMIC_THRESHOLD_RATE_LOW;
        let t = deficit.min(1.0).max(0.0);
        DYNAMIC_THRESHOLD_SCALE_MIN + DYNAMIC_THRESHOLD_SLOPE_LOW * t
    }
}

/// Adjust a dynamic threshold based on the observed event rate.
///
/// Rates inside the [RATE_LOW, RATE_HIGH) band leave `current` untouched.
/// Otherwise the threshold is scaled up or down and clamped relative to the
/// user-provided `base_threshold` (a base of 0 means fully dynamic: floor 1,
/// no ceiling).
fn adjust_dynamic_threshold(current: u64, rate_per_sec: f64, base_threshold: u64) -> u64 {
    let (scale_pct, raise_threshold) = if rate_per_sec > DYNAMIC_THRESHOLD_RATE_HIGH {
        (dynamic_threshold_scale(rate_per_sec, true), true)
    } else if rate_per_sec < DYNAMIC_THRESHOLD_RATE_LOW && rate_per_sec >= 0.0 {
        (dynamic_threshold_scale(rate_per_sec, false), false)
    } else {
        return current;
    };

    let factor = if raise_threshold {
        1.0 + scale_pct
    } else {
        // May go far negative for near-zero rates (scale_pct up to
        // SCALE_MAX); the `as u64` cast below saturates at 0 and the clamp
        // floors the result at 1.
        1.0 - scale_pct
    };
    let new = ((current as f64) * factor).round() as u64;

    let min_val = if base_threshold == 0 {
        1
    } else {
        base_threshold / 100
    };
    let max_val = if base_threshold == 0 {
        u64::MAX
    } else {
        base_threshold.saturating_mul(10000)
    };

    new.clamp(min_val.max(1), max_val)
}
454
/// Aggregate CPU time counters sampled from /proc/stat (clock ticks),
/// produced by Scheduler::read_cpu_times().
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    user: u64,  // time spent in user mode
    nice: u64,  // time spent in low-priority (nice) user mode
    total: u64, // sum of the first 8 time fields on the "cpu " line
}
461
/// Userspace side of the cosmos scheduler: owns the loaded BPF skeleton,
/// the struct_ops attach link and the stats server.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,                      // loaded and attached BPF skeleton
    opts: &'a Opts,                         // parsed command-line options
    struct_ops: Option<libbpf_rs::Link>,    // attach link; taken in run() before exit
    stats_server: StatsServer<(), Metrics>, // serves Metrics to stats clients
}
468
469impl<'a> Scheduler<'a> {
    /// Open, configure, load and attach the cosmos BPF scheduler.
    ///
    /// Derives topology-dependent settings (SMT, NUMA, CPU capacities),
    /// populates the BPF rodata/bss sections from `opts`, opens the optional
    /// PMU counters, configures the primary/SMT domains and launches the
    /// stats server.
    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        try_set_rlimit_infinity();

        // Detect the host CPU topology.
        let topo = Topology::new().unwrap();

        // SMT support is only used when present and not explicitly disabled.
        let smt_enabled = !opts.disable_smt && topo.smt_enabled;

        // Count NUMA nodes that actually contain CPUs.
        let nr_nodes = topo
            .nodes
            .values()
            .filter(|node| !node.all_cpus.is_empty())
            .count();
        info!("NUMA nodes: {}", nr_nodes);

        // NUMA optimizations only pay off with more than one populated node.
        let numa_enabled = !opts.disable_numa && nr_nodes > 1;
        if !numa_enabled {
            info!("Disabling NUMA optimizations");
        }

        info!(
            "{} {} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            if smt_enabled { "SMT on" } else { "SMT off" }
        );

        // Log the full command line for post-mortem debugging.
        info!(
            "scheduler options: {}",
            std::env::args().collect::<Vec<_>>().join(" ")
        );

        // Open the BPF skeleton (not yet loaded into the kernel).
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose);
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, cosmos_ops, open_opts)?;

        skel.struct_ops.cosmos_ops_mut().exit_dump_len = opts.exit_dump_len;

        // Populate the read-only BPF configuration from the options.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = opts.slice_us * 1000; // us -> ns
        rodata.slice_lag = opts.slice_lag_us * 1000; // us -> ns
        rodata.cpufreq_enabled = !opts.disable_cpufreq;
        rodata.deferred_wakeups = !opts.no_deferred_wakeup;
        rodata.flat_idle_scan = opts.flat_idle_scan;
        rodata.smt_enabled = smt_enabled;
        rodata.numa_enabled = numa_enabled;
        rodata.nr_node_ids = topo.nodes.len() as u32;
        rodata.no_wake_sync = opts.no_wake_sync;
        rodata.avoid_smt = opts.avoid_smt;
        rodata.tick_preempt = !opts.no_tick_preempt;
        rodata.mm_affinity = opts.mm_affinity;

        rodata.perf_config = opts.perf_config;
        rodata.perf_sticky = opts.perf_sticky;

        // Percentage -> 0..1024 fixed point.
        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;

        // Record per-CPU capacities and a capacity-ordered (fastest first)
        // list of preferred CPUs for the preferred idle scan.
        let mut cpus: Vec<_> = topo.all_cpus.values().collect();
        cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
        for (i, cpu) in cpus.iter().enumerate() {
            rodata.cpu_capacity[cpu.id] = cpu.cpu_capacity as c_ulong;
            rodata.preferred_cpus[i] = cpu.id as u64;
        }
        if opts.preferred_idle_scan {
            info!(
                "Preferred CPUs: {:?}",
                &rodata.preferred_cpus[0..cpus.len()]
            );
        }
        rodata.preferred_idle_scan = opts.preferred_idle_scan;

        // Resolve the primary scheduling domain (defaults to all CPUs).
        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
            match parse_cpu_list(domain) {
                Ok(cpus) => cpus,
                Err(e) => bail!("Error parsing primary domain: {}", e),
            }
        } else {
            (0..*NR_CPU_IDS).collect()
        };
        if primary_cpus.len() < *NR_CPU_IDS {
            info!("Primary CPUs: {:?}", primary_cpus);
            rodata.primary_all = false;
        } else {
            rodata.primary_all = true;
        }

        skel.struct_ops.cosmos_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
            | *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;

        info!(
            "scheduler flags: {:#x}",
            skel.struct_ops.cosmos_ops_mut().flags
        );

        // Load (verify) the BPF program into the kernel.
        let mut skel = scx_ops_load!(skel, cosmos_ops, uei)?;

        // Seed the PMU thresholds; an explicit threshold of 0 selects
        // dynamic adjustment starting from DYNAMIC_THRESHOLD_INIT_VALUE
        // (tuned at runtime in run()).
        let bss = skel.maps.bss_data.as_mut().unwrap();
        if opts.perf_config > 0 {
            bss.perf_threshold = if opts.perf_threshold == 0 {
                DYNAMIC_THRESHOLD_INIT_VALUE
            } else {
                opts.perf_threshold
            };
        }
        if opts.perf_sticky > 0 {
            bss.perf_sticky_threshold = if opts.perf_sticky_threshold == 0 {
                DYNAMIC_THRESHOLD_INIT_VALUE
            } else {
                opts.perf_sticky_threshold
            };
        }

        // Publish the CPU -> NUMA node mapping to the BPF side.
        for node in topo.nodes.values() {
            for cpu in node.all_cpus.values() {
                if opts.verbose {
                    info!("CPU{} -> node{}", cpu.id, node.id);
                }
                skel.maps.cpu_node_map.update(
                    &(cpu.id as u32).to_ne_bytes(),
                    &(node.id as u32).to_ne_bytes(),
                    MapFlags::ANY,
                )?;
            }
        }

        // Open the requested raw PMU events on every CPU. A failure on CPU 0
        // is treated as "PMU not available" and the scheduler keeps running
        // without perf monitoring.
        let nr_cpus = *NR_CPU_IDS;
        info!("Setting up performance counters for {} CPUs...", nr_cpus);
        let mut perf_available = true;
        // The sticky event uses counter bank 1 when the main event occupies
        // bank 0, otherwise it takes bank 0 itself.
        let sticky_counter_idx = if opts.perf_config > 0 { 1 } else { 0 };
        for cpu in 0..nr_cpus {
            if opts.perf_config > 0 {
                if let Err(e) = setup_perf_events(&mut skel, cpu as i32, opts.perf_config, 0) {
                    if cpu == 0 {
                        // ENOENT on the first CPU -> event unsupported.
                        let err_str = e.to_string();
                        if err_str.contains("errno 2") || err_str.contains("os error 2") {
                            warn!("Performance counters not available on this CPU architecture");
                            warn!("PMU event 0x{:x} not supported - scheduler will run without perf monitoring", opts.perf_config);
                        } else {
                            warn!("Failed to setup perf events: {}", e);
                        }
                        perf_available = false;
                        break;
                    }
                }
            }
            if opts.perf_sticky > 0 {
                if let Err(e) =
                    setup_perf_events(&mut skel, cpu as i32, opts.perf_sticky, sticky_counter_idx)
                {
                    if cpu == 0 {
                        // Same ENOENT heuristic as above for the sticky event.
                        let err_str = e.to_string();
                        if err_str.contains("errno 2") || err_str.contains("os error 2") {
                            warn!("Performance counters not available on this CPU architecture");
                            warn!("PMU event 0x{:x} not supported - scheduler will run without perf monitoring", opts.perf_sticky);
                        } else {
                            warn!("Failed to setup perf events: {}", e);
                        }
                        perf_available = false;
                        break;
                    }
                }
            }
        }
        if perf_available {
            info!("Performance counters configured successfully for all CPUs");
        }

        // Register the primary domain CPUs with the BPF side, if restricted.
        if primary_cpus.len() < *NR_CPU_IDS {
            for cpu in primary_cpus {
                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
                }
            }
        }

        // Register SMT sibling pairs with the BPF side.
        if smt_enabled {
            Self::init_smt_domains(&mut skel, &topo)?;
        }

        // Attach the scheduler and start serving stats.
        let struct_ops = Some(scx_ops_attach!(skel, cosmos_ops)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            opts,
            struct_ops,
            stats_server,
        })
    }
683
    /// Invoke the `enable_primary_cpu` BPF syscall program to add `cpu` to
    /// the primary scheduling domain.
    ///
    /// Returns the program's non-zero return value on failure.
    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_primary_cpu;
        let mut args = cpu_arg {
            cpu_id: cpu as c_int,
        };
        let input = ProgramInput {
            // SAFETY: `args` is plain old data that outlives the test_run()
            // call below, and the byte view covers exactly its size.
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }
705
706 fn enable_sibling_cpu(
707 skel: &mut BpfSkel<'_>,
708 cpu: usize,
709 sibling_cpu: usize,
710 ) -> Result<(), u32> {
711 let prog = &mut skel.progs.enable_sibling_cpu;
712 let mut args = domain_arg {
713 cpu_id: cpu as c_int,
714 sibling_cpu_id: sibling_cpu as c_int,
715 };
716 let input = ProgramInput {
717 context_in: Some(unsafe {
718 std::slice::from_raw_parts_mut(
719 &mut args as *mut _ as *mut u8,
720 std::mem::size_of_val(&args),
721 )
722 }),
723 ..Default::default()
724 };
725 let out = prog.test_run(input).unwrap();
726 if out.return_value != 0 {
727 return Err(out.return_value);
728 }
729
730 Ok(())
731 }
732
733 fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
734 let smt_siblings = topo.sibling_cpus();
735
736 info!("SMT sibling CPUs: {:?}", smt_siblings);
737 for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
738 Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
739 }
740
741 Ok(())
742 }
743
744 fn get_metrics(&self) -> Metrics {
745 let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
746 Metrics {
747 cpu_thresh: self.skel.maps.rodata_data.as_ref().unwrap().busy_threshold,
748 cpu_util: self.skel.maps.bss_data.as_ref().unwrap().cpu_util,
749 nr_event_dispatches: bss_data.nr_event_dispatches,
750 nr_ev_sticky_dispatches: bss_data.nr_ev_sticky_dispatches,
751 }
752 }
753
    /// True once the BPF scheduler has exited, as reported through the
    /// user exit info (uei) shared with the BPF side.
    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }
757
758 fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
759 let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
761 let total_diff = curr.total.saturating_sub(prev.total);
762
763 if total_diff > 0 {
764 let user_ratio = user_diff as f64 / total_diff as f64;
765 Some((user_ratio * 1024.0).round() as u64)
766 } else {
767 None
768 }
769 }
770
    /// Read the aggregate CPU time counters from /proc/stat.
    ///
    /// Returns the user and nice tick counts plus the total over the first
    /// 8 time fields of the "cpu " line, or `None` when the file can't be
    /// read or parsed.
    fn read_cpu_times() -> Option<CpuTimes> {
        let file = File::open("/proc/stat").ok()?;
        let reader = BufReader::new(file);

        for line in reader.lines() {
            let line = line.ok()?;
            // Only the aggregate line is of interest; per-CPU lines are
            // "cpuN" with no space right after "cpu".
            if line.starts_with("cpu ") {
                let fields: Vec<&str> = line.split_whitespace().collect();
                if fields.len() < 5 {
                    return None;
                }

                let user: u64 = fields[1].parse().ok()?;
                let nice: u64 = fields[2].parse().ok()?;

                // Sum up to 8 time fields after the "cpu" tag; fields that
                // are absent (older kernels) are simply skipped.
                let total: u64 = fields
                    .iter()
                    .skip(1)
                    .take(8)
                    .filter_map(|v| v.parse::<u64>().ok())
                    .sum();

                return Some(CpuTimes { user, nice, total });
            }
        }

        None
    }
800
    /// Main loop: periodically sample CPU utilization, publish it to the
    /// BPF side, adapt the dynamic PMU thresholds from observed dispatch
    /// rates, and answer stats requests — until shutdown is requested or
    /// the BPF scheduler exits.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        // /proc/stat polling interval, capped at 1 second.
        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
        let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
        let mut last_update = Instant::now();

        // Previous dispatch counters, used to derive per-second rates.
        let mut prev_nr_event_dispatches: u64 = 0;
        let mut prev_nr_ev_sticky_dispatches: u64 = 0;

        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
                // Publish current user-mode CPU utilization to the BPF side.
                if let Some(curr_cputime) = Self::read_cpu_times() {
                    Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime)
                        .map(|util| self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util);
                    prev_cputime = curr_cputime;
                }

                // Snapshot the dispatch counters maintained by the BPF side.
                let nr_event = self
                    .skel
                    .maps
                    .bss_data
                    .as_ref()
                    .unwrap()
                    .nr_event_dispatches;
                let nr_sticky = self
                    .skel
                    .maps
                    .bss_data
                    .as_ref()
                    .unwrap()
                    .nr_ev_sticky_dispatches;
                let elapsed_secs = last_update.elapsed().as_secs_f64();
                if elapsed_secs > 0.0 {
                    // Per-second rates since the previous sample.
                    let migration_rate =
                        (nr_event.saturating_sub(prev_nr_event_dispatches) as f64) / elapsed_secs;
                    let sticky_rate = (nr_sticky.saturating_sub(prev_nr_ev_sticky_dispatches)
                        as f64)
                        / elapsed_secs;

                    let bss = self.skel.maps.bss_data.as_mut().unwrap();
                    // Only auto-tune thresholds the user did not pin on the
                    // command line (an explicit 0 means dynamic).
                    if self.opts.perf_config > 0 && self.opts.perf_threshold == 0 {
                        let base = 0u64; let current = bss.perf_threshold;
                        let new_thresh = adjust_dynamic_threshold(current, migration_rate, base);
                        if new_thresh != current {
                            bss.perf_threshold = new_thresh;
                            if self.opts.verbose {
                                info!(
                                    "perf_threshold: {} (migration rate {:.1}/s)",
                                    new_thresh, migration_rate
                                );
                            }
                        }
                    }
                    if self.opts.perf_sticky > 0 && self.opts.perf_sticky_threshold == 0 {
                        let base = 0u64;
                        let current = bss.perf_sticky_threshold;
                        let new_thresh = adjust_dynamic_threshold(current, sticky_rate, base);
                        if new_thresh != current {
                            bss.perf_sticky_threshold = new_thresh;
                            if self.opts.verbose {
                                info!(
                                    "perf_sticky_threshold: {} (sticky rate {:.1}/s)",
                                    new_thresh, sticky_rate
                                );
                            }
                        }
                    }

                    prev_nr_event_dispatches = nr_event;
                    prev_nr_ev_sticky_dispatches = nr_sticky;
                }

                last_update = Instant::now();
            }

            // Serve stats requests while waiting for the next polling slot.
            let timeout = if polling_time.is_zero() {
                Duration::from_secs(1)
            } else {
                polling_time
            };
            match req_ch.recv_timeout(timeout) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        // Drop the struct_ops link to detach before collecting exit info.
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
903}
904
impl Drop for Scheduler<'_> {
    // Log teardown for visibility; the remaining fields (including the
    // struct_ops link, if still held) are released by their own Drop impls.
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}
910
/// Entry point: parse options, handle the informational flags, configure
/// logging and Ctrl-C handling, optionally spawn the stats monitor, then
/// run the scheduler — restarting it whenever the exit info asks for it.
fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "{} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    // Terse terminal logger: timestamps only on errors, no targets/threads.
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    // Request a clean shutdown on Ctrl-C via a shared flag.
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // Spawn the stats monitor thread for --stats/--monitor; with --monitor
    // alone, no scheduler is started and we just wait for the thread.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    // Init/run cycle: restart the scheduler when the exit info requests it.
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}