1mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::HashSet;
15use std::ffi::{c_int, c_ulong};
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::mem::MaybeUninit;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use anyhow::bail;
25use anyhow::Context;
26use anyhow::Result;
27use clap::Parser;
28use crossbeam::channel::RecvTimeoutError;
29use libbpf_rs::OpenObject;
30use libbpf_rs::ProgramInput;
31use log::{debug, info, warn};
32use scx_stats::prelude::*;
33use scx_utils::build_id;
34use scx_utils::compat;
35use scx_utils::get_primary_cpus;
36use scx_utils::libbpf_clap_opts::LibbpfOpts;
37use scx_utils::scx_ops_attach;
38use scx_utils::scx_ops_load;
39use scx_utils::scx_ops_open;
40use scx_utils::try_set_rlimit_infinity;
41use scx_utils::uei_exited;
42use scx_utils::uei_report;
43use scx_utils::Powermode;
44use scx_utils::Topology;
45use scx_utils::UserExitInfo;
46use scx_utils::NR_CPU_IDS;
47use stats::Metrics;
48
49const SCHEDULER_NAME: &str = "scx_beerland";
50
51#[derive(Debug, clap::Parser)]
52#[command(
53 name = "scx_beerland",
54 version,
55 disable_version_flag = true,
56 about = "Scheduler designed to prioritize locality and scalability."
57)]
58struct Opts {
59 #[clap(long, default_value = "0")]
61 exit_dump_len: u32,
62
63 #[clap(short = 's', long, default_value = "1000")]
65 slice_us: u64,
66
67 #[clap(short = 'l', long, default_value = "40000")]
72 slice_us_lag: u64,
73
74 #[clap(short = 'c', long, default_value = "75")]
79 cpu_busy_thresh: u64,
80
81 #[clap(short = 'p', long, default_value = "250")]
90 polling_ms: u64,
91
92 #[clap(short = 'm', long)]
104 primary_domain: Option<String>,
105
106 #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
111 preferred_idle_scan: bool,
112
113 #[clap(long)]
115 stats: Option<f64>,
116
117 #[clap(long)]
120 monitor: Option<f64>,
121
122 #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
124 verbose: bool,
125
126 #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
128 version: bool,
129
130 #[clap(long)]
132 help_stats: bool,
133
134 #[clap(flatten, next_help_heading = "Libbpf Options")]
135 pub libbpf: LibbpfOpts,
136}
137
138pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
139 let mut cpus = Vec::new();
140 let mut seen = HashSet::new();
141
142 if let Some(mode) = match optarg {
144 "powersave" => Some(Powermode::Powersave),
145 "performance" => Some(Powermode::Performance),
146 "turbo" => Some(Powermode::Turbo),
147 "all" => Some(Powermode::Any),
148 _ => None,
149 } {
150 return get_primary_cpus(mode).map_err(|e| e.to_string());
151 }
152
153 if optarg
155 .chars()
156 .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
157 {
158 return Err("Invalid character in CPU list".to_string());
159 }
160
161 let cleaned = optarg.replace(' ', "\t");
163
164 for token in cleaned.split(',') {
165 let token = token.trim_matches(|c: char| c.is_whitespace());
166
167 if token.is_empty() {
168 continue;
169 }
170
171 if let Some((start_str, end_str)) = token.split_once('-') {
172 let start = start_str
173 .trim()
174 .parse::<usize>()
175 .map_err(|_| "Invalid range start")?;
176 let end = end_str
177 .trim()
178 .parse::<usize>()
179 .map_err(|_| "Invalid range end")?;
180
181 if start > end {
182 return Err(format!("Invalid CPU range: {}-{}", start, end));
183 }
184
185 for i in start..=end {
186 if cpus.len() >= *NR_CPU_IDS {
187 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
188 }
189 if seen.insert(i) {
190 cpus.push(i);
191 }
192 }
193 } else {
194 let cpu = token
195 .parse::<usize>()
196 .map_err(|_| format!("Invalid CPU: {}", token))?;
197 if cpus.len() >= *NR_CPU_IDS {
198 return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
199 }
200 if seen.insert(cpu) {
201 cpus.push(cpu);
202 }
203 }
204 }
205
206 Ok(cpus)
207}
208
/// Snapshot of the system-wide CPU time counters taken from the aggregate
/// `cpu` line of `/proc/stat`.
#[derive(Clone, Copy, Debug)]
struct CpuTimes {
    /// First counter of the `cpu` line (time spent in user mode).
    user: u64,
    /// Second counter of the `cpu` line (time spent in niced user mode).
    nice: u64,
    /// Sum of the first eight counters of the `cpu` line.
    total: u64,
}
215
216struct Scheduler<'a> {
217 skel: BpfSkel<'a>,
218 opts: &'a Opts,
219 struct_ops: Option<libbpf_rs::Link>,
220 stats_server: StatsServer<(), Metrics>,
221}
222
223impl<'a> Scheduler<'a> {
224 fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
225 try_set_rlimit_infinity();
226
227 let topo = Topology::new().unwrap();
229
230 let smt_enabled = topo.smt_enabled;
232
233 info!(
234 "{} {} {}",
235 SCHEDULER_NAME,
236 build_id::full_version(env!("CARGO_PKG_VERSION")),
237 if smt_enabled { "SMT on" } else { "SMT off" }
238 );
239
240 info!(
242 "scheduler options: {}",
243 std::env::args().collect::<Vec<_>>().join(" ")
244 );
245
246 let mut skel_builder = BpfSkelBuilder::default();
248 skel_builder.obj_builder.debug(opts.verbose);
249 let open_opts = opts.libbpf.clone().into_bpf_open_opts();
250 let mut skel = scx_ops_open!(skel_builder, open_object, beerland_ops, open_opts)?;
251
252 skel.struct_ops.beerland_ops_mut().exit_dump_len = opts.exit_dump_len;
253
254 let rodata = skel.maps.rodata_data.as_mut().unwrap();
256 rodata.slice_ns = opts.slice_us * 1000;
257 rodata.slice_lag = opts.slice_us_lag * 1000;
258 rodata.smt_enabled = smt_enabled;
259
260 rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
262
263 let primary_cpus = if let Some(ref domain) = opts.primary_domain {
265 match parse_cpu_list(domain) {
266 Ok(cpus) => cpus,
267 Err(e) => bail!("Error parsing primary domain: {}", e),
268 }
269 } else {
270 (0..*NR_CPU_IDS).collect()
271 };
272 if primary_cpus.len() < *NR_CPU_IDS {
273 info!("Primary CPUs: {:?}", primary_cpus);
274 rodata.primary_all = false;
275 } else {
276 rodata.primary_all = true;
277 }
278
279 let mut cpus: Vec<_> = topo.all_cpus.values().collect();
281 cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
282 for (i, cpu) in cpus.iter().enumerate() {
283 rodata.cpu_capacity[cpu.id] = cpu.cpu_capacity as c_ulong;
284 rodata.preferred_cpus[i] = cpu.id as u64;
285 }
286 if opts.preferred_idle_scan {
287 info!(
288 "Preferred CPUs: {:?}",
289 &rodata.preferred_cpus[0..cpus.len()]
290 );
291 }
292 rodata.preferred_idle_scan = opts.preferred_idle_scan;
293
294 skel.struct_ops.beerland_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
296 | *compat::SCX_OPS_ENQ_LAST
297 | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
298 | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
299 info!(
300 "scheduler flags: {:#x}",
301 skel.struct_ops.beerland_ops_mut().flags
302 );
303
304 let mut skel = scx_ops_load!(skel, beerland_ops, uei)?;
306
307 if smt_enabled {
309 Self::init_smt_domains(&mut skel, &topo)?;
310 }
311
312 if primary_cpus.len() < *NR_CPU_IDS {
314 for cpu in primary_cpus {
315 if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
316 bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
317 }
318 }
319 }
320
321 let struct_ops = Some(scx_ops_attach!(skel, beerland_ops)?);
323 let stats_server = StatsServer::new(stats::server_data()).launch()?;
324
325 Ok(Self {
326 skel,
327 opts,
328 struct_ops,
329 stats_server,
330 })
331 }
332
333 fn enable_sibling_cpu(
334 skel: &mut BpfSkel<'_>,
335 cpu: usize,
336 sibling_cpu: usize,
337 ) -> Result<(), u32> {
338 let prog = &mut skel.progs.enable_sibling_cpu;
339 let mut args = domain_arg {
340 cpu_id: cpu as c_int,
341 sibling_cpu_id: sibling_cpu as c_int,
342 };
343 let input = ProgramInput {
344 context_in: Some(unsafe {
345 std::slice::from_raw_parts_mut(
346 &mut args as *mut _ as *mut u8,
347 std::mem::size_of_val(&args),
348 )
349 }),
350 ..Default::default()
351 };
352 let out = prog.test_run(input).unwrap();
353 if out.return_value != 0 {
354 return Err(out.return_value);
355 }
356
357 Ok(())
358 }
359
360 fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
361 let prog = &mut skel.progs.enable_primary_cpu;
362 let mut args = cpu_arg {
363 cpu_id: cpu as c_int,
364 };
365 let input = ProgramInput {
366 context_in: Some(unsafe {
367 std::slice::from_raw_parts_mut(
368 &mut args as *mut _ as *mut u8,
369 std::mem::size_of_val(&args),
370 )
371 }),
372 ..Default::default()
373 };
374 let out = prog.test_run(input).unwrap();
375 if out.return_value != 0 {
376 return Err(out.return_value);
377 }
378
379 Ok(())
380 }
381
382 fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
383 let smt_siblings = topo.sibling_cpus();
384
385 info!("SMT sibling CPUs: {:?}", smt_siblings);
386 for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
387 Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
388 }
389
390 Ok(())
391 }
392
393 fn get_metrics(&mut self) -> Metrics {
394 let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
395 Metrics {
396 nr_local_dispatch: bss_data.nr_local_dispatch,
397 nr_remote_dispatch: bss_data.nr_remote_dispatch,
398 nr_keep_running: bss_data.nr_keep_running,
399 }
400 }
401
402 pub fn exited(&mut self) -> bool {
403 uei_exited!(&self.skel, uei)
404 }
405
406 fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
407 let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
409 let total_diff = curr.total.saturating_sub(prev.total);
410
411 if total_diff > 0 {
412 let user_ratio = user_diff as f64 / total_diff as f64;
413 Some((user_ratio * 1024.0).round() as u64)
414 } else {
415 None
416 }
417 }
418
419 fn read_cpu_times() -> Option<CpuTimes> {
420 let file = File::open("/proc/stat").ok()?;
421 let reader = BufReader::new(file);
422
423 for line in reader.lines() {
424 let line = line.ok()?;
425 if line.starts_with("cpu ") {
426 let fields: Vec<&str> = line.split_whitespace().collect();
427 if fields.len() < 5 {
428 return None;
429 }
430
431 let user: u64 = fields[1].parse().ok()?;
432 let nice: u64 = fields[2].parse().ok()?;
433
434 let total: u64 = fields
436 .iter()
437 .skip(1)
438 .take(8)
439 .filter_map(|v| v.parse::<u64>().ok())
440 .sum();
441
442 return Some(CpuTimes { user, nice, total });
443 }
444 }
445
446 None
447 }
448
449 fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
450 let (res_ch, req_ch) = self.stats_server.channels();
451
452 let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
457 let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
458 let mut last_update = Instant::now();
459
460 while !shutdown.load(Ordering::Relaxed) && !self.exited() {
461 if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
463 if let Some(curr_cputime) = Self::read_cpu_times() {
464 Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime)
465 .map(|util| self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util);
466 prev_cputime = curr_cputime;
467 }
468 last_update = Instant::now();
469 }
470
471 let timeout = if polling_time.is_zero() {
473 Duration::from_secs(1)
474 } else {
475 polling_time
476 };
477 match req_ch.recv_timeout(timeout) {
478 Ok(()) => res_ch.send(self.get_metrics())?,
479 Err(RecvTimeoutError::Timeout) => {}
480 Err(e) => Err(e)?,
481 }
482 }
483
484 let _ = self.struct_ops.take();
485 uei_report!(&self.skel, uei)
486 }
487}
488
489impl Drop for Scheduler<'_> {
490 fn drop(&mut self) {
491 info!("Unregister {SCHEDULER_NAME} scheduler");
492 }
493}
494
495fn main() -> Result<()> {
496 let opts = Opts::parse();
497
498 if opts.version {
499 println!(
500 "{} {}",
501 SCHEDULER_NAME,
502 build_id::full_version(env!("CARGO_PKG_VERSION"))
503 );
504 return Ok(());
505 }
506
507 if opts.help_stats {
508 stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
509 return Ok(());
510 }
511
512 let loglevel = simplelog::LevelFilter::Info;
513
514 let mut lcfg = simplelog::ConfigBuilder::new();
515 lcfg.set_time_offset_to_local()
516 .expect("Failed to set local time offset")
517 .set_time_level(simplelog::LevelFilter::Error)
518 .set_location_level(simplelog::LevelFilter::Off)
519 .set_target_level(simplelog::LevelFilter::Off)
520 .set_thread_level(simplelog::LevelFilter::Off);
521 simplelog::TermLogger::init(
522 loglevel,
523 lcfg.build(),
524 simplelog::TerminalMode::Stderr,
525 simplelog::ColorChoice::Auto,
526 )?;
527
528 let shutdown = Arc::new(AtomicBool::new(false));
529 let shutdown_clone = shutdown.clone();
530 ctrlc::set_handler(move || {
531 shutdown_clone.store(true, Ordering::Relaxed);
532 })
533 .context("Error setting Ctrl-C handler")?;
534
535 if let Some(intv) = opts.monitor.or(opts.stats) {
536 let shutdown_copy = shutdown.clone();
537 let jh = std::thread::spawn(move || {
538 match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
539 Ok(_) => {
540 debug!("stats monitor thread finished successfully")
541 }
542 Err(error_object) => {
543 warn!(
544 "stats monitor thread finished because of an error {}",
545 error_object
546 )
547 }
548 }
549 });
550 if opts.monitor.is_some() {
551 let _ = jh.join();
552 return Ok(());
553 }
554 }
555
556 let mut open_object = MaybeUninit::uninit();
557 loop {
558 let mut sched = Scheduler::init(&opts, &mut open_object)?;
559 if !sched.run(shutdown.clone())?.should_restart() {
560 break;
561 }
562 }
563
564 Ok(())
565}