scx_rustland/
main.rs

// Copyright (c) Andrea Righi <andrea.righi@linux.dev>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

#[rustfmt::skip]
mod bpf;
use bpf::*;

mod stats;

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, Read};
use std::mem::MaybeUninit;
use std::time::Duration;
use std::time::SystemTime;

use anyhow::Result;
use clap::Parser;
use libbpf_rs::OpenObject;
use log::info;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::UserExitInfo;
use stats::Metrics;

const SCHEDULER_NAME: &str = "RustLand";

const VERSION: &str = env!("CARGO_PKG_VERSION");

/// scx_rustland: user-space scheduler written in Rust
///
/// scx_rustland is designed to prioritize interactive workloads over background CPU-intensive
/// workloads. For this reason the typical use case of this scheduler involves low-latency
/// interactive applications, such as gaming, video conferencing and live streaming.
///
/// scx_rustland is also designed to be an "easy to read" template that can be used by any
/// developer to quickly experiment with more complex scheduling policies fully implemented in
/// Rust.
///
/// The scheduler is based on scx_rustland_core, which implements the low-level sched-ext
/// functionality.
///
/// The scheduling policy implemented in user-space is based on a deadline, evaluated as
/// follows:
///
///       deadline = vruntime + exec_runtime
///
/// where vruntime reflects the task's total runtime scaled by weight (ensuring fairness), while
/// exec_runtime accounts for the CPU time used since the last sleep (capturing responsiveness).
/// Tasks are then dispatched from the lowest to the highest deadline.
///
/// This approach favors latency-sensitive tasks: those that frequently sleep will accumulate less
/// exec_runtime, resulting in earlier deadlines. In contrast, CPU-intensive tasks that don't
/// sleep accumulate a larger exec_runtime and thus get scheduled later.
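///
/// As an illustrative example (numbers are made up): given two tasks with the same vruntime of
/// 10 ms, a task that just woke up with exec_runtime = 1 ms gets deadline 11 ms, while a CPU
/// hog with exec_runtime = 100 ms gets deadline 110 ms, so the freshly woken task is
/// dispatched first.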
///
/// All the tasks are stored in a BTreeSet (TaskTree), using the deadline as the ordering key.
/// Once the order of execution is determined, all tasks are sent back to the BPF counterpart
/// (scx_rustland_core) to be dispatched. To keep track of the accumulated execution time and
/// vruntime, the scheduler maintains a HashMap (TaskInfoMap), indexed by pid.
///
/// The BPF dispatcher is completely agnostic of the particular scheduling policy implemented in
/// user-space. For this reason developers who want to use this scheduler to experiment with
/// scheduling policies should be able to simply modify the Rust component, without having to
/// deal with any internal kernel / BPF details.
///
/// === Troubleshooting ===
///
/// - Reduce the time slice (option `-s`) if you experience lag or crackling audio.
///
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    /// Minimum scheduling slice duration in microseconds.
    #[clap(short = 'S', long, default_value = "1000")]
    slice_us_min: u64,

    /// If specified, only tasks which have their scheduling policy set to SCHED_EXT using
    /// sched_setscheduler(2) are switched. Otherwise, all tasks are switched.
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output, including libbpf details. Moreover, BPF scheduling events will be
    /// reported in tracefs (e.g., /sys/kernel/tracing/trace_pipe).
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. The scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,
}
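
// Example invocation (illustrative; long flag names follow from clap's default kebab-case
// renaming of the fields above):
//
//   $ scx_rustland --slice-us 10000 --stats 1
//
// runs the scheduler with a 10 ms maximum time slice and prints statistics every second.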

// Time constants.
const NSEC_PER_USEC: u64 = 1_000;

// Basic item stored in the task information map.
#[derive(Debug)]
struct TaskInfo {
    sum_exec_runtime: u64, // total CPU time used by the task
    vruntime: u64,         // total vruntime of the task
}

// Task information map: store the total execution time and vruntime of each task in the system.
//
// TaskInfo objects are stored in the HashMap, indexed by pid.
//
// Entries are removed when the corresponding task exits.
//
// This information is fetched from the BPF section (through the .exit_task() callback) and
// received by the user-space scheduler via self.bpf.dequeue_task(): a task with a negative .cpu
// value represents an exiting task, so in this case we can free the corresponding entry in
// TaskInfoMap (see also Scheduler::drain_queued_tasks()).
struct TaskInfoMap {
    tasks: HashMap<i32, TaskInfo>,
}

// TaskInfoMap implementation: provide a constructor; entries are created and updated by pid in
// Scheduler::update_enqueued() and removed in Scheduler::drain_queued_tasks().
impl TaskInfoMap {
    fn new() -> Self {
        TaskInfoMap {
            tasks: HashMap::new(),
        }
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
struct Task {
    qtask: QueuedTask, // queued task
    deadline: u64,     // task deadline (which determines the order in which tasks are dispatched)
    timestamp: u64,    // task enqueue timestamp
}

// Sort tasks by their deadline first (tasks with an earlier deadline are scheduled first),
// then by their enqueue timestamp and lastly by their pid.
impl Ord for Task {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.deadline
            .cmp(&other.deadline)
            .then_with(|| self.timestamp.cmp(&other.timestamp))
            .then_with(|| self.qtask.pid.cmp(&other.qtask.pid))
    }
}

// Keep PartialOrd consistent with the Ord implementation above: a derived PartialOrd would
// compare all fields in declaration order and disagree with Ord.
impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

// Task pool where all the tasks that need to run are stored before dispatching (ordered by
// deadline using a BTreeSet).
struct TaskTree {
    tasks: BTreeSet<Task>,
    task_map: HashMap<i32, Task>, // map from pid to task
}
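
// Note: the BTreeSet keeps tasks ordered by deadline, so the earliest-deadline task can be
// popped in O(log n), while the pid-indexed HashMap allows the previous entry of a re-enqueued
// task to be found and replaced in O(1) on average.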

// Task pool methods (push / pop).
impl TaskTree {
    fn new() -> Self {
        TaskTree {
            tasks: BTreeSet::new(),
            task_map: HashMap::new(),
        }
    }

    // Add an item to the pool (the item will be placed in the tree depending on its deadline;
    // items with the same deadline are sorted by timestamp and pid). If a task with the same
    // pid already exists, it is replaced.
    fn push(&mut self, task: Task) {
        // Remove the previous entry for this pid, if any.
        if let Some(prev_task) = self.task_map.get(&task.qtask.pid) {
            self.tasks.remove(prev_task);
        }

        // Insert/update the task.
        self.tasks.insert(task.clone());
        self.task_map.insert(task.qtask.pid, task);
    }

    // Pop the first item from the BTreeSet (the item with the earliest deadline).
    fn pop(&mut self) -> Option<Task> {
        if let Some(task) = self.tasks.pop_first() {
            self.task_map.remove(&task.qtask.pid);
            Some(task)
        } else {
            None
        }
    }
}
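
// Example of the pool's behavior: pushing three tasks with deadlines 30, 10 and 20 and then
// popping three times returns them in deadline order (10, 20, 30); pushing the same pid twice
// keeps only the most recent entry.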

// Main scheduler object
struct Scheduler<'a> {
    bpf: BpfScheduler<'a>,                  // BPF connector
    stats_server: StatsServer<(), Metrics>, // statistics
    task_pool: TaskTree,                    // tasks ordered by deadline
    task_map: TaskInfoMap,                  // map pids to the corresponding task information
    min_vruntime: u64,                      // keep track of the minimum vruntime across all tasks
    init_page_faults: u64,                  // initial page faults counter
    slice_ns: u64,                          // default time slice (in ns)
    slice_ns_min: u64,                      // minimum time slice (in ns)
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        // Low-level BPF connector.
        let bpf = BpfScheduler::init(
            open_object,
            opts.exit_dump_len,
            opts.partial,
            opts.verbose,
            true, // Enable the built-in idle CPU selection policy.
        )?;
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        info!("{} scheduler attached", SCHEDULER_NAME);

        // Return the scheduler object.
        Ok(Self {
            bpf,
            stats_server,
            task_pool: TaskTree::new(),
            task_map: TaskInfoMap::new(),
            min_vruntime: 0,
            init_page_faults: 0,
            slice_ns: opts.slice_us * NSEC_PER_USEC,
            slice_ns_min: opts.slice_us_min * NSEC_PER_USEC,
        })
    }

    fn get_metrics(&mut self) -> Metrics {
        let page_faults = Self::get_page_faults().unwrap_or(0);
        if self.init_page_faults == 0 {
            self.init_page_faults = page_faults;
        }
        let nr_page_faults = page_faults - self.init_page_faults;

        Metrics {
            nr_running: *self.bpf.nr_running_mut(),
            nr_cpus: *self.bpf.nr_online_cpus_mut(),
            nr_queued: *self.bpf.nr_queued_mut(),
            nr_scheduled: *self.bpf.nr_scheduled_mut(),
            nr_page_faults,
            nr_user_dispatches: *self.bpf.nr_user_dispatches_mut(),
            nr_kernel_dispatches: *self.bpf.nr_kernel_dispatches_mut(),
            nr_cancel_dispatches: *self.bpf.nr_cancel_dispatches_mut(),
            nr_bounce_dispatches: *self.bpf.nr_bounce_dispatches_mut(),
            nr_failed_dispatches: *self.bpf.nr_failed_dispatches_mut(),
            nr_sched_congested: *self.bpf.nr_sched_congested_mut(),
        }
    }

    // Return the current timestamp in ns.
    fn now() -> u64 {
        let ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        ts.as_nanos() as u64
    }

    // Update the task's vruntime based on the information collected from the kernel and return
    // the evaluated task's deadline to the caller.
    //
    // This method implements the main task ordering logic of the scheduler.
    fn update_enqueued(&mut self, task: &QueuedTask) -> u64 {
        // Get the task information if the task is already stored in the task map,
        // otherwise create a new entry for it.
        let task_info = self
            .task_map
            .tasks
            .entry(task.pid)
            .or_insert_with(|| TaskInfo {
                sum_exec_runtime: task.sum_exec_runtime,
                vruntime: self.min_vruntime,
            });

        // Update the global minimum vruntime based on the task's current vruntime.
        if self.min_vruntime < task.vtime {
            self.min_vruntime = task.vtime;
        }

        // Evaluate the used task time slice.
        let slice = task
            .sum_exec_runtime
            .saturating_sub(task_info.sum_exec_runtime)
            .min(self.slice_ns);

        // Update the total task cputime.
        task_info.sum_exec_runtime = task.sum_exec_runtime;

        // Update the task's vruntime, re-aligning it to min_vruntime (never allow a task to
        // accumulate a budget of more than a time slice, to prevent starvation).
        let min_vruntime = self.min_vruntime.saturating_sub(self.slice_ns);
        if task_info.vruntime < min_vruntime {
            task_info.vruntime = min_vruntime;
        }
        let vslice = slice * 100 / task.weight;
        task_info.vruntime += vslice;
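
        // Note: a weight of 100 corresponds to the default priority (nice 0), so for a
        // default-priority task vslice == slice; higher-weight (higher-priority) tasks
        // accumulate vruntime more slowly and are therefore scheduled more often.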

        // Return the task's deadline: the accumulated vruntime plus the execution runtime
        // since the last sleep, with the latter capped to prevent excessively large deadlines.
        task_info.vruntime + task.exec_runtime.min(self.slice_ns * 100)
    }

    // Drain all the tasks from the queued list, update their deadline (Self::update_enqueued()),
    // then push them all to the task pool (doing so will sort them by their deadline).
    fn drain_queued_tasks(&mut self) {
        loop {
            match self.bpf.dequeue_task() {
                Ok(Some(task)) => {
                    // A task with a negative CPU value represents an exiting task: free its
                    // entry in the task map and skip it (see the TaskInfoMap comment above).
                    if task.cpu < 0 {
                        self.task_map.tasks.remove(&task.pid);
                        continue;
                    }

                    // Update the task information and evaluate its deadline.
                    let deadline = self.update_enqueued(&task);
                    let timestamp = Self::now();

                    // Insert the task into the task pool (ordered by deadline).
                    self.task_pool.push(Task {
                        qtask: task,
                        deadline,
                        timestamp,
                    });
                }
                Ok(None) => {
                    break;
                }
                Err(err) => {
                    warn!("Error: {}", err);
                    break;
                }
            }
        }
    }

    // Return the total number of tasks that are waiting to be scheduled.
    fn nr_tasks_waiting(&mut self) -> u64 {
        let nr_queued = *self.bpf.nr_queued_mut();
        let nr_scheduled = *self.bpf.nr_scheduled_mut();

        nr_queued + nr_scheduled
    }

    // Dispatch the first task from the task pool (sending it to the BPF dispatcher).
    fn dispatch_tasks(&mut self) {
        if let Some(task) = self.task_pool.pop() {
            // Scale the time slice based on the number of tasks that are waiting in the
            // scheduler's queue, but make sure to assign at least slice_us_min.
            let nr_waiting = self.nr_tasks_waiting() + 1;
            let slice_ns = (self.slice_ns / nr_waiting).max(self.slice_ns_min);
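
            // For example (illustrative numbers): with the default 20 ms slice and nine other
            // tasks waiting (nr_waiting = 10), each dispatched task gets a 2 ms time slice,
            // clamped from below by slice_ns_min.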

            // Create a new task to dispatch.
            let mut dispatched_task = DispatchedTask::new(&task.qtask);

            // Assign the time slice to the task.
            dispatched_task.slice_ns = slice_ns;

            // Propagate the evaluated task's deadline to the scx_rustland_core backend.
            dispatched_task.vtime = task.deadline;

            // Try to pick an idle CPU for the task.
            let cpu = self
                .bpf
                .select_cpu(task.qtask.pid, task.qtask.cpu, task.qtask.flags);
            dispatched_task.cpu = if cpu >= 0 { cpu } else { RL_CPU_ANY };

            // Send the task to the BPF dispatcher.
            if self.bpf.dispatch_task(&dispatched_task).is_err() {
                // Re-add the task to the task pool in case of failure and stop dispatching.
                self.task_pool.push(task);
            }
        }
    }

    // Main scheduling function (called in a loop to periodically drain tasks from the queued
    // list and dispatch them to the BPF part via the dispatched list).
    fn schedule(&mut self) {
        self.drain_queued_tasks();
        self.dispatch_tasks();

        // Notify the dispatcher if there are still pending tasks to be processed.
        self.bpf.notify_complete(self.task_pool.tasks.len() as u64);
    }

    // Get total page faults from /proc/self/stat.
    fn get_page_faults() -> Result<u64, io::Error> {
        let mut file = File::open("/proc/self/stat")?;

        // Read the contents of the file into a string.
        let mut content = String::new();
        file.read_to_string(&mut content)?;

        // Parse the relevant fields and calculate the total page faults.
        let fields: Vec<&str> = content.split_whitespace().collect();
        if fields.len() >= 12 {
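            // Note: fields 10 and 12 (1-based) of /proc/[PID]/stat are minflt and majflt,
            // per proc(5); this simple whitespace split assumes the comm field contains no
            // spaces.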
            let minflt: u64 = fields[9].parse().unwrap_or(0);
            let majflt: u64 = fields[11].parse().unwrap_or(0);
            Ok(minflt + majflt)
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid format in /proc/[PID]/stat",
            ))
        }
    }

    fn run(&mut self) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        while !self.bpf.exited() {
            // Call the main scheduler body.
            self.schedule();

            // Handle monitor requests asynchronously.
            if let Ok(()) = req_ch.try_recv() {
                res_ch.send(self.get_metrics())?;
            }
        }

        self.bpf.shutdown_and_report()
    }
}

// Unregister the scheduler.
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {} scheduler", SCHEDULER_NAME);
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "{} version {} - scx_rustland_core {}",
            SCHEDULER_NAME,
            VERSION,
            scx_rustland_core::VERSION
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let jh = std::thread::spawn(move || stats::monitor(Duration::from_secs_f64(intv)).unwrap());
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run()?.should_restart() {
            break;
        }
    }

    Ok(())
}