Skip to main content

scx_beerland/
main.rs

1// SPDX-License-Identifier: GPL-2.0
2//
3// Copyright (c) 2025 Andrea Righi <arighi@nvidia.com>
4
5// This software may be used and distributed according to the terms of the
6// GNU General Public License version 2.
7
8mod bpf_skel;
9pub use bpf_skel::*;
10pub mod bpf_intf;
11pub use bpf_intf::*;
12
13mod stats;
14use std::collections::HashSet;
15use std::ffi::{c_int, c_ulong};
16use std::fs::File;
17use std::io::{BufRead, BufReader};
18use std::mem::MaybeUninit;
19use std::sync::atomic::AtomicBool;
20use std::sync::atomic::Ordering;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use anyhow::bail;
25use anyhow::Context;
26use anyhow::Result;
27use clap::Parser;
28use crossbeam::channel::RecvTimeoutError;
29use libbpf_rs::OpenObject;
30use libbpf_rs::ProgramInput;
31use log::{debug, info, warn};
32use scx_stats::prelude::*;
33use scx_utils::build_id;
34use scx_utils::compat;
35use scx_utils::get_primary_cpus;
36use scx_utils::libbpf_clap_opts::LibbpfOpts;
37use scx_utils::scx_ops_attach;
38use scx_utils::scx_ops_load;
39use scx_utils::scx_ops_open;
40use scx_utils::try_set_rlimit_infinity;
41use scx_utils::uei_exited;
42use scx_utils::uei_report;
43use scx_utils::Powermode;
44use scx_utils::Topology;
45use scx_utils::UserExitInfo;
46use scx_utils::NR_CPU_IDS;
47use stats::Metrics;
48
49const SCHEDULER_NAME: &str = "scx_beerland";
50
// Command-line options for the scheduler.
//
// NOTE: clap's derive turns the `///` comments on the fields below into the `--help` text, so
// they are user-facing strings; maintainer notes in this struct use plain `//` comments.
#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_beerland",
    version,
    // The built-in version flag is disabled; the `version` field below provides -V/--version
    // and is handled explicitly in main().
    disable_version_flag = true,
    about = "Scheduler designed to prioritize locality and scalability."
)]
struct Opts {
    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    // Converted to nanoseconds (multiplied by 1000) before being handed to BPF.
    /// Maximum scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "1000")]
    slice_us: u64,

    // Converted to nanoseconds (multiplied by 1000) before being handed to BPF.
    /// Maximum time slice lag in microseconds.
    ///
    /// A positive value can help to enhance the responsiveness of interactive tasks, but it can
    /// also make performance more "spikey".
    #[clap(short = 'l', long, default_value = "40000")]
    slice_us_lag: u64,

    // Rescaled from a percentage to the [0 .. 1024] range before being handed to BPF.
    /// CPU busy threshold.
    ///
    /// Specifies the CPU utilization percentage (0-100%) at which the scheduler considers the
    /// system to be busy.
    #[clap(short = 'c', long, default_value = "75")]
    cpu_busy_thresh: u64,

    /// Polling time (ms) to refresh the CPU utilization.
    ///
    /// This interval determines how often the scheduler refreshes the CPU utilization that is
    /// compared with the CPU busy threshold (option -c) to decide if the system is busy or not.
    ///
    /// Value is clamped to the range [10 .. 1000].
    ///
    /// 0 = disabled.
    #[clap(short = 'p', long, default_value = "250")]
    polling_ms: u64,

    // Parsed by parse_cpu_list(); when absent, every CPU id is part of the primary domain.
    /// Specifies a list of CPUs to prioritize.
    ///
    /// Accepts a comma-separated list of CPUs or ranges (i.e., 0-3,12-15) or the following special
    /// keywords:
    ///
    /// "turbo" = automatically detect and prioritize the CPUs with the highest max frequency,
    /// "performance" = automatically detect and prioritize the fastest CPUs,
    /// "powersave" = automatically detect and prioritize the slowest CPUs,
    /// "all" = all CPUs assigned to the primary domain.
    ///
    /// By default "all" CPUs are used.
    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    /// Enable preferred idle CPU scanning.
    ///
    /// With this option enabled, the scheduler will prioritize assigning tasks to higher-ranked
    /// cores before considering lower-ranked ones.
    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    // Interval in seconds (fractional values allowed); the scheduler keeps running.
    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    // Interval in seconds (fractional values allowed); takes precedence over `stats` in main().
    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    // Only toggles libbpf debug output on the skeleton builder; the log level is set in main().
    /// Enable verbose output, including libbpf details.
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
137
138pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
139    let mut cpus = Vec::new();
140    let mut seen = HashSet::new();
141
142    // Handle special keywords
143    if let Some(mode) = match optarg {
144        "powersave" => Some(Powermode::Powersave),
145        "performance" => Some(Powermode::Performance),
146        "turbo" => Some(Powermode::Turbo),
147        "all" => Some(Powermode::Any),
148        _ => None,
149    } {
150        return get_primary_cpus(mode).map_err(|e| e.to_string());
151    }
152
153    // Validate input characters
154    if optarg
155        .chars()
156        .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
157    {
158        return Err("Invalid character in CPU list".to_string());
159    }
160
161    // Replace all whitespace with tab (or just trim later)
162    let cleaned = optarg.replace(' ', "\t");
163
164    for token in cleaned.split(',') {
165        let token = token.trim_matches(|c: char| c.is_whitespace());
166
167        if token.is_empty() {
168            continue;
169        }
170
171        if let Some((start_str, end_str)) = token.split_once('-') {
172            let start = start_str
173                .trim()
174                .parse::<usize>()
175                .map_err(|_| "Invalid range start")?;
176            let end = end_str
177                .trim()
178                .parse::<usize>()
179                .map_err(|_| "Invalid range end")?;
180
181            if start > end {
182                return Err(format!("Invalid CPU range: {}-{}", start, end));
183            }
184
185            for i in start..=end {
186                if cpus.len() >= *NR_CPU_IDS {
187                    return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
188                }
189                if seen.insert(i) {
190                    cpus.push(i);
191                }
192            }
193        } else {
194            let cpu = token
195                .parse::<usize>()
196                .map_err(|_| format!("Invalid CPU: {}", token))?;
197            if cpus.len() >= *NR_CPU_IDS {
198                return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
199            }
200            if seen.insert(cpu) {
201                cpus.push(cpu);
202            }
203        }
204    }
205
206    Ok(cpus)
207}
208
/// Snapshot of the aggregate CPU time counters taken from the `cpu ` line of `/proc/stat`.
///
/// Values are stored exactly as parsed (no unit conversion is done here); two snapshots are
/// compared to derive a utilization ratio.
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    // First numeric field of the `cpu ` line.
    user: u64,
    // Second numeric field of the `cpu ` line.
    nice: u64,
    // Sum of the first eight numeric fields of the `cpu ` line (those that parse as u64).
    total: u64,
}
215
/// User-space side of the scheduler: owns the loaded BPF skeleton and the stats server.
struct Scheduler<'a> {
    // Loaded BPF skeleton; gives access to the BPF maps (rodata/bss) and programs.
    skel: BpfSkel<'a>,
    // Command-line options this instance was initialized with.
    opts: &'a Opts,
    // Link returned when the struct_ops is attached; taken (and dropped) at the end of run().
    struct_ops: Option<libbpf_rs::Link>,
    // Stats server answering metrics requests with `Metrics` snapshots.
    stats_server: StatsServer<(), Metrics>,
}
222
223impl<'a> Scheduler<'a> {
224    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
225        try_set_rlimit_infinity();
226
227        // Initialize CPU topology.
228        let topo = Topology::new().unwrap();
229
230        // Check host topology to determine if we need to enable SMT capabilities.
231        let smt_enabled = topo.smt_enabled;
232
233        info!(
234            "{} {} {}",
235            SCHEDULER_NAME,
236            build_id::full_version(env!("CARGO_PKG_VERSION")),
237            if smt_enabled { "SMT on" } else { "SMT off" }
238        );
239
240        // Print command line.
241        info!(
242            "scheduler options: {}",
243            std::env::args().collect::<Vec<_>>().join(" ")
244        );
245
246        // Initialize BPF connector.
247        let mut skel_builder = BpfSkelBuilder::default();
248        skel_builder.obj_builder.debug(opts.verbose);
249        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
250        let mut skel = scx_ops_open!(skel_builder, open_object, beerland_ops, open_opts)?;
251
252        skel.struct_ops.beerland_ops_mut().exit_dump_len = opts.exit_dump_len;
253
254        // Override default BPF scheduling parameters.
255        let rodata = skel.maps.rodata_data.as_mut().unwrap();
256        rodata.slice_ns = opts.slice_us * 1000;
257        rodata.slice_lag = opts.slice_us_lag * 1000;
258        rodata.smt_enabled = smt_enabled;
259
260        // Normalize CPU busy threshold in the range [0 .. 1024].
261        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
262
263        // Define the primary scheduling domain.
264        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
265            match parse_cpu_list(domain) {
266                Ok(cpus) => cpus,
267                Err(e) => bail!("Error parsing primary domain: {}", e),
268            }
269        } else {
270            (0..*NR_CPU_IDS).collect()
271        };
272        if primary_cpus.len() < *NR_CPU_IDS {
273            info!("Primary CPUs: {:?}", primary_cpus);
274            rodata.primary_all = false;
275        } else {
276            rodata.primary_all = true;
277        }
278
279        // Generate the list of available CPUs sorted by capacity in descending order.
280        let mut cpus: Vec<_> = topo.all_cpus.values().collect();
281        cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
282        for (i, cpu) in cpus.iter().enumerate() {
283            rodata.cpu_capacity[cpu.id] = cpu.cpu_capacity as c_ulong;
284            rodata.preferred_cpus[i] = cpu.id as u64;
285        }
286        if opts.preferred_idle_scan {
287            info!(
288                "Preferred CPUs: {:?}",
289                &rodata.preferred_cpus[0..cpus.len()]
290            );
291        }
292        rodata.preferred_idle_scan = opts.preferred_idle_scan;
293
294        // Set scheduler flags.
295        skel.struct_ops.beerland_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
296            | *compat::SCX_OPS_ENQ_LAST
297            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
298            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
299        info!(
300            "scheduler flags: {:#x}",
301            skel.struct_ops.beerland_ops_mut().flags
302        );
303
304        // Load the BPF program for validation.
305        let mut skel = scx_ops_load!(skel, beerland_ops, uei)?;
306
307        // Initialize SMT domains.
308        if smt_enabled {
309            Self::init_smt_domains(&mut skel, &topo)?;
310        }
311
312        // Enable primary scheduling domain, if defined.
313        if primary_cpus.len() < *NR_CPU_IDS {
314            for cpu in primary_cpus {
315                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
316                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
317                }
318            }
319        }
320
321        // Attach the scheduler.
322        let struct_ops = Some(scx_ops_attach!(skel, beerland_ops)?);
323        let stats_server = StatsServer::new(stats::server_data()).launch()?;
324
325        Ok(Self {
326            skel,
327            opts,
328            struct_ops,
329            stats_server,
330        })
331    }
332
333    fn enable_sibling_cpu(
334        skel: &mut BpfSkel<'_>,
335        cpu: usize,
336        sibling_cpu: usize,
337    ) -> Result<(), u32> {
338        let prog = &mut skel.progs.enable_sibling_cpu;
339        let mut args = domain_arg {
340            cpu_id: cpu as c_int,
341            sibling_cpu_id: sibling_cpu as c_int,
342        };
343        let input = ProgramInput {
344            context_in: Some(unsafe {
345                std::slice::from_raw_parts_mut(
346                    &mut args as *mut _ as *mut u8,
347                    std::mem::size_of_val(&args),
348                )
349            }),
350            ..Default::default()
351        };
352        let out = prog.test_run(input).unwrap();
353        if out.return_value != 0 {
354            return Err(out.return_value);
355        }
356
357        Ok(())
358    }
359
360    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
361        let prog = &mut skel.progs.enable_primary_cpu;
362        let mut args = cpu_arg {
363            cpu_id: cpu as c_int,
364        };
365        let input = ProgramInput {
366            context_in: Some(unsafe {
367                std::slice::from_raw_parts_mut(
368                    &mut args as *mut _ as *mut u8,
369                    std::mem::size_of_val(&args),
370                )
371            }),
372            ..Default::default()
373        };
374        let out = prog.test_run(input).unwrap();
375        if out.return_value != 0 {
376            return Err(out.return_value);
377        }
378
379        Ok(())
380    }
381
382    fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
383        let smt_siblings = topo.sibling_cpus();
384
385        info!("SMT sibling CPUs: {:?}", smt_siblings);
386        for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
387            Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
388        }
389
390        Ok(())
391    }
392
393    fn get_metrics(&mut self) -> Metrics {
394        let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
395        Metrics {
396            nr_local_dispatch: bss_data.nr_local_dispatch,
397            nr_remote_dispatch: bss_data.nr_remote_dispatch,
398            nr_keep_running: bss_data.nr_keep_running,
399        }
400    }
401
    /// Report whether the BPF scheduler has exited, as determined by `uei_exited!` on the
    /// skeleton's `uei` data. Used by `run()` as one of its loop termination conditions.
    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }
405
406    fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
407        // Evaluate total user CPU time as user + nice.
408        let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
409        let total_diff = curr.total.saturating_sub(prev.total);
410
411        if total_diff > 0 {
412            let user_ratio = user_diff as f64 / total_diff as f64;
413            Some((user_ratio * 1024.0).round() as u64)
414        } else {
415            None
416        }
417    }
418
419    fn read_cpu_times() -> Option<CpuTimes> {
420        let file = File::open("/proc/stat").ok()?;
421        let reader = BufReader::new(file);
422
423        for line in reader.lines() {
424            let line = line.ok()?;
425            if line.starts_with("cpu ") {
426                let fields: Vec<&str> = line.split_whitespace().collect();
427                if fields.len() < 5 {
428                    return None;
429                }
430
431                let user: u64 = fields[1].parse().ok()?;
432                let nice: u64 = fields[2].parse().ok()?;
433
434                // Sum the first 8 fields as total time, including idle, system, etc.
435                let total: u64 = fields
436                    .iter()
437                    .skip(1)
438                    .take(8)
439                    .filter_map(|v| v.parse::<u64>().ok())
440                    .sum();
441
442                return Some(CpuTimes { user, nice, total });
443            }
444        }
445
446        None
447    }
448
449    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
450        let (res_ch, req_ch) = self.stats_server.channels();
451
452        // Periodically evaluate user CPU utilization from user-space and update a global variable
453        // in BPF.
454        //
455        // The BPF scheduler can use this value to determine when the system is busy or idle.
456        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
457        let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
458        let mut last_update = Instant::now();
459
460        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
461            // Update CPU utilization.
462            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
463                if let Some(curr_cputime) = Self::read_cpu_times() {
464                    Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime)
465                        .map(|util| self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util);
466                    prev_cputime = curr_cputime;
467                }
468                last_update = Instant::now();
469            }
470
471            // Update statistics and check for exit condition.
472            let timeout = if polling_time.is_zero() {
473                Duration::from_secs(1)
474            } else {
475                polling_time
476            };
477            match req_ch.recv_timeout(timeout) {
478                Ok(()) => res_ch.send(self.get_metrics())?,
479                Err(RecvTimeoutError::Timeout) => {}
480                Err(e) => Err(e)?,
481            }
482        }
483
484        let _ = self.struct_ops.take();
485        uei_report!(&self.skel, uei)
486    }
487}
488
// Logs when a scheduler instance goes away. No explicit cleanup is done here; the fields
// (skeleton, struct_ops link, stats server) are released by their own drops.
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}
494
495fn main() -> Result<()> {
496    let opts = Opts::parse();
497
498    if opts.version {
499        println!(
500            "{} {}",
501            SCHEDULER_NAME,
502            build_id::full_version(env!("CARGO_PKG_VERSION"))
503        );
504        return Ok(());
505    }
506
507    if opts.help_stats {
508        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
509        return Ok(());
510    }
511
512    let loglevel = simplelog::LevelFilter::Info;
513
514    let mut lcfg = simplelog::ConfigBuilder::new();
515    lcfg.set_time_offset_to_local()
516        .expect("Failed to set local time offset")
517        .set_time_level(simplelog::LevelFilter::Error)
518        .set_location_level(simplelog::LevelFilter::Off)
519        .set_target_level(simplelog::LevelFilter::Off)
520        .set_thread_level(simplelog::LevelFilter::Off);
521    simplelog::TermLogger::init(
522        loglevel,
523        lcfg.build(),
524        simplelog::TerminalMode::Stderr,
525        simplelog::ColorChoice::Auto,
526    )?;
527
528    let shutdown = Arc::new(AtomicBool::new(false));
529    let shutdown_clone = shutdown.clone();
530    ctrlc::set_handler(move || {
531        shutdown_clone.store(true, Ordering::Relaxed);
532    })
533    .context("Error setting Ctrl-C handler")?;
534
535    if let Some(intv) = opts.monitor.or(opts.stats) {
536        let shutdown_copy = shutdown.clone();
537        let jh = std::thread::spawn(move || {
538            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
539                Ok(_) => {
540                    debug!("stats monitor thread finished successfully")
541                }
542                Err(error_object) => {
543                    warn!(
544                        "stats monitor thread finished because of an error {}",
545                        error_object
546                    )
547                }
548            }
549        });
550        if opts.monitor.is_some() {
551            let _ = jh.join();
552            return Ok(());
553        }
554    }
555
556    let mut open_object = MaybeUninit::uninit();
557    loop {
558        let mut sched = Scheduler::init(&opts, &mut open_object)?;
559        if !sched.run(shutdown.clone())?.should_restart() {
560            break;
561        }
562    }
563
564    Ok(())
565}