scx_rustland/main.rs
// Copyright (c) Andrea Righi <andrea.righi@linux.dev>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;

#[rustfmt::skip]
mod bpf;
use bpf::*;

mod stats;

use std::collections::BTreeSet;
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::io::{self};
use std::mem::MaybeUninit;
use std::time::Duration;
use std::time::SystemTime;

use anyhow::Result;
use clap::Parser;
use libbpf_rs::OpenObject;
use log::info;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::UserExitInfo;
use stats::Metrics;

const SCHEDULER_NAME: &str = "RustLand";

const VERSION: &str = env!("CARGO_PKG_VERSION");

/// scx_rustland: user-space scheduler written in Rust
///
/// scx_rustland is designed to prioritize interactive workloads over background CPU-intensive
/// workloads. For this reason the typical use case of this scheduler involves low-latency
/// interactive applications, such as gaming, video conferencing and live streaming.
///
/// scx_rustland is also designed to be an "easy to read" template that any developer can use to
/// quickly experiment with more complex scheduling policies fully implemented in Rust.
///
/// The scheduler is based on scx_rustland_core, which implements the low-level sched-ext
/// functionalities.
///
/// The scheduling policy implemented in user-space is based on a deadline, evaluated as follows:
///
/// deadline = vruntime + exec_runtime
///
/// Where vruntime reflects the task's total runtime scaled by weight (ensuring fairness), while
/// exec_runtime accounts for the CPU time used since the last sleep (capturing responsiveness).
/// Tasks are then dispatched from the lowest to the highest deadline.
///
/// This approach favors latency-sensitive tasks: those that frequently sleep will accumulate less
/// exec_runtime, resulting in earlier deadlines. In contrast, CPU-intensive tasks that don't sleep
/// accumulate a larger exec_runtime and thus get scheduled later.
///
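/// For example (hypothetical numbers): a task that mostly sleeps may wake up with
/// vruntime = 10 ms and exec_runtime = 2 ms, giving deadline = 12 ms, while a CPU hog with the
/// same vruntime but exec_runtime = 20 ms gets deadline = 30 ms, so the interactive task is
/// dispatched first.
///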
/// All the tasks are stored in a BTreeSet (TaskTree), using the deadline as the ordering key.
/// Once the order of execution is determined all tasks are sent back to the BPF counterpart
/// (scx_rustland_core) to be dispatched. To keep track of the accumulated execution time and
/// vruntime, the scheduler maintains a HashMap (TaskInfoMap), indexed by pid.
///
/// The BPF dispatcher is completely agnostic of the particular scheduling policy implemented in
/// user-space. For this reason developers who want to use this scheduler to experiment with
/// scheduling policies should be able to simply modify the Rust component, without having to
/// deal with any internal kernel / BPF details.
///
/// === Troubleshooting ===
///
/// - Reduce the time slice (option `-s`) if you experience lag or crackling audio.
///
#[derive(Debug, Parser)]
struct Opts {
    /// Scheduling slice duration in microseconds.
    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    /// Scheduling minimum slice duration in microseconds.
    #[clap(short = 'S', long, default_value = "1000")]
    slice_us_min: u64,

    /// If specified, only tasks which have their scheduling policy set to SCHED_EXT using
    /// sched_setscheduler(2) are switched. Otherwise, all tasks are switched.
    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
    partial: bool,

    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Enable verbose output, including libbpf details. Moreover, BPF scheduling events will be
    /// reported in tracefs (e.g., /sys/kernel/tracing/trace_pipe).
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    monitor: Option<f64>,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,
}
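
// Example invocations (a sketch; assumes the built binary is named `scx_rustland` and is run as
// root, since attaching a sched_ext scheduler requires privileges):
//
//   scx_rustland -s 5000 -S 500    # 5 ms default time slice, 0.5 ms minimum time slice
//   scx_rustland --stats 2         # run the scheduler, printing statistics every 2 seconds
//   scx_rustland --monitor 2       # only monitor an already running scheduler instance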

// Time constants.
const NSEC_PER_USEC: u64 = 1_000;

// Basic item stored in the task information map.
#[derive(Debug)]
struct TaskInfo {
    sum_exec_runtime: u64, // total CPU time used by the task
    vruntime: u64,         // total vruntime of the task
}

// Task information map: store total execution time and vruntime of each task in the system.
//
// TaskInfo objects are stored in the HashMap and they are indexed by pid.
//
// Entries are removed when the corresponding task exits.
//
// This information is fetched from the BPF section (through the .exit_task() callback) and
// received by the user-space scheduler via self.bpf.dequeue_task(): a task with a negative .cpu
// value represents an exiting task, so in this case we can free the corresponding entry in
// TaskInfoMap (see also Scheduler::drain_queued_tasks()).
struct TaskInfoMap {
    tasks: HashMap<i32, TaskInfo>,
}

// TaskInfoMap implementation: provide a constructor; entries are inserted and updated by pid
// via the tasks HashMap (see Scheduler::update_enqueued()).
impl TaskInfoMap {
    fn new() -> Self {
        TaskInfoMap {
            tasks: HashMap::new(),
        }
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
struct Task {
    qtask: QueuedTask, // queued task
    deadline: u64,     // task deadline (determines the order in which tasks are dispatched)
    timestamp: u64,    // task enqueue timestamp
}

// Sort tasks by their deadline first (shorter deadlines are scheduled before longer ones), then
// by their enqueue timestamp and lastly by their pid.
impl Ord for Task {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.deadline
            .cmp(&other.deadline)
            .then_with(|| self.timestamp.cmp(&other.timestamp))
            .then_with(|| self.qtask.pid.cmp(&other.qtask.pid))
    }
}

// Keep PartialOrd consistent with the manual Ord above (a derived PartialOrd would compare the
// struct fields in declaration order and disagree with Ord).
impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

// Task pool where all the tasks that need to run are stored before dispatching (ordered by
// their deadline using a BTreeSet).
struct TaskTree {
    tasks: BTreeSet<Task>,
    task_map: HashMap<i32, Task>, // map from pid to task
}

// Task pool methods (push / pop).
impl TaskTree {
    fn new() -> Self {
        TaskTree {
            tasks: BTreeSet::new(),
            task_map: HashMap::new(),
        }
    }

    // Add an item to the pool (the item will be placed in the tree depending on its deadline;
    // items with the same deadline will be sorted by timestamp and pid).
    fn push(&mut self, task: Task) {
        // If the task is already present, remove the stale entry from the tree first.
        if let Some(prev_task) = self.task_map.get(&task.qtask.pid) {
            self.tasks.remove(prev_task);
        }

        // Insert/update task.
        self.tasks.insert(task.clone());
        self.task_map.insert(task.qtask.pid, task);
    }

    // Pop the first item from the BTreeSet (item with the shortest deadline).
    fn pop(&mut self) -> Option<Task> {
        if let Some(task) = self.tasks.pop_first() {
            self.task_map.remove(&task.qtask.pid);
            Some(task)
        } else {
            None
        }
    }
}
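
// A minimal, self-contained sketch of the ordering contract TaskTree relies on:
// BTreeSet::pop_first() always yields the smallest element, so with (deadline, timestamp, pid)
// as the comparison key the task with the shortest deadline is popped first. MiniTask is a
// hypothetical stand-in (QueuedTask is not trivially constructible in a unit test); it only
// mirrors the fields used by Task's Ord implementation.
#[cfg(test)]
mod task_order_sketch {
    use std::collections::BTreeSet;

    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct MiniTask {
        deadline: u64,
        timestamp: u64,
        pid: i32,
    }

    #[test]
    fn pops_shortest_deadline_first() {
        let mut tasks = BTreeSet::new();
        tasks.insert(MiniTask { deadline: 300, timestamp: 2, pid: 42 });
        tasks.insert(MiniTask { deadline: 100, timestamp: 3, pid: 43 });
        tasks.insert(MiniTask { deadline: 100, timestamp: 1, pid: 44 });

        // Same deadline: the earlier enqueue timestamp wins the tie-break.
        assert_eq!(tasks.pop_first().unwrap().pid, 44);
        assert_eq!(tasks.pop_first().unwrap().pid, 43);
        // The longest deadline is dispatched last.
        assert_eq!(tasks.pop_first().unwrap().pid, 42);
    }
}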

// Main scheduler object
struct Scheduler<'a> {
    bpf: BpfScheduler<'a>,                  // BPF connector
    stats_server: StatsServer<(), Metrics>, // statistics
    task_pool: TaskTree,                    // tasks ordered by deadline
    task_map: TaskInfoMap,                  // map pids to the corresponding task information
    min_vruntime: u64,                      // keep track of the minimum vruntime across all tasks
    init_page_faults: u64,                  // initial page faults counter
    slice_ns: u64,                          // default time slice (in ns)
    slice_ns_min: u64,                      // minimum time slice (in ns)
}

impl<'a> Scheduler<'a> {
    fn init(opts: &Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        // Low-level BPF connector.
        let bpf = BpfScheduler::init(
            open_object,
            opts.exit_dump_len,
            opts.partial,
            opts.verbose,
            true, // enable the built-in idle CPU selection policy
        )?;
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        info!("{} scheduler attached", SCHEDULER_NAME);

        // Return the scheduler object.
        Ok(Self {
            bpf,
            stats_server,
            task_pool: TaskTree::new(),
            task_map: TaskInfoMap::new(),
            min_vruntime: 0,
            init_page_faults: 0,
            slice_ns: opts.slice_us * NSEC_PER_USEC,
            slice_ns_min: opts.slice_us_min * NSEC_PER_USEC,
        })
    }

    fn get_metrics(&mut self) -> Metrics {
        let page_faults = Self::get_page_faults().unwrap_or(0);
        if self.init_page_faults == 0 {
            self.init_page_faults = page_faults;
        }
        let nr_page_faults = page_faults - self.init_page_faults;

        Metrics {
            nr_running: *self.bpf.nr_running_mut(),
            nr_cpus: *self.bpf.nr_online_cpus_mut(),
            nr_queued: *self.bpf.nr_queued_mut(),
            nr_scheduled: *self.bpf.nr_scheduled_mut(),
            nr_page_faults,
            nr_user_dispatches: *self.bpf.nr_user_dispatches_mut(),
            nr_kernel_dispatches: *self.bpf.nr_kernel_dispatches_mut(),
            nr_cancel_dispatches: *self.bpf.nr_cancel_dispatches_mut(),
            nr_bounce_dispatches: *self.bpf.nr_bounce_dispatches_mut(),
            nr_failed_dispatches: *self.bpf.nr_failed_dispatches_mut(),
            nr_sched_congested: *self.bpf.nr_sched_congested_mut(),
        }
    }

    // Return the current timestamp in ns.
    fn now() -> u64 {
        let ts = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap();
        ts.as_nanos() as u64
    }
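
    // Note: SystemTime is wall-clock time and is not guaranteed to be monotonic; the timestamp
    // is only used to break ties between tasks with the same deadline, so an occasional clock
    // adjustment merely perturbs that tie-break instead of breaking the ordering logic.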

    // Update the task's vruntime based on the information collected from the kernel and return
    // the evaluated task's deadline to the caller.
    //
    // This method implements the main task ordering logic of the scheduler.
    fn update_enqueued(&mut self, task: &QueuedTask) -> u64 {
        // Get the task information if the task is already stored in the task map,
        // otherwise create a new entry for it.
        let task_info = self
            .task_map
            .tasks
            .entry(task.pid)
            .or_insert_with(|| TaskInfo {
                sum_exec_runtime: task.sum_exec_runtime,
                vruntime: self.min_vruntime,
            });

        // Update the global minimum vruntime based on the vruntime previously assigned to this
        // task.
        if self.min_vruntime < task.vtime {
            self.min_vruntime = task.vtime;
        }

        // Evaluate the time slice used by the task.
        let slice = task
            .sum_exec_runtime
            .saturating_sub(task_info.sum_exec_runtime)
            .min(self.slice_ns);

        // Update the total task cputime.
        task_info.sum_exec_runtime = task.sum_exec_runtime;

        // Update the task's vruntime, re-aligning it to min_vruntime (never allow a task to
        // accumulate a budget of more than one time slice, to prevent starvation).
        let min_vruntime = self.min_vruntime.saturating_sub(self.slice_ns);
        if task_info.vruntime < min_vruntime {
            task_info.vruntime = min_vruntime;
        }
        let vslice = slice * 100 / task.weight;
        task_info.vruntime += vslice;

        // Return the task's deadline.
        task_info.vruntime + task.exec_runtime.min(self.slice_ns * 100)
    }
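
    // Worked example for update_enqueued() (hypothetical numbers): with the default
    // slice_ns = 20_000_000 and a task of default weight 100 that consumed 4_000_000 ns of CPU
    // since it was last queued:
    //
    //   slice  = min(4_000_000, 20_000_000)  = 4_000_000
    //   vslice = 4_000_000 * 100 / 100       = 4_000_000 (added to the task's vruntime)
    //
    // A task with weight 200 (higher priority) would be charged only
    // 4_000_000 * 100 / 200 = 2_000_000 ns of vruntime for the same CPU time, so its deadline
    // grows more slowly and it tends to be dispatched earlier.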

    // Drain all the tasks from the queued list, update their vruntime (Self::update_enqueued()),
    // then push them all to the task pool (doing so will sort them by their deadline).
    fn drain_queued_tasks(&mut self) {
        loop {
            match self.bpf.dequeue_task() {
                Ok(Some(task)) => {
                    // A negative .cpu value means the task has exited: drop its entry from the
                    // task information map (see the TaskInfoMap comment above).
                    if task.cpu < 0 {
                        self.task_map.tasks.remove(&task.pid);
                        continue;
                    }

                    // Update the task information and evaluate the task's deadline.
                    let deadline = self.update_enqueued(&task);
                    let timestamp = Self::now();

                    // Insert the task in the task pool (ordered by deadline).
                    self.task_pool.push(Task {
                        qtask: task,
                        deadline,
                        timestamp,
                    });
                }
                Ok(None) => {
                    break;
                }
                Err(err) => {
                    warn!("Error: {}", err);
                    break;
                }
            }
        }
    }

    // Return the total number of tasks that are waiting to be scheduled.
    fn nr_tasks_waiting(&mut self) -> u64 {
        let nr_queued = *self.bpf.nr_queued_mut();
        let nr_scheduled = *self.bpf.nr_scheduled_mut();

        nr_queued + nr_scheduled
    }

    // Dispatch the first task from the task pool (sending it to the BPF dispatcher).
    fn dispatch_tasks(&mut self) {
        match self.task_pool.pop() {
            Some(task) => {
                // Scale the time slice based on the number of tasks waiting in the scheduler's
                // queue (e.g., a 20 ms default slice with nr_waiting = 4 becomes a 5 ms slice),
                // but make sure to assign at least slice_us_min.
                let nr_waiting = self.nr_tasks_waiting() + 1;
                let slice_ns = (self.slice_ns / nr_waiting).max(self.slice_ns_min);

                // Create a new task to dispatch.
                let mut dispatched_task = DispatchedTask::new(&task.qtask);

                // Assign the time slice to the task.
                dispatched_task.slice_ns = slice_ns;

                // Propagate the evaluated task's deadline to the scx_rustland_core backend.
                dispatched_task.vtime = task.deadline;

                // Try to pick an idle CPU for the task.
                let cpu = self
                    .bpf
                    .select_cpu(task.qtask.pid, task.qtask.cpu, task.qtask.flags);
                dispatched_task.cpu = if cpu >= 0 { cpu } else { RL_CPU_ANY };

                // Send the task to the BPF dispatcher.
                match self.bpf.dispatch_task(&dispatched_task) {
                    Ok(_) => {}
                    Err(_) => {
                        // Re-add the task to the task pool in case of failure and stop
                        // dispatching.
                        self.task_pool.push(task);
                    }
                }
            }
            None => {}
        }
    }

    // Main scheduling function (called in a loop to periodically drain tasks from the queued
    // list and dispatch them to the BPF part via the dispatched list).
    fn schedule(&mut self) {
        self.drain_queued_tasks();
        self.dispatch_tasks();

        // Notify the dispatcher if there are still pending tasks to be processed.
        self.bpf.notify_complete(self.task_pool.tasks.len() as u64);
    }

    // Get the total number of page faults from /proc/self/stat.
    fn get_page_faults() -> Result<u64, io::Error> {
        let path = "/proc/self/stat";
        let mut file = File::open(path)?;

        // Read the contents of the file into a string.
        let mut content = String::new();
        file.read_to_string(&mut content)?;

        // Parse the relevant fields and calculate the total page faults: per proc(5), minflt is
        // field 10 and majflt is field 12 (indexes 9 and 11 when zero-based).
        let fields: Vec<&str> = content.split_whitespace().collect();
        if fields.len() >= 12 {
            let minflt: u64 = fields[9].parse().unwrap_or(0);
            let majflt: u64 = fields[11].parse().unwrap_or(0);
            Ok(minflt + majflt)
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Invalid format in /proc/[PID]/stat",
            ))
        }
    }
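
    // Sketch of the /proc/self/stat layout parsed above (hypothetical values, field numbering
    // per proc(5)):
    //
    //   pid  comm            state ppid ... flags minflt cminflt majflt ...
    //   42   (scx_rustland)  S     1    ... 0     1234   0       5      ...
    //
    // Note: the whitespace split above assumes the comm field contains no spaces, which holds
    // for this scheduler's own process name.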

    fn run(&mut self) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        while !self.bpf.exited() {
            // Call the main scheduler body.
            self.schedule();

            // Handle monitor requests asynchronously.
            if let Ok(()) = req_ch.try_recv() {
                res_ch.send(self.get_metrics())?;
            }
        }

        self.bpf.shutdown_and_report()
    }
}

// Unregister the scheduler.
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {} scheduler", SCHEDULER_NAME);
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "{} version {} - scx_rustland_core {}",
            SCHEDULER_NAME,
            VERSION,
            scx_rustland_core::VERSION
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let jh = std::thread::spawn(move || stats::monitor(Duration::from_secs_f64(intv)).unwrap());
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run()?.should_restart() {
            break;
        }
    }

    Ok(())
}