scx_rustland/main.rs

// Copyright (c) Andrea Righi <andrea.righi@linux.dev>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

#[rustfmt::skip]
mod bpf;
use bpf::*;

mod stats;
use std::collections::BTreeSet;
use std::io;
use std::mem::MaybeUninit;
use std::time::Duration;
use std::time::SystemTime;

use anyhow::Result;
use clap::Parser;
use libbpf_rs::OpenObject;
use log::info;
use log::warn;
use procfs::process::Process;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::UserExitInfo;
use stats::Metrics;

const SCHEDULER_NAME: &str = "RustLand";

/// scx_rustland: user-space scheduler written in Rust
///
/// scx_rustland is designed to prioritize interactive workloads over background CPU-intensive
/// workloads. For this reason the typical use case of this scheduler involves low-latency
/// interactive applications, such as gaming, video conferencing and live streaming.
///
/// scx_rustland is also designed to be an "easy to read" template that any developer can use
/// to quickly experiment with more complex scheduling policies fully implemented in Rust.
///
/// The scheduler is based on scx_rustland_core, which implements the low-level sched-ext
/// functionality.
///
/// The scheduling policy implemented in user-space is based on a deadline, evaluated as
/// follows:
///
///       deadline = vruntime + exec_runtime
///
/// where vruntime reflects the task's total runtime scaled by weight (ensuring fairness), while
/// exec_runtime accounts for the CPU time used since the last sleep (capturing responsiveness).
/// Tasks are then dispatched from the lowest to the highest deadline.
///
/// This approach favors latency-sensitive tasks: those that frequently sleep accumulate less
/// exec_runtime, resulting in earlier deadlines. In contrast, CPU-intensive tasks that don't
/// sleep accumulate a larger exec_runtime and thus get scheduled later.
///
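/// As a concrete example (illustrative numbers): with a 20ms time slice, an interactive task
/// with vruntime = 10ms that ran for only 1ms since its last sleep gets deadline = 10ms + 1ms
/// = 11ms, while a CPU hog with the same vruntime that has been running for 20ms gets
/// deadline = 30ms, so the interactive task is dispatched first.
///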
/// All the tasks are stored in a BTreeSet, using the deadline as the ordering key. Once the
/// order of execution is determined, all tasks are sent back to the BPF counterpart
/// (scx_rustland_core) to be dispatched.
///
/// The BPF dispatcher is completely agnostic of the particular scheduling policy implemented in
/// user-space. For this reason, developers who want to use this scheduler to experiment with
/// scheduling policies should be able to simply modify the Rust component, without having to
/// deal with any internal kernel / BPF details.
///
/// === Troubleshooting ===
///
/// - Reduce the time slice (option `-s`) if you experience lag or crackling audio.
///
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    /// Scheduling minimum slice duration in microseconds.
    #[clap(short = 'S', long, default_value = "1000")]
    slice_us_min: u64,

    /// If set, per-CPU tasks are dispatched directly to their only eligible CPU.
    /// This can help enforce affinity-based isolation for better performance.
    #[clap(short = 'l', long, action = clap::ArgAction::SetTrue)]
    percpu_local: bool,

    /// If specified, only tasks which have their scheduling policy set to SCHED_EXT using
    /// sched_setscheduler(2) are switched. Otherwise, all tasks are switched.
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output, including libbpf details. Moreover, BPF scheduling events will be
    /// reported in tracefs (e.g., /sys/kernel/tracing/trace_pipe).
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,
}
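
// Example invocation (hypothetical values): run with a 10ms time slice and dispatch per-CPU
// tasks directly to their only eligible CPU:
//
//   $ scx_rustland -s 10000 -l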

// Time constants.
const NSEC_PER_USEC: u64 = 1_000;

#[derive(Debug, PartialEq, Eq, Clone)]
struct Task {
    qtask: QueuedTask, // queued task
    deadline: u64,     // task deadline (determines the order in which tasks are dispatched)
    timestamp: u64,    // task enqueue timestamp
}

// Sort tasks by their deadline first (lowest deadline is dispatched first), then by their
// enqueue timestamp and lastly by their pid.
impl Ord for Task {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.deadline
            .cmp(&other.deadline)
            .then_with(|| self.timestamp.cmp(&other.timestamp))
            .then_with(|| self.qtask.pid.cmp(&other.qtask.pid))
    }
}

// Keep PartialOrd consistent with the manual Ord implementation above (deriving PartialOrd
// would compare fields lexicographically and disagree with cmp()).
impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
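
// Note: the pid tie-breaker guarantees a total order. Tasks live in a BTreeSet, so if two
// distinct tasks ever compared as equal, inserting the second one would be a no-op and that
// task would be silently lost.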

// Main scheduler object
struct Scheduler<'a> {
    bpf: BpfScheduler<'a>,                  // BPF connector
    opts: &'a Opts,                         // scheduler options
    stats_server: StatsServer<(), Metrics>, // statistics
    tasks: BTreeSet<Task>,                  // tasks ordered by deadline
    min_vruntime: u64,                      // minimum vruntime across all tasks
    init_page_faults: u64,                  // initial page faults counter
    slice_ns: u64,                          // default time slice (in ns)
    slice_ns_min: u64,                      // minimum time slice (in ns)
}

impl<'a> Scheduler<'a> {
    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        // Low-level BPF connector.
        let bpf = BpfScheduler::init(
            open_object,
            opts.exit_dump_len,
            opts.partial,
            opts.verbose,
            true, // Enable built-in idle CPU selection policy
        )?;

        info!(
            "{} version {} - scx_rustland_core {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            scx_rustland_core::VERSION
        );

        // Return scheduler object.
        Ok(Self {
            bpf,
            opts,
            stats_server,
            tasks: BTreeSet::new(),
            min_vruntime: 0,
            init_page_faults: 0,
            slice_ns: opts.slice_us * NSEC_PER_USEC,
            slice_ns_min: opts.slice_us_min * NSEC_PER_USEC,
        })
    }

    fn get_metrics(&mut self) -> Metrics {
        // Fall back to 0 if the page fault counters can't be read from procfs.
        let page_faults = Self::get_page_faults().unwrap_or(0);
        if self.init_page_faults == 0 {
            self.init_page_faults = page_faults;
        }
        // Use a saturating subtraction: a later failed read reports 0 and must not underflow.
        let nr_page_faults = page_faults.saturating_sub(self.init_page_faults);

        Metrics {
            nr_running: *self.bpf.nr_running_mut(),
            nr_cpus: *self.bpf.nr_online_cpus_mut(),
            nr_queued: *self.bpf.nr_queued_mut(),
            nr_scheduled: *self.bpf.nr_scheduled_mut(),
            nr_page_faults,
            nr_user_dispatches: *self.bpf.nr_user_dispatches_mut(),
            nr_kernel_dispatches: *self.bpf.nr_kernel_dispatches_mut(),
            nr_cancel_dispatches: *self.bpf.nr_cancel_dispatches_mut(),
            nr_bounce_dispatches: *self.bpf.nr_bounce_dispatches_mut(),
            nr_failed_dispatches: *self.bpf.nr_failed_dispatches_mut(),
            nr_sched_congested: *self.bpf.nr_sched_congested_mut(),
        }
    }

    // Return the current timestamp in ns.
    fn now() -> u64 {
        let ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        ts.as_nanos() as u64
    }

    // Return the total number of tasks waiting in the user-space scheduler.
    fn nr_tasks_scheduled(&mut self) -> u64 {
        self.tasks.len() as u64
    }

    // Return the total number of tasks waiting to be consumed by the user-space scheduler.
    fn nr_tasks_queued(&mut self) -> u64 {
        *self.bpf.nr_queued_mut()
    }

    // Return the total number of tasks that are waiting to be scheduled.
    fn nr_tasks_waiting(&mut self) -> u64 {
        self.nr_tasks_queued() + self.nr_tasks_scheduled()
    }

    // Return a value inversely proportional to the task's weight.
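    //
    // For example (illustrative numbers): with the default weight of 100 the value is
    // returned unchanged, while a task with weight 200 (higher priority) is only charged
    // half of it, so its vruntime grows more slowly and it tends to be scheduled sooner.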
    fn scale_by_task_weight_inverse(task: &QueuedTask, value: u64) -> u64 {
        value * 100 / task.weight
    }

    // Update the task's vruntime based on the information collected from the kernel and
    // return the evaluated deadline to the caller.
    //
    // This method implements the main task ordering logic of the scheduler.
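    //
    // Worked example (illustrative numbers): with slice_ns = 20ms and min_vruntime = 50ms, a
    // weight-100 task with vtime = 25ms that just ran for 5ms is first re-aligned to
    // 50ms - 20ms = 30ms, then charged the 5ms it consumed (vtime = 35ms); the returned
    // deadline is 35ms plus the task's exec_runtime, the latter capped at 100 time slices.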
    fn update_enqueued(&mut self, task: &mut QueuedTask) -> u64 {
        // Update the global minimum vruntime based on the previous task's vruntime.
        if self.min_vruntime < task.vtime {
            self.min_vruntime = task.vtime;
        }

        // Update the task's vruntime, re-aligning it to min_vruntime (never allow a task to
        // accumulate a budget of more than a time slice, to prevent starvation).
        let min_vruntime = self.min_vruntime.saturating_sub(self.slice_ns);
        if task.vtime == 0 {
            // Slightly penalize new tasks by charging an extra time slice, to prevent bursts
            // of such tasks from disrupting the responsiveness of already running ones.
            task.vtime = min_vruntime + Self::scale_by_task_weight_inverse(task, self.slice_ns);
        } else if task.vtime < min_vruntime {
            task.vtime = min_vruntime;
        }
        task.vtime += Self::scale_by_task_weight_inverse(task, task.stop_ts - task.start_ts);

        // Return the task's deadline.
        task.vtime + task.exec_runtime.min(self.slice_ns * 100)
    }

    // Dispatch the first task from the task pool (sending it to the BPF dispatcher).
    //
    // Return true on success, false if the BPF backend can't accept any more dispatches.
    fn dispatch_task(&mut self) -> bool {
        let nr_waiting = self.nr_tasks_waiting() + 1;

        if let Some(task) = self.tasks.pop_first() {
            // Scale the time slice inversely to the number of tasks waiting in the
            // scheduler's queue, but make sure to assign at least slice_us_min.
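            //
            // For example (illustrative numbers): with the default 20ms slice and
            // nr_waiting = 10, the dispatched task receives 20ms / 10 = 2ms, but never
            // less than the configured minimum slice.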
            let slice_ns = (self.slice_ns / nr_waiting).max(self.slice_ns_min);

            // Create a new task to dispatch.
            let mut dispatched_task = DispatchedTask::new(&task.qtask);

            // Assign the time slice to the task.
            dispatched_task.slice_ns = slice_ns;

            // Propagate the evaluated task's deadline to the scx_rustland_core backend.
            dispatched_task.vtime = task.deadline;

            // Try to pick an idle CPU for the task.
            let cpu = self
                .bpf
                .select_cpu(task.qtask.pid, task.qtask.cpu, task.qtask.flags);
            dispatched_task.cpu = if cpu >= 0 {
                // An idle CPU was found, dispatch the task there.
                cpu
            } else if self.opts.percpu_local && task.qtask.nr_cpus_allowed == 1 {
                // The task is restricted to a single CPU, dispatch it to that one.
                task.qtask.cpu
            } else {
                // No idle CPU found, dispatch to the first CPU that becomes available.
                RL_CPU_ANY
            };

            // Send the task to the BPF dispatcher.
            if self.bpf.dispatch_task(&dispatched_task).is_err() {
                // If dispatching fails, re-add the task to the pool and skip further dispatching.
                self.tasks.insert(task);

                return false;
            }
        }

        true
    }

    // Drain all the tasks from the queued list, evaluate their deadline (Self::update_enqueued()),
    // then push them all to the task pool (doing so will sort them by their deadline).
    fn drain_queued_tasks(&mut self) {
        loop {
            match self.bpf.dequeue_task() {
                Ok(Some(mut task)) => {
                    // Update the task's information and evaluate its deadline.
                    let deadline = self.update_enqueued(&mut task);
                    let timestamp = Self::now();

                    // Insert the task in the task pool (ordered by deadline).
                    self.tasks.insert(Task {
                        qtask: task,
                        deadline,
                        timestamp,
                    });
                }
                Ok(None) => {
                    break;
                }
                Err(err) => {
                    warn!("Error: {}", err);
                    break;
                }
            }
        }

        // Dispatch the first task from the task pool.
        self.dispatch_task();
    }

    // Main scheduling function (called in a loop to periodically drain tasks from the queued
    // list and dispatch them to the BPF part via the dispatched list).
    fn schedule(&mut self) {
        self.drain_queued_tasks();

        // Notify the BPF dispatcher of the number of tasks that are still pending.
        self.bpf.notify_complete(self.tasks.len() as u64);
    }

    // Get the total page faults from the current process.
    fn get_page_faults() -> Result<u64, io::Error> {
        let myself = Process::myself().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
        let stat = myself
            .stat()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

        Ok(stat.minflt + stat.majflt)
    }

    fn run(&mut self) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        while !self.bpf.exited() {
            // Call the main scheduler body.
            self.schedule();

            // Handle monitor requests asynchronously.
            if req_ch.try_recv().is_ok() {
                res_ch.send(self.get_metrics())?;
            }
        }

        self.bpf.shutdown_and_report()
    }
}

// Unregister the scheduler.
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {} scheduler", SCHEDULER_NAME);
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "{} version {} - scx_rustland_core {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            scx_rustland_core::VERSION
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

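    // `--stats <intv>` keeps the scheduler running and prints statistics every <intv> seconds,
    // while `--monitor <intv>` only runs the stats client, without launching the scheduler.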
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let jh = std::thread::spawn(move || stats::monitor(Duration::from_secs_f64(intv)).unwrap());
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run()?.should_restart() {
            break;
        }
    }

    Ok(())
}