Skip to main content

scx_rustland/
main.rs

// Copyright (c) Andrea Righi <andrea.righi@linux.dev>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
5mod bpf_skel;
6pub use bpf_skel::*;
7pub mod bpf_intf;
8
9#[rustfmt::skip]
10mod bpf;
11use bpf::*;
12
13mod stats;
14use std::collections::BTreeSet;
15use std::io::{self};
16use std::mem::MaybeUninit;
17use std::time::Duration;
18use std::time::SystemTime;
19
20use anyhow::Result;
21use clap::Parser;
22use libbpf_rs::OpenObject;
23use log::info;
24use log::warn;
25use procfs::process::Process;
26use scx_stats::prelude::*;
27use scx_utils::build_id;
28use scx_utils::libbpf_clap_opts::LibbpfOpts;
29use scx_utils::UserExitInfo;
30use stats::Metrics;
31
/// Human-readable scheduler name, used in version and log messages.
const SCHEDULER_NAME: &str = "RustLand";
33
34/// scx_rustland: user-space scheduler written in Rust
35///
36/// scx_rustland is designed to prioritize interactive workloads over background CPU-intensive
37/// workloads. For this reason the typical use case of this scheduler involves low-latency
38/// interactive applications, such as gaming, video conferencing and live streaming.
39///
40/// scx_rustland is also designed to be an "easy to read" template that can be used by any
41/// developer to quickly experiment more complex scheduling policies fully implemented in Rust.
42///
43/// The scheduler is based on scx_rustland_core, which implements the low level sched-ext
44/// functionalities.
45///
46/// The scheduling policy implemented in user-space is a based on a deadline, evaluated as
47/// following:
48///
49///       deadline = vruntime + exec_runtime
50///
51/// Where, vruntime reflects the task's total runtime scaled by weight (ensuring fairness), while
52/// exec_runtime accounts the CPU time used since the last sleep (capturing responsiveness). Tasks
53/// are then dispatched from the lowest to the highest deadline.
54///
55/// This approach favors latency-sensitive tasks: those that frequently sleep will accumulate less
56/// exec_runtime, resulting in earlier deadlines. In contrast, CPU-intensive tasks that don’t sleep
57/// accumulate a larger exec_runtime and thus get scheduled later.
58///
59/// All the tasks are stored in a BTreeSet (TaskTree), using the deadline as the ordering key.
60/// Once the order of execution is determined all tasks are sent back to the BPF counterpart
61/// (scx_rustland_core) to be dispatched.
62///
63/// The BPF dispatcher is completely agnostic of the particular scheduling policy implemented in
64/// user-space. For this reason developers that are willing to use this scheduler to experiment
65/// scheduling policies should be able to simply modify the Rust component, without having to deal
66/// with any internal kernel / BPF details.
67///
68/// === Troubleshooting ===
69///
70/// - Reduce the time slice (option `-s`) if you experience lag or cracking audio.
71///
72#[derive(Debug, Parser)]
73struct Opts {
74    /// Scheduling slice duration in microseconds.
75    #[clap(short = 's', long, default_value = "20000")]
76    slice_us: u64,
77
78    /// Scheduling minimum slice duration in microseconds.
79    #[clap(short = 'S', long, default_value = "1000")]
80    slice_us_min: u64,
81
82    /// If set, per-CPU tasks are dispatched directly to their only eligible CPU.
83    /// This can help enforce affinity-based isolation for better performance.
84    #[clap(short = 'l', long, action = clap::ArgAction::SetTrue)]
85    percpu_local: bool,
86
87    /// Enable NUMA-local idle CPU selection. When enabled, tasks with a preferred NUMA node will
88    /// preferentially be assigned an idle CPU from that node. This is opt-in as NUMA balancing
89    /// overhead may be undesirable on certain workloads.
90    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
91    numa_local: bool,
92
93    /// If specified, only tasks which have their scheduling policy set to SCHED_EXT using
94    /// sched_setscheduler(2) are switched. Otherwise, all tasks are switched.
95    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
96    partial: bool,
97
98    /// Exit debug dump buffer length. 0 indicates default.
99    #[clap(long, default_value = "0")]
100    exit_dump_len: u32,
101
102    /// Enable verbose output, including libbpf details. Moreover, BPF scheduling events will be
103    /// reported in tracefs (e.g., /sys/kernel/tracing/trace_pipe).
104    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
105    verbose: bool,
106
107    /// Enable stats monitoring with the specified interval.
108    #[clap(long)]
109    stats: Option<f64>,
110
111    /// Run in stats monitoring mode with the specified interval. Scheduler
112    /// is not launched.
113    #[clap(long)]
114    monitor: Option<f64>,
115
116    /// Show descriptions for statistics.
117    #[clap(long)]
118    help_stats: bool,
119
120    /// Print scheduler version and exit.
121    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
122    version: bool,
123
124    #[clap(flatten, next_help_heading = "Libbpf Options")]
125    pub libbpf: LibbpfOpts,
126}
127
/// Number of nanoseconds in one microsecond (used to convert the `*_us` options to ns).
const NSEC_PER_USEC: u64 = 1000;
130
131#[derive(Debug, PartialEq, Eq, Clone)]
132struct Task {
133    qtask: QueuedTask, // queued task
134    deadline: u64,     // task deadline (that determines the order how tasks are dispatched)
135    timestamp: u64,    // task enqueue timestamp
136}
137
138// Sort tasks by their interactive status first (interactive tasks are always scheduled before
139// regular tasks), then sort them by their vruntime, then by their timestamp and lastly by their
140// pid.
141impl Ord for Task {
142    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
143        self.deadline
144            .cmp(&other.deadline)
145            .then_with(|| self.timestamp.cmp(&other.timestamp))
146            .then_with(|| self.qtask.pid.cmp(&other.qtask.pid))
147    }
148}
149
150impl PartialOrd for Task {
151    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
152        Some(self.cmp(other))
153    }
154}
155
156// Main scheduler object
157struct Scheduler<'a> {
158    bpf: BpfScheduler<'a>,                  // BPF connector
159    opts: &'a Opts,                         // scheduler options
160    stats_server: StatsServer<(), Metrics>, // statistics
161    tasks: BTreeSet<Task>,                  // tasks ordered by deadline
162    vruntime_now: u64,     // Tracks the latest observed (max) vruntime across tasks
163    init_page_faults: u64, // Initial page faults counter
164    slice_ns: u64,         // Default time slice (in ns)
165    slice_ns_min: u64,     // Minimum time slice (in ns)
166}
167
168impl<'a> Scheduler<'a> {
169    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
170        let stats_server = StatsServer::new(stats::server_data()).launch()?;
171
172        let slice_ns = opts.slice_us * NSEC_PER_USEC;
173        let slice_ns_min = opts.slice_us_min * NSEC_PER_USEC;
174
175        // Low-level BPF connector.
176        let bpf = BpfScheduler::init(
177            open_object,
178            opts.libbpf.clone().into_bpf_open_opts(),
179            opts.exit_dump_len,
180            opts.partial,
181            opts.verbose,
182            true, // Enable built-in idle CPU selection policy
183            opts.numa_local,
184            slice_ns_min,
185            "rustland",
186        )?;
187
188        info!(
189            "{} version {} - scx_rustland_core {}",
190            SCHEDULER_NAME,
191            build_id::full_version(env!("CARGO_PKG_VERSION")),
192            scx_rustland_core::VERSION
193        );
194
195        // Return scheduler object.
196        Ok(Self {
197            bpf,
198            opts,
199            stats_server,
200            tasks: BTreeSet::new(),
201            vruntime_now: 0,
202            init_page_faults: 0,
203            slice_ns,
204            slice_ns_min,
205        })
206    }
207
208    fn get_metrics(&mut self) -> Metrics {
209        let page_faults = Self::get_page_faults().unwrap_or_default();
210        if self.init_page_faults == 0 {
211            self.init_page_faults = page_faults;
212        }
213        let nr_page_faults = page_faults - self.init_page_faults;
214
215        Metrics {
216            nr_running: *self.bpf.nr_running_mut(),
217            nr_cpus: *self.bpf.nr_online_cpus_mut(),
218            nr_queued: *self.bpf.nr_queued_mut(),
219            nr_scheduled: *self.bpf.nr_scheduled_mut(),
220            nr_page_faults,
221            nr_user_dispatches: *self.bpf.nr_user_dispatches_mut(),
222            nr_kernel_dispatches: *self.bpf.nr_kernel_dispatches_mut(),
223            nr_cancel_dispatches: *self.bpf.nr_cancel_dispatches_mut(),
224            nr_bounce_dispatches: *self.bpf.nr_bounce_dispatches_mut(),
225            nr_failed_dispatches: *self.bpf.nr_failed_dispatches_mut(),
226            nr_sched_congested: *self.bpf.nr_sched_congested_mut(),
227        }
228    }
229
230    // Return current timestamp in ns.
231    fn now() -> u64 {
232        let ts = SystemTime::now()
233            .duration_since(SystemTime::UNIX_EPOCH)
234            .unwrap();
235        ts.as_nanos() as u64
236    }
237
238    // Return a value proportional to the task's weight.
239    fn scale_by_task_weight(task: &QueuedTask, value: u64) -> u64 {
240        value * task.weight / 100
241    }
242
243    // Return a value inversely proportional to the task's weight.
244    fn scale_by_task_weight_inverse(task: &QueuedTask, value: u64) -> u64 {
245        value * 100 / task.weight
246    }
247
248    /// Updates a task's virtual runtime based on kernel information and
249    /// returns the evaluated deadline.
250    ///
251    /// This method implements the main task ordering logic of the scheduler.
252    fn update_enqueued(&mut self, task: &mut QueuedTask) -> u64 {
253        // Update task's vruntime.
254        task.vtime = if task.vtime == 0 {
255            // Re-align new tasks to the current vruntime.
256            self.vruntime_now
257        } else {
258            // Prevent sleeping tasks from gaining more than one full slice of vruntime credit.
259            let vruntime_min = self.vruntime_now.saturating_sub(self.slice_ns);
260            task.vtime.max(vruntime_min)
261        };
262
263        // Compute the time slice the task just consumed.
264        let slice_ns = task.stop_ts.saturating_sub(task.start_ts);
265
266        // Update task and global vruntimes.
267        let vslice = Self::scale_by_task_weight_inverse(task, slice_ns);
268        task.vtime += vslice;
269        self.vruntime_now += vslice;
270
271        // Compute the deadline, adding the accumulated runtime since the last sleep. Cap
272        // exec_runtime to 100 time slices to prevent starvation of CPU-intensive tasks.
273        task.vtime + task.exec_runtime.min(self.slice_ns.saturating_mul(100))
274    }
275
276    /// Dispatch the next task in the queue.
277    ///
278    /// Return true if dispatching succeeded or there was no task to dispatch, or false if
279    /// dispatching failed (the task is automatically re-enqueued in that case).
280    fn dispatch_task(&mut self) -> bool {
281        // Retrieve the next task to dispatch, if any.
282        let Some(task) = self.tasks.pop_first() else {
283            return true;
284        };
285
286        // Initialize a dispatched task from the queued one.
287        let mut dispatched_task = DispatchedTask::new(&task.qtask);
288
289        // Assign the minimum time slice scaled by the task's priority.
290        dispatched_task.slice_ns = Self::scale_by_task_weight(&task.qtask, self.slice_ns_min);
291
292        // Propagate the evaluated deadline to the BPF backend.
293        dispatched_task.vtime = task.deadline;
294
295        // Attempt to select an idle CPU for the task (if percpu_local is enabled, send per-CPU
296        // tasks directly to their only usable CPU).
297        dispatched_task.cpu = if self.opts.percpu_local {
298            task.qtask.cpu
299        } else {
300            match self
301                .bpf
302                .select_cpu(task.qtask.pid, task.qtask.cpu, task.qtask.flags)
303            {
304                cpu if cpu >= 0 => cpu,
305                _ => RL_CPU_ANY,
306            }
307        };
308
309        // Send the task to the BPF dispatcher.
310        if self.bpf.dispatch_task(&dispatched_task).is_err() {
311            // Dispatching failed: reinsert the task and stop dispatching.
312            self.tasks.insert(task);
313            return false;
314        }
315
316        true
317    }
318
319    // Drain all the tasks from the queued list, update their vruntime (Self::update_enqueued()),
320    // then push them all to the task pool (doing so will sort them by their vruntime).
321    fn drain_queued_tasks(&mut self) {
322        loop {
323            match self.bpf.dequeue_task() {
324                Ok(Some(mut task)) => {
325                    // Update task information and determine vruntime.
326                    let deadline = self.update_enqueued(&mut task);
327                    let timestamp = Self::now();
328
329                    // Insert task in the task pool (ordered by vruntime).
330                    self.tasks.insert(Task {
331                        qtask: task,
332                        deadline,
333                        timestamp,
334                    });
335                }
336                Ok(None) => {
337                    break;
338                }
339                Err(err) => {
340                    warn!("Error: {err}");
341                    break;
342                }
343            }
344        }
345    }
346
347    // Main scheduling function (called in a loop to periodically drain tasks from the queued list
348    // and dispatch them to the BPF part via the dispatched list).
349    fn schedule(&mut self) {
350        self.drain_queued_tasks();
351        self.dispatch_task();
352
353        // Notify the dispatcher if there are still pending tasks to be processed.
354        self.bpf.notify_complete(self.tasks.len() as u64);
355    }
356
357    // Get total page faults from the process.
358    fn get_page_faults() -> Result<u64, io::Error> {
359        let myself = Process::myself().map_err(io::Error::other)?;
360        let stat = myself.stat().map_err(io::Error::other)?;
361
362        Ok(stat.minflt + stat.majflt)
363    }
364
365    fn run(&mut self) -> Result<UserExitInfo> {
366        let (res_ch, req_ch) = self.stats_server.channels();
367
368        while !self.bpf.exited() {
369            // Call the main scheduler body.
370            self.schedule();
371
372            // Handle monitor requests asynchronously.
373            if req_ch.try_recv().is_ok() {
374                res_ch.send(self.get_metrics())?;
375            }
376        }
377
378        self.bpf.shutdown_and_report()
379    }
380}
381
382// Unregister the scheduler.
383impl Drop for Scheduler<'_> {
384    fn drop(&mut self) {
385        info!("Unregister {SCHEDULER_NAME} scheduler");
386    }
387}
388
389fn main() -> Result<()> {
390    let opts = Opts::parse();
391
392    if opts.version {
393        println!(
394            "{} version {} - scx_rustland_core {}",
395            SCHEDULER_NAME,
396            build_id::full_version(env!("CARGO_PKG_VERSION")),
397            scx_rustland_core::VERSION
398        );
399        return Ok(());
400    }
401
402    if opts.help_stats {
403        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
404        return Ok(());
405    }
406
407    let loglevel = simplelog::LevelFilter::Info;
408
409    let mut lcfg = simplelog::ConfigBuilder::new();
410    lcfg.set_time_offset_to_local()
411        .expect("Failed to set local time offset")
412        .set_time_level(simplelog::LevelFilter::Error)
413        .set_location_level(simplelog::LevelFilter::Off)
414        .set_target_level(simplelog::LevelFilter::Off)
415        .set_thread_level(simplelog::LevelFilter::Off);
416    simplelog::TermLogger::init(
417        loglevel,
418        lcfg.build(),
419        simplelog::TerminalMode::Stderr,
420        simplelog::ColorChoice::Auto,
421    )?;
422
423    if let Some(intv) = opts.monitor.or(opts.stats) {
424        let jh = std::thread::spawn(move || stats::monitor(Duration::from_secs_f64(intv)).unwrap());
425        if opts.monitor.is_some() {
426            let _ = jh.join();
427            return Ok(());
428        }
429    }
430
431    let mut open_object = MaybeUninit::uninit();
432    loop {
433        let mut sched = Scheduler::init(&opts, &mut open_object)?;
434        if !sched.run()?.should_restart() {
435            break;
436        }
437    }
438
439    Ok(())
440}