scx_rustland/
main.rs

1// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6pub use bpf_skel::*;
7pub mod bpf_intf;
8
9#[rustfmt::skip]
10mod bpf;
11use bpf::*;
12
13mod stats;
14use std::collections::BTreeSet;
15use std::io::{self};
16use std::mem::MaybeUninit;
17use std::time::Duration;
18use std::time::SystemTime;
19
20use anyhow::Result;
21use clap::Parser;
22use libbpf_rs::OpenObject;
23use log::info;
24use log::warn;
25use procfs::process::Process;
26use scx_stats::prelude::*;
27use scx_utils::build_id;
28use scx_utils::libbpf_clap_opts::LibbpfOpts;
29use scx_utils::UserExitInfo;
30use stats::Metrics;
31
// Scheduler name, used for the kernel registration log line and `--version` output.
const SCHEDULER_NAME: &str = "RustLand";
33
34/// scx_rustland: user-space scheduler written in Rust
35///
36/// scx_rustland is designed to prioritize interactive workloads over background CPU-intensive
37/// workloads. For this reason the typical use case of this scheduler involves low-latency
38/// interactive applications, such as gaming, video conferencing and live streaming.
39///
40/// scx_rustland is also designed to be an "easy to read" template that can be used by any
41/// developer to quickly experiment more complex scheduling policies fully implemented in Rust.
42///
43/// The scheduler is based on scx_rustland_core, which implements the low level sched-ext
44/// functionalities.
45///
46/// The scheduling policy implemented in user-space is a based on a deadline, evaluated as
47/// following:
48///
49///       deadline = vruntime + exec_runtime
50///
51/// Where, vruntime reflects the task's total runtime scaled by weight (ensuring fairness), while
52/// exec_runtime accounts the CPU time used since the last sleep (capturing responsiveness). Tasks
53/// are then dispatched from the lowest to the highest deadline.
54///
55/// This approach favors latency-sensitive tasks: those that frequently sleep will accumulate less
56/// exec_runtime, resulting in earlier deadlines. In contrast, CPU-intensive tasks that don’t sleep
57/// accumulate a larger exec_runtime and thus get scheduled later.
58///
59/// All the tasks are stored in a BTreeSet (TaskTree), using the deadline as the ordering key.
60/// Once the order of execution is determined all tasks are sent back to the BPF counterpart
61/// (scx_rustland_core) to be dispatched.
62///
63/// The BPF dispatcher is completely agnostic of the particular scheduling policy implemented in
64/// user-space. For this reason developers that are willing to use this scheduler to experiment
65/// scheduling policies should be able to simply modify the Rust component, without having to deal
66/// with any internal kernel / BPF details.
67///
68/// === Troubleshooting ===
69///
70/// - Reduce the time slice (option `-s`) if you experience lag or cracking audio.
71///
72#[derive(Debug, Parser)]
73struct Opts {
74    /// Scheduling slice duration in microseconds.
75    #[clap(short = 's', long, default_value = "20000")]
76    slice_us: u64,
77
78    /// Scheduling minimum slice duration in microseconds.
79    #[clap(short = 'S', long, default_value = "1000")]
80    slice_us_min: u64,
81
82    /// If set, per-CPU tasks are dispatched directly to their only eligible CPU.
83    /// This can help enforce affinity-based isolation for better performance.
84    #[clap(short = 'l', long, action = clap::ArgAction::SetTrue)]
85    percpu_local: bool,
86
87    /// If specified, only tasks which have their scheduling policy set to SCHED_EXT using
88    /// sched_setscheduler(2) are switched. Otherwise, all tasks are switched.
89    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
90    partial: bool,
91
92    /// Exit debug dump buffer length. 0 indicates default.
93    #[clap(long, default_value = "0")]
94    exit_dump_len: u32,
95
96    /// Enable verbose output, including libbpf details. Moreover, BPF scheduling events will be
97    /// reported in tracefs (e.g., /sys/kernel/tracing/trace_pipe).
98    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
99    verbose: bool,
100
101    /// Enable stats monitoring with the specified interval.
102    #[clap(long)]
103    stats: Option<f64>,
104
105    /// Run in stats monitoring mode with the specified interval. Scheduler
106    /// is not launched.
107    #[clap(long)]
108    monitor: Option<f64>,
109
110    /// Show descriptions for statistics.
111    #[clap(long)]
112    help_stats: bool,
113
114    /// Print scheduler version and exit.
115    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
116    version: bool,
117
118    #[clap(flatten, next_help_heading = "Libbpf Options")]
119    pub libbpf: LibbpfOpts,
120}
121
// Time constants.
const NSEC_PER_USEC: u64 = 1_000; // nanoseconds per microsecond
124
125#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
126struct Task {
127    qtask: QueuedTask, // queued task
128    deadline: u64,     // task deadline (that determines the order how tasks are dispatched)
129    timestamp: u64,    // task enqueue timestamp
130}
131
132// Sort tasks by their interactive status first (interactive tasks are always scheduled before
133// regular tasks), then sort them by their vruntime, then by their timestamp and lastly by their
134// pid.
135impl Ord for Task {
136    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
137        self.deadline
138            .cmp(&other.deadline)
139            .then_with(|| self.timestamp.cmp(&other.timestamp))
140            .then_with(|| self.qtask.pid.cmp(&other.qtask.pid))
141    }
142}
143
// Main scheduler object: owns the BPF connector, the user-space task pool and the
// bookkeeping needed to evaluate per-task deadlines and report statistics.
struct Scheduler<'a> {
    bpf: BpfScheduler<'a>,                  // BPF connector
    opts: &'a Opts,                         // scheduler options
    stats_server: StatsServer<(), Metrics>, // statistics
    tasks: BTreeSet<Task>,                  // tasks ordered by deadline
    min_vruntime: u64,                      // Keep track of the minimum vruntime across all tasks
    init_page_faults: u64,                  // Initial page faults counter
    slice_ns: u64,                          // Default time slice (in ns)
    slice_ns_min: u64,                      // Minimum time slice (in ns)
}
155
156impl<'a> Scheduler<'a> {
157    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
158        let stats_server = StatsServer::new(stats::server_data()).launch()?;
159
160        // Low-level BPF connector.
161        let bpf = BpfScheduler::init(
162            open_object,
163            opts.libbpf.clone().into_bpf_open_opts(),
164            opts.exit_dump_len,
165            opts.partial,
166            opts.verbose,
167            true, // Enable built-in idle CPU selection policy
168            "rustland",
169        )?;
170
171        info!(
172            "{} version {} - scx_rustland_core {}",
173            SCHEDULER_NAME,
174            build_id::full_version(env!("CARGO_PKG_VERSION")),
175            scx_rustland_core::VERSION
176        );
177
178        // Return scheduler object.
179        Ok(Self {
180            bpf,
181            opts,
182            stats_server,
183            tasks: BTreeSet::new(),
184            min_vruntime: 0,
185            init_page_faults: 0,
186            slice_ns: opts.slice_us * NSEC_PER_USEC,
187            slice_ns_min: opts.slice_us_min * NSEC_PER_USEC,
188        })
189    }
190
191    fn get_metrics(&mut self) -> Metrics {
192        let page_faults = match Self::get_page_faults() {
193            Ok(page_faults) => page_faults,
194            Err(_) => 0,
195        };
196        if self.init_page_faults == 0 {
197            self.init_page_faults = page_faults;
198        }
199        let nr_page_faults = page_faults - self.init_page_faults;
200
201        Metrics {
202            nr_running: *self.bpf.nr_running_mut(),
203            nr_cpus: *self.bpf.nr_online_cpus_mut(),
204            nr_queued: *self.bpf.nr_queued_mut(),
205            nr_scheduled: *self.bpf.nr_scheduled_mut(),
206            nr_page_faults,
207            nr_user_dispatches: *self.bpf.nr_user_dispatches_mut(),
208            nr_kernel_dispatches: *self.bpf.nr_kernel_dispatches_mut(),
209            nr_cancel_dispatches: *self.bpf.nr_cancel_dispatches_mut(),
210            nr_bounce_dispatches: *self.bpf.nr_bounce_dispatches_mut(),
211            nr_failed_dispatches: *self.bpf.nr_failed_dispatches_mut(),
212            nr_sched_congested: *self.bpf.nr_sched_congested_mut(),
213        }
214    }
215
216    // Return current timestamp in ns.
217    fn now() -> u64 {
218        let ts = SystemTime::now()
219            .duration_since(SystemTime::UNIX_EPOCH)
220            .unwrap();
221        ts.as_nanos() as u64
222    }
223
224    // Return the total amount of tasks waiting in the user-space scheduler.
225    fn nr_tasks_scheduled(&mut self) -> u64 {
226        self.tasks.len() as u64
227    }
228
    // Return the total amount of tasks waiting to be consumed by the user-space scheduler.
    //
    // The counter is maintained by the BPF component; reading it requires a
    // mutable borrow because scx_rustland_core exposes it via a *_mut() accessor.
    fn nr_tasks_queued(&mut self) -> u64 {
        *self.bpf.nr_queued_mut()
    }
233
234    // Return the total amount of tasks that are waiting to be scheduled.
235    fn nr_tasks_waiting(&mut self) -> u64 {
236        self.nr_tasks_queued() + self.nr_tasks_scheduled()
237    }
238
239    // Return a value inversely proportional to the task's weight.
240    fn scale_by_task_weight_inverse(task: &QueuedTask, value: u64) -> u64 {
241        value * 100 / task.weight
242    }
243
    // Update task's vruntime based on the information collected from the kernel and return to the
    // caller the evaluated task's deadline (deadline = vruntime + capped exec_runtime).
    //
    // This method implements the main task ordering logic of the scheduler.
    fn update_enqueued(&mut self, task: &mut QueuedTask) -> u64 {
        // Update global minimum vruntime based on the previous task's vruntime.
        if self.min_vruntime < task.vtime {
            self.min_vruntime = task.vtime;
        }

        // Update task's vruntime re-aligning it to min_vruntime (never allow a task to accumulate
        // a budget of more than a time slice to prevent starvation).
        let min_vruntime = self.min_vruntime.saturating_sub(self.slice_ns);
        if task.vtime == 0 {
            // Slightly penalize new tasks by charging an extra time slice to prevent bursts of such
            // tasks from disrupting the responsiveness of already running ones.
            task.vtime = min_vruntime + Self::scale_by_task_weight_inverse(task, self.slice_ns);
        } else if task.vtime < min_vruntime {
            task.vtime = min_vruntime;
        }
        // Charge the weighted runtime of the last execution burst.
        // NOTE(review): assumes stop_ts >= start_ts — an out-of-order pair would
        // underflow (panic in debug builds); confirm the BPF side guarantees this.
        task.vtime += Self::scale_by_task_weight_inverse(task, task.stop_ts - task.start_ts);

        // Return the task's deadline: vruntime plus the time spent running since the last
        // sleep, capped at 100 time slices so CPU hogs can't defer themselves forever.
        task.vtime + task.exec_runtime.min(self.slice_ns * 100)
    }
269
270    // Dispatch the first task from the task pool (sending them to the BPF dispatcher).
271    //
272    // Return true on success, false if the BPF backend can't accept any more dispatch.
273    fn dispatch_task(&mut self) -> bool {
274        let nr_waiting = self.nr_tasks_waiting() + 1;
275
276        if let Some(task) = self.tasks.pop_first() {
277            // Scale time slice based on the amount of tasks that are waiting in the
278            // scheduler's queue and the previously unused time slice budget, but make sure
279            // to assign at least slice_us_min.
280            let slice_ns = (self.slice_ns / nr_waiting).max(self.slice_ns_min);
281
282            // Create a new task to dispatch.
283            let mut dispatched_task = DispatchedTask::new(&task.qtask);
284
285            // Assign the time slice to the task and propagate the vruntime.
286            dispatched_task.slice_ns = slice_ns;
287
288            // Propagate the evaluated task's deadline to the scx_rustland_core backend.
289            dispatched_task.vtime = task.deadline;
290
291            // Try to pick an idle CPU for the task.
292            let cpu = self
293                .bpf
294                .select_cpu(task.qtask.pid, task.qtask.cpu, task.qtask.flags);
295            dispatched_task.cpu = if cpu >= 0 {
296                // An idle CPU was found, dispatch the task there.
297                cpu
298            } else if self.opts.percpu_local && task.qtask.nr_cpus_allowed == 1 {
299                // Task is restricted to run on a single CPU, dispatch it to that one.
300                task.qtask.cpu
301            } else {
302                // No idle CPU found, dispatch to the first CPU available.
303                RL_CPU_ANY
304            };
305
306            // Send task to the BPF dispatcher.
307            if self.bpf.dispatch_task(&dispatched_task).is_err() {
308                // If dispatching fails, re-add the task to the pool and skip further dispatching.
309                self.tasks.insert(task);
310
311                return false;
312            }
313        }
314
315        return true;
316    }
317
    // Drain all the tasks from the queued list, evaluate their deadline (Self::update_enqueued()),
    // then push them all to the task pool (doing so will sort them by their deadline).
    fn drain_queued_tasks(&mut self) {
        loop {
            match self.bpf.dequeue_task() {
                Ok(Some(mut task)) => {
                    // Update task information and determine its deadline.
                    let deadline = self.update_enqueued(&mut task);
                    let timestamp = Self::now();

                    // Insert task in the task pool (ordered by deadline).
                    self.tasks.insert(Task {
                        qtask: task,
                        deadline,
                        timestamp,
                    });
                }
                Ok(None) => {
                    // Queued list fully drained.
                    break;
                }
                Err(err) => {
                    // Log the error and stop draining; already-inserted tasks
                    // are still eligible for dispatch below.
                    warn!("Error: {}", err);
                    break;
                }
            }
        }

        // Dispatch the first task from the task pool.
        self.dispatch_task();
    }
348
349    // Main scheduling function (called in a loop to periodically drain tasks from the queued list
350    // and dispatch them to the BPF part via the dispatched list).
351    fn schedule(&mut self) {
352        self.drain_queued_tasks();
353
354        // Notify the dispatcher if there are still pending tasks to be processed,
355        self.bpf.notify_complete(self.tasks.len() as u64);
356    }
357
358    // Get total page faults from the process.
359    fn get_page_faults() -> Result<u64, io::Error> {
360        let myself = Process::myself().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
361        let stat = myself
362            .stat()
363            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
364
365        Ok(stat.minflt + stat.majflt)
366    }
367
368    fn run(&mut self) -> Result<UserExitInfo> {
369        let (res_ch, req_ch) = self.stats_server.channels();
370
371        while !self.bpf.exited() {
372            // Call the main scheduler body.
373            self.schedule();
374
375            // Handle monitor requests asynchronously.
376            if req_ch.try_recv().is_ok() {
377                res_ch.send(self.get_metrics())?;
378            }
379        }
380
381        self.bpf.shutdown_and_report()
382    }
383}
384
385// Unregister the scheduler.
386impl Drop for Scheduler<'_> {
387    fn drop(&mut self) {
388        info!("Unregister {SCHEDULER_NAME} scheduler");
389    }
390}
391
392fn main() -> Result<()> {
393    let opts = Opts::parse();
394
395    if opts.version {
396        println!(
397            "{} version {} - scx_rustland_core {}",
398            SCHEDULER_NAME,
399            build_id::full_version(env!("CARGO_PKG_VERSION")),
400            scx_rustland_core::VERSION
401        );
402        return Ok(());
403    }
404
405    if opts.help_stats {
406        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
407        return Ok(());
408    }
409
410    let loglevel = simplelog::LevelFilter::Info;
411
412    let mut lcfg = simplelog::ConfigBuilder::new();
413    lcfg.set_time_offset_to_local()
414        .expect("Failed to set local time offset")
415        .set_time_level(simplelog::LevelFilter::Error)
416        .set_location_level(simplelog::LevelFilter::Off)
417        .set_target_level(simplelog::LevelFilter::Off)
418        .set_thread_level(simplelog::LevelFilter::Off);
419    simplelog::TermLogger::init(
420        loglevel,
421        lcfg.build(),
422        simplelog::TerminalMode::Stderr,
423        simplelog::ColorChoice::Auto,
424    )?;
425
426    if let Some(intv) = opts.monitor.or(opts.stats) {
427        let jh = std::thread::spawn(move || stats::monitor(Duration::from_secs_f64(intv)).unwrap());
428        if opts.monitor.is_some() {
429            let _ = jh.join();
430            return Ok(());
431        }
432    }
433
434    let mut open_object = MaybeUninit::uninit();
435    loop {
436        let mut sched = Scheduler::init(&opts, &mut open_object)?;
437        if !sched.run()?.should_restart() {
438            break;
439        }
440    }
441
442    Ok(())
443}