// scx_beerland/main.rs
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2025 Andrea Righi <arighi@nvidia.com>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;

mod stats;
use std::collections::HashSet;
use std::ffi::{c_int, c_ulong};
use std::fs::File;
use std::io::{BufRead, BufReader, ErrorKind};
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::{debug, info, warn};
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::try_set_rlimit_infinity;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::CoreType;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;
use stats::Metrics;

// Scheduler name, used for logging and version reporting.
const SCHEDULER_NAME: &str = "scx_beerland";

// Command-line options.
//
// NOTE: the `///` doc comments below are rendered by clap as `--help` text,
// so they are part of the user-visible behavior — do not edit them casually.
#[derive(Debug, clap::Parser)]
#[command(
    name = "scx_beerland",
    version,
    disable_version_flag = true,
    about = "Scheduler designed to prioritize locality and scalability."
)]
struct Opts {
    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Maximum scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "1000")]
    slice_us: u64,

    /// Maximum time slice lag in microseconds.
    ///
    /// A positive value can help to enhance the responsiveness of interactive tasks, but it can
    /// also make performance more "spikey".
    #[clap(short = 'l', long, default_value = "40000")]
    slice_us_lag: u64,

    /// CPU busy threshold.
    ///
    /// Specifies the CPU utilization percentage (0-100%) at which the scheduler considers the
    /// system to be busy.
    #[clap(short = 'c', long, default_value = "75")]
    cpu_busy_thresh: u64,

    /// Polling time (ms) to refresh the CPU utilization.
    ///
    /// This interval determines how often the scheduler refreshes the CPU utilization that is
    /// compared with the CPU busy threshold (option -c) to decide if the system is busy or not.
    ///
    /// Value is clamped to the range [10 .. 1000].
    ///
    /// 0 = disabled.
    #[clap(short = 'p', long, default_value = "250")]
    polling_ms: u64,

    /// Specifies a list of CPUs to prioritize.
    ///
    /// Accepts a comma-separated list of CPUs or ranges (i.e., 0-3,12-15) or the following special
    /// keywords:
    ///
    /// "turbo" = automatically detect and prioritize the CPUs with the highest max frequency,
    /// "performance" = automatically detect and prioritize the fastest CPUs,
    /// "powersave" = automatically detect and prioritize the slowest CPUs,
    /// "all" = all CPUs assigned to the primary domain.
    ///
    /// By default "all" CPUs are used.
    #[clap(short = 'm', long)]
    primary_domain: Option<String>,

    /// Enable preferred idle CPU scanning.
    ///
    /// With this option enabled, the scheduler will prioritize assigning tasks to higher-ranked
    /// cores before considering lower-ranked ones.
    #[clap(short = 'P', long, action = clap::ArgAction::SetTrue)]
    preferred_idle_scan: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Enable verbose output, including libbpf details.
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

// CPU selection policy for the primary scheduling domain (see the
// --primary-domain keywords in Opts).
#[derive(PartialEq)]
enum Powermode {
    // Only the CPUs with the highest max frequency.
    Turbo,
    // All the "big" CPUs (turbo and non-turbo).
    Performance,
    // All the "little" (slower) CPUs.
    Powersave,
    // No filtering: every CPU is eligible.
    Any,
}

/*
 * TODO: this code is shared between scx_bpfland, scx_flash and scx_cosmos; consider moving it to
 * scx_utils.
 */
149fn get_primary_cpus(mode: Powermode) -> std::io::Result<Vec<usize>> {
150    let cpus: Vec<usize> = Topology::new()
151        .unwrap()
152        .all_cores
153        .values()
154        .flat_map(|core| &core.cpus)
155        .filter_map(|(cpu_id, cpu)| match (&mode, &cpu.core_type) {
156            // Turbo mode: prioritize CPUs with the highest max frequency
157            (Powermode::Turbo, CoreType::Big { turbo: true }) |
158            // Performance mode: add all the Big CPUs (either Turbo or non-Turbo)
159            (Powermode::Performance, CoreType::Big { .. }) |
160            // Powersave mode: add all the Little CPUs
161            (Powermode::Powersave, CoreType::Little) => Some(*cpu_id),
162            (Powermode::Any, ..) => Some(*cpu_id),
163            _ => None,
164        })
165        .collect();
166
167    Ok(cpus)
168}
169
170pub fn parse_cpu_list(optarg: &str) -> Result<Vec<usize>, String> {
171    let mut cpus = Vec::new();
172    let mut seen = HashSet::new();
173
174    // Handle special keywords
175    if let Some(mode) = match optarg {
176        "powersave" => Some(Powermode::Powersave),
177        "performance" => Some(Powermode::Performance),
178        "turbo" => Some(Powermode::Turbo),
179        "all" => Some(Powermode::Any),
180        _ => None,
181    } {
182        return get_primary_cpus(mode).map_err(|e| e.to_string());
183    }
184
185    // Validate input characters
186    if optarg
187        .chars()
188        .any(|c| !c.is_ascii_digit() && c != '-' && c != ',' && !c.is_whitespace())
189    {
190        return Err("Invalid character in CPU list".to_string());
191    }
192
193    // Replace all whitespace with tab (or just trim later)
194    let cleaned = optarg.replace(' ', "\t");
195
196    for token in cleaned.split(',') {
197        let token = token.trim_matches(|c: char| c.is_whitespace());
198
199        if token.is_empty() {
200            continue;
201        }
202
203        if let Some((start_str, end_str)) = token.split_once('-') {
204            let start = start_str
205                .trim()
206                .parse::<usize>()
207                .map_err(|_| "Invalid range start")?;
208            let end = end_str
209                .trim()
210                .parse::<usize>()
211                .map_err(|_| "Invalid range end")?;
212
213            if start > end {
214                return Err(format!("Invalid CPU range: {}-{}", start, end));
215            }
216
217            for i in start..=end {
218                if cpus.len() >= *NR_CPU_IDS {
219                    return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
220                }
221                if seen.insert(i) {
222                    cpus.push(i);
223                }
224            }
225        } else {
226            let cpu = token
227                .parse::<usize>()
228                .map_err(|_| format!("Invalid CPU: {}", token))?;
229            if cpus.len() >= *NR_CPU_IDS {
230                return Err(format!("Too many CPUs specified (max {})", *NR_CPU_IDS));
231            }
232            if seen.insert(cpu) {
233                cpus.push(cpu);
234            }
235        }
236    }
237
238    Ok(cpus)
239}
240
// Aggregate CPU time counters sampled from the "cpu " line of /proc/stat
// (values are in clock ticks, as reported by the kernel).
#[derive(Debug, Clone, Copy)]
struct CpuTimes {
    // Time spent in user mode.
    user: u64,
    // Time spent in user mode with low priority (nice).
    nice: u64,
    // Sum of the first 8 /proc/stat fields (user, nice, system, idle, ...).
    total: u64,
}

// Userspace side of the scheduler: BPF skeleton, user options, the attached
// struct_ops link and the stats server.
struct Scheduler<'a> {
    // Loaded BPF skeleton.
    skel: BpfSkel<'a>,
    // Parsed command-line options.
    opts: &'a Opts,
    // Link keeping the sched_ext struct_ops attached; dropping it detaches.
    struct_ops: Option<libbpf_rs::Link>,
    // Server answering stats requests from the monitor.
    stats_server: StatsServer<(), Metrics>,
}

255impl<'a> Scheduler<'a> {
256    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
257        try_set_rlimit_infinity();
258
259        // Initialize CPU topology.
260        let topo = Topology::new().unwrap();
261
262        // Check host topology to determine if we need to enable SMT capabilities.
263        let smt_enabled = topo.smt_enabled;
264
265        info!(
266            "{} {} {}",
267            SCHEDULER_NAME,
268            build_id::full_version(env!("CARGO_PKG_VERSION")),
269            if smt_enabled { "SMT on" } else { "SMT off" }
270        );
271
272        // Print command line.
273        info!(
274            "scheduler options: {}",
275            std::env::args().collect::<Vec<_>>().join(" ")
276        );
277
278        // Initialize BPF connector.
279        let mut skel_builder = BpfSkelBuilder::default();
280        skel_builder.obj_builder.debug(opts.verbose);
281        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
282        let mut skel = scx_ops_open!(skel_builder, open_object, beerland_ops, open_opts)?;
283
284        skel.struct_ops.beerland_ops_mut().exit_dump_len = opts.exit_dump_len;
285
286        // Override default BPF scheduling parameters.
287        let rodata = skel.maps.rodata_data.as_mut().unwrap();
288        rodata.slice_ns = opts.slice_us * 1000;
289        rodata.slice_lag = opts.slice_us_lag * 1000;
290        rodata.smt_enabled = smt_enabled;
291
292        // Normalize CPU busy threshold in the range [0 .. 1024].
293        rodata.busy_threshold = opts.cpu_busy_thresh * 1024 / 100;
294
295        // Define the primary scheduling domain.
296        let primary_cpus = if let Some(ref domain) = opts.primary_domain {
297            match parse_cpu_list(domain) {
298                Ok(cpus) => cpus,
299                Err(e) => bail!("Error parsing primary domain: {}", e),
300            }
301        } else {
302            (0..*NR_CPU_IDS).collect()
303        };
304        if primary_cpus.len() < *NR_CPU_IDS {
305            info!("Primary CPUs: {:?}", primary_cpus);
306            rodata.primary_all = false;
307        } else {
308            rodata.primary_all = true;
309        }
310
311        // Generate the list of available CPUs sorted by capacity in descending order.
312        let mut cpus: Vec<_> = topo.all_cpus.values().collect();
313        cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));
314        for (i, cpu) in cpus.iter().enumerate() {
315            rodata.cpu_capacity[cpu.id] = cpu.cpu_capacity as c_ulong;
316            rodata.preferred_cpus[i] = cpu.id as u64;
317        }
318        if opts.preferred_idle_scan {
319            info!(
320                "Preferred CPUs: {:?}",
321                &rodata.preferred_cpus[0..cpus.len()]
322            );
323        }
324        rodata.preferred_idle_scan = opts.preferred_idle_scan;
325
326        // Set scheduler flags.
327        skel.struct_ops.beerland_ops_mut().flags = *compat::SCX_OPS_ENQ_EXITING
328            | *compat::SCX_OPS_ENQ_LAST
329            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
330            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
331        info!(
332            "scheduler flags: {:#x}",
333            skel.struct_ops.beerland_ops_mut().flags
334        );
335
336        // Load the BPF program for validation.
337        let mut skel = scx_ops_load!(skel, beerland_ops, uei)?;
338
339        // Initialize SMT domains.
340        if smt_enabled {
341            Self::init_smt_domains(&mut skel, &topo)?;
342        }
343
344        // Enable primary scheduling domain, if defined.
345        if primary_cpus.len() < *NR_CPU_IDS {
346            for cpu in primary_cpus {
347                if let Err(err) = Self::enable_primary_cpu(&mut skel, cpu as i32) {
348                    bail!("failed to add CPU {} to primary domain: error {}", cpu, err);
349                }
350            }
351        }
352
353        // Attach the scheduler.
354        let struct_ops = Some(scx_ops_attach!(skel, beerland_ops)?);
355        let stats_server = StatsServer::new(stats::server_data()).launch()?;
356
357        Ok(Self {
358            skel,
359            opts,
360            struct_ops,
361            stats_server,
362        })
363    }
364
365    fn enable_sibling_cpu(
366        skel: &mut BpfSkel<'_>,
367        cpu: usize,
368        sibling_cpu: usize,
369    ) -> Result<(), u32> {
370        let prog = &mut skel.progs.enable_sibling_cpu;
371        let mut args = domain_arg {
372            cpu_id: cpu as c_int,
373            sibling_cpu_id: sibling_cpu as c_int,
374        };
375        let input = ProgramInput {
376            context_in: Some(unsafe {
377                std::slice::from_raw_parts_mut(
378                    &mut args as *mut _ as *mut u8,
379                    std::mem::size_of_val(&args),
380                )
381            }),
382            ..Default::default()
383        };
384        let out = prog.test_run(input).unwrap();
385        if out.return_value != 0 {
386            return Err(out.return_value);
387        }
388
389        Ok(())
390    }
391
392    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
393        let prog = &mut skel.progs.enable_primary_cpu;
394        let mut args = cpu_arg {
395            cpu_id: cpu as c_int,
396        };
397        let input = ProgramInput {
398            context_in: Some(unsafe {
399                std::slice::from_raw_parts_mut(
400                    &mut args as *mut _ as *mut u8,
401                    std::mem::size_of_val(&args),
402                )
403            }),
404            ..Default::default()
405        };
406        let out = prog.test_run(input).unwrap();
407        if out.return_value != 0 {
408            return Err(out.return_value);
409        }
410
411        Ok(())
412    }
413
414    fn init_smt_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
415        let smt_siblings = topo.sibling_cpus();
416
417        info!("SMT sibling CPUs: {:?}", smt_siblings);
418        for (cpu, sibling_cpu) in smt_siblings.iter().enumerate() {
419            Self::enable_sibling_cpu(skel, cpu, *sibling_cpu as usize).unwrap();
420        }
421
422        Ok(())
423    }
424
425    fn get_metrics(&mut self) -> Metrics {
426        let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
427        Metrics {
428            nr_local_dispatch: bss_data.nr_local_dispatch,
429            nr_remote_dispatch: bss_data.nr_remote_dispatch,
430            nr_keep_running: bss_data.nr_keep_running,
431        }
432    }
433
434    pub fn exited(&mut self) -> bool {
435        uei_exited!(&self.skel, uei)
436    }
437
438    fn compute_user_cpu_pct(prev: &CpuTimes, curr: &CpuTimes) -> Option<u64> {
439        // Evaluate total user CPU time as user + nice.
440        let user_diff = (curr.user + curr.nice).saturating_sub(prev.user + prev.nice);
441        let total_diff = curr.total.saturating_sub(prev.total);
442
443        if total_diff > 0 {
444            let user_ratio = user_diff as f64 / total_diff as f64;
445            Some((user_ratio * 1024.0).round() as u64)
446        } else {
447            None
448        }
449    }
450
451    fn read_cpu_times() -> Option<CpuTimes> {
452        let file = File::open("/proc/stat").ok()?;
453        let reader = BufReader::new(file);
454
455        for line in reader.lines() {
456            let line = line.ok()?;
457            if line.starts_with("cpu ") {
458                let fields: Vec<&str> = line.split_whitespace().collect();
459                if fields.len() < 5 {
460                    return None;
461                }
462
463                let user: u64 = fields[1].parse().ok()?;
464                let nice: u64 = fields[2].parse().ok()?;
465
466                // Sum the first 8 fields as total time, including idle, system, etc.
467                let total: u64 = fields
468                    .iter()
469                    .skip(1)
470                    .take(8)
471                    .filter_map(|v| v.parse::<u64>().ok())
472                    .sum();
473
474                return Some(CpuTimes { user, nice, total });
475            }
476        }
477
478        None
479    }
480
481    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
482        let (res_ch, req_ch) = self.stats_server.channels();
483
484        // Periodically evaluate user CPU utilization from user-space and update a global variable
485        // in BPF.
486        //
487        // The BPF scheduler can use this value to determine when the system is busy or idle.
488        let polling_time = Duration::from_millis(self.opts.polling_ms).min(Duration::from_secs(1));
489        let mut prev_cputime = Self::read_cpu_times().expect("Failed to read initial CPU stats");
490        let mut last_update = Instant::now();
491
492        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
493            // Update CPU utilization.
494            if !polling_time.is_zero() && last_update.elapsed() >= polling_time {
495                if let Some(curr_cputime) = Self::read_cpu_times() {
496                    Self::compute_user_cpu_pct(&prev_cputime, &curr_cputime)
497                        .map(|util| self.skel.maps.bss_data.as_mut().unwrap().cpu_util = util);
498                    prev_cputime = curr_cputime;
499                }
500                last_update = Instant::now();
501            }
502
503            // Update statistics and check for exit condition.
504            let timeout = if polling_time.is_zero() {
505                Duration::from_secs(1)
506            } else {
507                polling_time
508            };
509            match req_ch.recv_timeout(timeout) {
510                Ok(()) => res_ch.send(self.get_metrics())?,
511                Err(RecvTimeoutError::Timeout) => {}
512                Err(e) => Err(e)?,
513            }
514        }
515
516        let _ = self.struct_ops.take();
517        uei_report!(&self.skel, uei)
518    }
519}
520
521impl Drop for Scheduler<'_> {
522    fn drop(&mut self) {
523        info!("Unregister {SCHEDULER_NAME} scheduler");
524    }
525}
526
527fn main() -> Result<()> {
528    let opts = Opts::parse();
529
530    if opts.version {
531        println!(
532            "{} {}",
533            SCHEDULER_NAME,
534            build_id::full_version(env!("CARGO_PKG_VERSION"))
535        );
536        return Ok(());
537    }
538
539    if opts.help_stats {
540        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
541        return Ok(());
542    }
543
544    let loglevel = simplelog::LevelFilter::Info;
545
546    let mut lcfg = simplelog::ConfigBuilder::new();
547    lcfg.set_time_offset_to_local()
548        .expect("Failed to set local time offset")
549        .set_time_level(simplelog::LevelFilter::Error)
550        .set_location_level(simplelog::LevelFilter::Off)
551        .set_target_level(simplelog::LevelFilter::Off)
552        .set_thread_level(simplelog::LevelFilter::Off);
553    simplelog::TermLogger::init(
554        loglevel,
555        lcfg.build(),
556        simplelog::TerminalMode::Stderr,
557        simplelog::ColorChoice::Auto,
558    )?;
559
560    let shutdown = Arc::new(AtomicBool::new(false));
561    let shutdown_clone = shutdown.clone();
562    ctrlc::set_handler(move || {
563        shutdown_clone.store(true, Ordering::Relaxed);
564    })
565    .context("Error setting Ctrl-C handler")?;
566
567    if let Some(intv) = opts.monitor.or(opts.stats) {
568        let shutdown_copy = shutdown.clone();
569        let jh = std::thread::spawn(move || {
570            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
571                Ok(_) => {
572                    debug!("stats monitor thread finished successfully")
573                }
574                Err(error_object) => {
575                    warn!(
576                        "stats monitor thread finished because of an error {}",
577                        error_object
578                    )
579                }
580            }
581        });
582        if opts.monitor.is_some() {
583            let _ = jh.join();
584            return Ok(());
585        }
586    }
587
588    let mut open_object = MaybeUninit::uninit();
589    loop {
590        let mut sched = Scheduler::init(&opts, &mut open_object)?;
591        if !sched.run(shutdown.clone())?.should_restart() {
592            break;
593        }
594    }
595
596    Ok(())
597}