1mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::HashSet;
15use std::ffi::{c_int, c_ulong};
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::mem::MaybeUninit;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use anyhow::bail;
25use anyhow::Context;
26use anyhow::Result;
27use clap::Parser;
28use crossbeam::channel::RecvTimeoutError;
29use libbpf_rs::OpenObject;
30use libbpf_rs::ProgramInput;
31use log::{debug, info, warn};
32use scx_stats::prelude::*;
33use scx_utils::build_id;
34use scx_utils::compat;
35use scx_utils::libbpf_clap_opts::LibbpfOpts;
36use scx_utils::scx_ops_attach;
37use scx_utils::scx_ops_load;
38use scx_utils::scx_ops_open;
39use scx_utils::try_set_rlimit_infinity;
40use scx_utils::uei_exited;
41use scx_utils::uei_report;
42use scx_utils::CoreType;
43use scx_utils::Topology;
44use scx_utils::UserExitInfo;
45use scx_utils::NR_CPU_IDS;
46use stats::Metrics;
47
/// Scheduler name used for kernel registration, version banners and log messages.
const SCHEDULER_NAME: &str = "scx_beerland";
49
// Command-line options for scx_beerland.
//
// NOTE: field help is given with plain `//` comments on purpose — clap's
// derive turns `///` doc comments into CLI help text, which would change
// the program's user-visible output.
#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_beerland",
    version,
    disable_version_flag = true,
    about = "Scheduler designed to prioritize locality and scalability."
)]
struct Opts {
    // Size of the BPF exit dump buffer (passed to beerland_ops.exit_dump_len);
    // 0 keeps the default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    // Base time slice, in microseconds (written to rodata.slice_ns as ns).
    #[clap(short = 's', long, default_value = "1000")]
    slice_us: u64,

    // Maximum scheduling lag, in microseconds (written to rodata.slice_lag as ns).
    #[clap(short = 'l', long, default_value = "40000")]
    slice_us_lag: u64,

    // CPU busy threshold in percent; converted to a 0..1024 fixed-point
    // fraction before being handed to the BPF side (rodata.busy_threshold).
    #[clap(short = 'c', long, default_value = "75")]
    cpu_busy_thresh: u64,

    // Interval, in milliseconds, for sampling /proc/stat and refreshing the
    // BPF-side cpu_util value; capped at 1s in Scheduler::run().
    #[clap(short = 'p', long, default_value = "250")]
    polling_ms: u64,

    // Primary scheduling domain: one of "powersave", "performance", "turbo",
    // "all", or an explicit CPU list such as "0-3,8" (see parse_cpu_list).
    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    // Scan idle CPUs in preferred (capacity-descending) order.
    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    // Launch a stats monitor thread reporting every N seconds
    // (scheduler keeps running).
    #[clap(long)]
    stats: Option<f64>,

    // Run only as a stats monitor client polling every N seconds, then exit.
    #[clap(long)]
    monitor: Option<f64>,

    // Enable libbpf debug output.
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    // Print version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    // Describe the exported stats metrics and exit.
    #[clap(long)]
    help_stats: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
136
/// Power mode used to select which CPUs belong to the primary domain;
/// matched against each CPU's `CoreType` in `get_primary_cpus`.
#[derive(PartialEq)]
enum Powermode {
    /// Big cores with turbo boost only.
    Turbo,
    /// All big cores (turbo or not).
    Performance,
    /// Little (efficiency) cores only.
    Powersave,
    /// Every CPU, regardless of core type.
    Any,
}
144
145fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
150 let cpus: Vec<usize> = Topology::new()
151 .unwrap()
152 .all_cores
153 .values()
154 .flat_map(|core| &core.cpus)
155 .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
156 (Powermode::Turbo, CoreType::Big { turbo: true }) |
158 (Powermode::Performance, CoreType::Big { .. }) |
160 (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
162 (Powermode::Any, ..) => Some(*cpu_id),
163 _ => None,
164 })
165 .collect();
166
167 Ok(cpus)
168}
169
170pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
171 let mut cpus = Vec::new();
172 let mut seen = HashSet::new();
173
174 if let Some(mode) = match optarg {
176 "powersave" => Some(Powermode::Powersave),
177 "performance" => Some(Powermode::Performance),
178 "turbo" => Some(Powermode::Turbo),
179 "all" => Some(Powermode::Any),
180 _ => None,
181 } {
182 return get_primary_cpus(mode).map_err(|e| e.to_string());
183 }
184
185 if optarg
187 .chars()
188 .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
189 {
190 return Err("Invalid character in CPU list".to_string());
191 }
192
193 let cleaned = optarg.replace(' ', "\t");
195
196 for token in cleaned.split(',') {
197 let token = token.trim_matches(|c: char| c.is_whitespace());
198
199 if token.is_empty() {
200 continue;
201 }
202
203 if let Some((start_str, end_str)) = token.split_once('-') {
204 let start = start_str
205 .trim()
206 .parse::<usize>()
207 .map_err(|_| "Invalid range start")?;
208 let end = end_str
209 .trim()
210 .parse::<usize>()
211 .map_err(|_| "Invalid range end")?;
212
213 if start > end {
214 return Err(format!("Invalid CPU range: {}-{}", start, end));
215 }
216
217 for i in start..=end {
218 if cpus.len() >= *NR_CPU_IDS {
219 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
220 }
221 if seen.insert(i) {
222 cpus.push(i);
223 }
224 }
225 } else {
226 let cpu = token
227 .parse::<usize>()
228 .map_err(|_| format!("Invalid CPU: {}", token))?;
229 if cpus.len() >= *NR_CPU_IDS {
230 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
231 }
232 if seen.insert(cpu) {
233 cpus.push(cpu);
234 }
235 }
236 }
237
238 Ok(cpus)
239}
240
/// Snapshot of the aggregate CPU time counters parsed from the first
/// "cpu " line of /proc/stat (see `Scheduler::read_cpu_times`).
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    user: u64,  // time spent in user mode
    nice: u64,  // time spent in user mode at low priority (nice)
    total: u64, // sum of the first 8 time fields of the "cpu " line
}
247
/// Userspace side of the scx_beerland scheduler: owns the loaded BPF
/// skeleton, the struct_ops attach link, and the stats server.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,                      // loaded and attached BPF skeleton
    opts: &'a Opts,                         // parsed command-line options
    struct_ops: Option<libbpf_rs::Link>,    // attach link; taken on shutdown in run()
    stats_server: StatsServer<(), Metrics>, // serves Metrics to monitor clients
}
254
255impl<'a> Scheduler<'a> {
    /// Open, configure, load and attach the BPF scheduler, then launch the
    /// stats server. Ordering matters: rodata and struct_ops fields can only
    /// be written between open and load; `enable_*` test_run calls require a
    /// loaded skeleton.
    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        try_set_rlimit_infinity();

        let topo = Topology::new().unwrap();

        let smt_enabled = topo.smt_enabled;

        info!(
            "{} {} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            if smt_enabled { "SMT on" } else { "SMT off" }
        );

        // Log the full command line for post-mortem debugging.
        info!(
            "scheduler options: {}",
            std::env::args().collect::<Vec<_>>().join(" ")
        );

        // Open the BPF skeleton (verbose enables libbpf debug output).
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose);
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, beerland_ops, open_opts)?;

        skel.struct_ops.beerland_ops_mut().exit_dump_len = opts.exit_dump_len;

        // Tunables consumed by the BPF side; times are converted us -> ns,
        // the busy threshold percent -> 0..1024 fixed point.
        let rodata = skel.maps.rodata_data.as_mut().unwrap();
        rodata.slice_ns = opts.slice_us * 1000;
        rodata.slice_lag = opts.slice_us_lag * 1000;
        rodata.smt_enabled = smt_enabled;

        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;

        // Resolve the primary domain; default is all possible CPU ids.
        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
            match parse_cpu_list(domain) {
                Ok(cpus) => cpus,
                Err(e) => bail!("Error parsing primary domain: {}", e),
            }
        } else {
            (0..*NR_CPU_IDS).collect()
        };
        // primary_all lets the BPF side skip the per-CPU domain check when
        // every CPU is in the primary domain.
        if primary_cpus.len() < *NR_CPU_IDS {
            info!("Primary CPUs: {:?}", primary_cpus);
            rodata.primary_all = false;
        } else {
            rodata.primary_all = true;
        }

        // Export per-CPU capacity and a capacity-descending preference order.
        let mut cpus: Vec<_> = topo.all_cpus.values().collect();
        cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
        for (i, cpu) in cpus.iter().enumerate() {
            rodata.cpu_capacity[cpu.id] = cpu.cpu_capacity as c_ulong;
            rodata.preferred_cpus[i] = cpu.id as u64;
        }
        if opts.preferred_idle_scan {
            info!(
                "Preferred CPUs: {:?}",
                &rodata.preferred_cpus[0..cpus.len()]
            );
        }
        rodata.preferred_idle_scan = opts.preferred_idle_scan;

        skel.struct_ops.beerland_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
            | *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        info!(
            "scheduler flags: {:#x}",
            skel.struct_ops.beerland_ops_mut().flags
        );

        // Load the BPF programs; rodata is frozen from this point on.
        let mut skel = scx_ops_load!(skel, beerland_ops, uei)?;

        if smt_enabled {
            Self::init_smt_domains(&mut skel, &topo)?;
        }

        // Only register individual primary CPUs when the domain is a strict
        // subset (otherwise primary_all already covers it).
        if primary_cpus.len() < *NR_CPU_IDS {
            for cpu in primary_cpus {
                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
                }
            }
        }

        let struct_ops = Some(scx_ops_attach!(skel, beerland_ops)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            opts,
            struct_ops,
            stats_server,
        })
    }
364
    /// Tell the BPF side that `sibling_cpu` is the SMT sibling of `cpu` by
    /// invoking the `enable_sibling_cpu` BPF program via `test_run`.
    ///
    /// Returns `Err` carrying the BPF program's non-zero return value.
    fn enable_sibling_cpu(
        skel: &mut BpfSkel<'_>,
        cpu: usize,
        sibling_cpu: usize,
    ) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_sibling_cpu;
        let mut args = domain_arg {
            cpu_id: cpu as c_int,
            sibling_cpu_id: sibling_cpu as c_int,
        };
        let input = ProgramInput {
            // SAFETY: `args` is a plain-data struct that stays alive across
            // the `test_run` call below; the slice covers exactly its bytes,
            // giving the BPF program its argument struct as input context.
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        // NOTE(review): a test_run() failure panics here (unwrap) —
        // presumably acceptable during initialization; confirm.
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }
391
    /// Add `cpu` to the primary scheduling domain by invoking the
    /// `enable_primary_cpu` BPF program via `test_run`.
    ///
    /// Returns `Err` carrying the BPF program's non-zero return value.
    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_primary_cpu;
        let mut args = cpu_arg {
            cpu_id: cpu as c_int,
        };
        let input = ProgramInput {
            // SAFETY: `args` is a plain-data struct that stays alive across
            // the `test_run` call below; the slice covers exactly its bytes,
            // giving the BPF program its argument struct as input context.
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        // NOTE(review): a test_run() failure panics here (unwrap) —
        // presumably acceptable during initialization; confirm.
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }
413
414 fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
415 let smt_siblings = topo.sibling_cpus();
416
417 info!("SMT sibling CPUs: {:?}", smt_siblings);
418 for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
419 Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
420 }
421
422 Ok(())
423 }
424
    /// Snapshot the dispatch counters exported by the BPF side (bss map)
    /// into a `Metrics` value for the stats server.
    fn get_metrics(&mut self) -> Metrics {
        let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
        Metrics {
            nr_local_dispatch: bss_data.nr_local_dispatch,
            nr_remote_dispatch: bss_data.nr_remote_dispatch,
            nr_keep_running: bss_data.nr_keep_running,
        }
    }
433
    /// Return true once the BPF scheduler has exited (user exit info set).
    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }
437
438 fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
439 let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
441 let total_diff = curr.total.saturating_sub(prev.total);
442
443 if total_diff > 0 {
444 let user_ratio = user_diff as f64 / total_diff as f64;
445 Some((user_ratio * 1024.0).round() as u64)
446 } else {
447 None
448 }
449 }
450
451 fn read_cpu_times() -> Option<CpuTimes> {
452 let file = File::open("/proc/stat").ok()?;
453 let reader = BufReader::new(file);
454
455 for line in reader.lines() {
456 let line = line.ok()?;
457 if line.starts_with("cpu ") {
458 let fields: Vec<&str> = line.split_whitespace().collect();
459 if fields.len() < 5 {
460 return None;
461 }
462
463 let user: u64 = fields[1].parse().ok()?;
464 let nice: u64 = fields[2].parse().ok()?;
465
466 let total: u64 = fields
468 .iter()
469 .skip(1)
470 .take(8)
471 .filter_map(|v| v.parse::<u64>().ok())
472 .sum();
473
474 return Some(CpuTimes { user, nice, total });
475 }
476 }
477
478 None
479 }
480
    /// Main loop: periodically refresh the BPF-side `cpu_util` estimate from
    /// /proc/stat and answer stats requests, until shutdown is requested or
    /// the BPF scheduler exits. Returns the user exit info report.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        // Polling interval is capped at 1s so shutdown stays responsive.
        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
        let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
        let mut last_update = Instant::now();

        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
            // Refresh utilization once per polling interval (0 disables it).
            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
                if let Some(curr_cputime) = Self::read_cpu_times() {
                    // Only update cpu_util when a valid delta is available.
                    Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime)
                        .map(|util| self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util);
                    prev_cputime = curr_cputime;
                }
                last_update = Instant::now();
            }

            // Block on stats requests; the timeout doubles as the loop tick.
            let timeout = if polling_time.is_zero() {
                Duration::from_secs(1)
            } else {
                polling_time
            };
            match req_ch.recv_timeout(timeout) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        // Dropping the struct_ops link detaches the scheduler before the
        // exit report is collected.
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
519}
520
/// Log scheduler teardown when the userspace component shuts down.
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}
526
/// Entry point: parse options, handle the informational fast paths
/// (--version, --help-stats, --monitor), set up logging and Ctrl-C
/// handling, then run the scheduler, restarting it when requested.
fn main() -> Result<()> {
    let opts = Opts::parse();

    // --version: print and exit without loading anything.
    if opts.version {
        println!(
            "{} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    // --help-stats: describe the exported metrics and exit.
    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    // Ctrl-C flips the shared shutdown flag checked by Scheduler::run().
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // --stats spawns a monitor thread alongside the scheduler;
    // --monitor runs as a pure monitor client and exits when it finishes.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    // Run the scheduler; re-init after a restartable exit (e.g. hotplug).
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}