// scx_rustland/main.rs
1// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6pub use bpf_skel::*;
7pub mod bpf_intf;
8
9#[rustfmt::skip]
10mod bpf;
11use bpf::*;
12
13mod stats;
14use std::collections::BTreeSet;
15use std::io::{self};
16use std::mem::MaybeUninit;
17use std::time::Duration;
18use std::time::SystemTime;
19
20use anyhow::Result;
21use clap::Parser;
22use libbpf_rs::OpenObject;
23use log::info;
24use log::warn;
25use procfs::process::Process;
26use scx_stats::prelude::*;
27use scx_utils::build_id;
28use scx_utils::libbpf_clap_opts::LibbpfOpts;
29use scx_utils::UserExitInfo;
30use stats::Metrics;
31
/// Human-readable scheduler name, used in log messages and version output.
const SCHEDULER_NAME: &str = "RustLand";
33
34/// scx_rustland: user-space scheduler written in Rust
35///
36/// scx_rustland is designed to prioritize interactive workloads over background CPU-intensive
37/// workloads. For this reason the typical use case of this scheduler involves low-latency
38/// interactive applications, such as gaming, video conferencing and live streaming.
39///
40/// scx_rustland is also designed to be an "easy to read" template that can be used by any
41/// developer to quickly experiment more complex scheduling policies fully implemented in Rust.
42///
43/// The scheduler is based on scx_rustland_core, which implements the low level sched-ext
44/// functionalities.
45///
46/// The scheduling policy implemented in user-space is a based on a deadline, evaluated as
47/// following:
48///
49/// deadline = vruntime + exec_runtime
50///
51/// Where, vruntime reflects the task's total runtime scaled by weight (ensuring fairness), while
52/// exec_runtime accounts the CPU time used since the last sleep (capturing responsiveness). Tasks
53/// are then dispatched from the lowest to the highest deadline.
54///
55/// This approach favors latency-sensitive tasks: those that frequently sleep will accumulate less
56/// exec_runtime, resulting in earlier deadlines. In contrast, CPU-intensive tasks that don’t sleep
57/// accumulate a larger exec_runtime and thus get scheduled later.
58///
59/// All the tasks are stored in a BTreeSet (TaskTree), using the deadline as the ordering key.
60/// Once the order of execution is determined all tasks are sent back to the BPF counterpart
61/// (scx_rustland_core) to be dispatched.
62///
63/// The BPF dispatcher is completely agnostic of the particular scheduling policy implemented in
64/// user-space. For this reason developers that are willing to use this scheduler to experiment
65/// scheduling policies should be able to simply modify the Rust component, without having to deal
66/// with any internal kernel / BPF details.
67///
68/// === Troubleshooting ===
69///
70/// - Reduce the time slice (option `-s`) if you experience lag or cracking audio.
71///
72#[derive(Debug, Parser)]
73struct Opts {
74 /// Scheduling slice duration in microseconds.
75 #[clap(short = 's', long, default_value = "20000")]
76 slice_us: u64,
77
78 /// Scheduling minimum slice duration in microseconds.
79 #[clap(short = 'S', long, default_value = "1000")]
80 slice_us_min: u64,
81
82 /// If set, per-CPU tasks are dispatched directly to their only eligible CPU.
83 /// This can help enforce affinity-based isolation for better performance.
84 #[clap(short = 'l', long, action = clap::ArgAction::SetTrue)]
85 percpu_local: bool,
86
87 /// If specified, only tasks which have their scheduling policy set to SCHED_EXT using
88 /// sched_setscheduler(2) are switched. Otherwise, all tasks are switched.
89 #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
90 partial: bool,
91
92 /// Exit debug dump buffer length. 0 indicates default.
93 #[clap(long, default_value = "0")]
94 exit_dump_len: u32,
95
96 /// Enable verbose output, including libbpf details. Moreover, BPF scheduling events will be
97 /// reported in tracefs (e.g., /sys/kernel/tracing/trace_pipe).
98 #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
99 verbose: bool,
100
101 /// Enable stats monitoring with the specified interval.
102 #[clap(long)]
103 stats: Option<f64>,
104
105 /// Run in stats monitoring mode with the specified interval. Scheduler
106 /// is not launched.
107 #[clap(long)]
108 monitor: Option<f64>,
109
110 /// Show descriptions for statistics.
111 #[clap(long)]
112 help_stats: bool,
113
114 /// Print scheduler version and exit.
115 #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
116 version: bool,
117
118 #[clap(flatten, next_help_heading = "Libbpf Options")]
119 pub libbpf: LibbpfOpts,
120}
121
// Time constants.
/// Nanoseconds per microsecond; converts CLI slice options (us) into ns.
const NSEC_PER_USEC: u64 = 1_000;
124
125#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
126struct Task {
127 qtask: QueuedTask, // queued task
128 deadline: u64, // task deadline (that determines the order how tasks are dispatched)
129 timestamp: u64, // task enqueue timestamp
130}
131
132// Sort tasks by their interactive status first (interactive tasks are always scheduled before
133// regular tasks), then sort them by their vruntime, then by their timestamp and lastly by their
134// pid.
135impl Ord for Task {
136 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
137 self.deadline
138 .cmp(&other.deadline)
139 .then_with(|| self.timestamp.cmp(&other.timestamp))
140 .then_with(|| self.qtask.pid.cmp(&other.qtask.pid))
141 }
142}
143
// Main scheduler object: owns the BPF connector, the user-space task pool and the
// bookkeeping needed to evaluate task deadlines.
struct Scheduler<'a> {
    bpf: BpfScheduler<'a>,                  // BPF connector (scx_rustland_core backend)
    opts: &'a Opts,                         // scheduler options
    stats_server: StatsServer<(), Metrics>, // statistics server
    tasks: BTreeSet<Task>,                  // tasks ordered by deadline
    min_vruntime: u64,                      // Keep track of the minimum vruntime across all tasks
    init_page_faults: u64,                  // Initial page faults counter (baseline for deltas)
    slice_ns: u64,                          // Default time slice (in ns)
    slice_ns_min: u64,                      // Minimum time slice (in ns)
}
155
156impl<'a> Scheduler<'a> {
157 fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
158 let stats_server = StatsServer::new(stats::server_data()).launch()?;
159
160 // Low-level BPF connector.
161 let bpf = BpfScheduler::init(
162 open_object,
163 opts.libbpf.clone().into_bpf_open_opts(),
164 opts.exit_dump_len,
165 opts.partial,
166 opts.verbose,
167 true, // Enable built-in idle CPU selection policy
168 "rustland",
169 )?;
170
171 info!(
172 "{} version {} - scx_rustland_core {}",
173 SCHEDULER_NAME,
174 build_id::full_version(env!("CARGO_PKG_VERSION")),
175 scx_rustland_core::VERSION
176 );
177
178 // Return scheduler object.
179 Ok(Self {
180 bpf,
181 opts,
182 stats_server,
183 tasks: BTreeSet::new(),
184 min_vruntime: 0,
185 init_page_faults: 0,
186 slice_ns: opts.slice_us * NSEC_PER_USEC,
187 slice_ns_min: opts.slice_us_min * NSEC_PER_USEC,
188 })
189 }
190
191 fn get_metrics(&mut self) -> Metrics {
192 let page_faults = match Self::get_page_faults() {
193 Ok(page_faults) => page_faults,
194 Err(_) => 0,
195 };
196 if self.init_page_faults == 0 {
197 self.init_page_faults = page_faults;
198 }
199 let nr_page_faults = page_faults - self.init_page_faults;
200
201 Metrics {
202 nr_running: *self.bpf.nr_running_mut(),
203 nr_cpus: *self.bpf.nr_online_cpus_mut(),
204 nr_queued: *self.bpf.nr_queued_mut(),
205 nr_scheduled: *self.bpf.nr_scheduled_mut(),
206 nr_page_faults,
207 nr_user_dispatches: *self.bpf.nr_user_dispatches_mut(),
208 nr_kernel_dispatches: *self.bpf.nr_kernel_dispatches_mut(),
209 nr_cancel_dispatches: *self.bpf.nr_cancel_dispatches_mut(),
210 nr_bounce_dispatches: *self.bpf.nr_bounce_dispatches_mut(),
211 nr_failed_dispatches: *self.bpf.nr_failed_dispatches_mut(),
212 nr_sched_congested: *self.bpf.nr_sched_congested_mut(),
213 }
214 }
215
216 // Return current timestamp in ns.
217 fn now() -> u64 {
218 let ts = SystemTime::now()
219 .duration_since(SystemTime::UNIX_EPOCH)
220 .unwrap();
221 ts.as_nanos() as u64
222 }
223
224 // Return the total amount of tasks waiting in the user-space scheduler.
225 fn nr_tasks_scheduled(&mut self) -> u64 {
226 self.tasks.len() as u64
227 }
228
229 // Return the total amount of tasks waiting to be consumed by the user-space scheduler.
230 fn nr_tasks_queued(&mut self) -> u64 {
231 *self.bpf.nr_queued_mut()
232 }
233
234 // Return the total amount of tasks that are waiting to be scheduled.
235 fn nr_tasks_waiting(&mut self) -> u64 {
236 self.nr_tasks_queued() + self.nr_tasks_scheduled()
237 }
238
239 // Return a value inversely proportional to the task's weight.
240 fn scale_by_task_weight_inverse(task: &QueuedTask, value: u64) -> u64 {
241 value * 100 / task.weight
242 }
243
244 // Update task's vruntime based on the information collected from the kernel and return to the
245 // caller the evaluated task's deadline.
246 //
247 // This method implements the main task ordering logic of the scheduler.
248 fn update_enqueued(&mut self, task: &mut QueuedTask) -> u64 {
249 // Update global minimum vruntime based on the previous task's vruntime.
250 if self.min_vruntime < task.vtime {
251 self.min_vruntime = task.vtime;
252 }
253
254 // Update task's vruntime re-aligning it to min_vruntime (never allow a task to accumulate
255 // a budget of more than a time slice to prevent starvation).
256 let min_vruntime = self.min_vruntime.saturating_sub(self.slice_ns);
257 if task.vtime == 0 {
258 // Slightly penalize new tasks by charging an extra time slice to prevent bursts of such
259 // tasks from disrupting the responsiveness of already running ones.
260 task.vtime = min_vruntime + Self::scale_by_task_weight_inverse(task, self.slice_ns);
261 } else if task.vtime < min_vruntime {
262 task.vtime = min_vruntime;
263 }
264 task.vtime += Self::scale_by_task_weight_inverse(task, task.stop_ts - task.start_ts);
265
266 // Return the task's deadline.
267 task.vtime + task.exec_runtime.min(self.slice_ns * 100)
268 }
269
270 // Dispatch the first task from the task pool (sending them to the BPF dispatcher).
271 //
272 // Return true on success, false if the BPF backend can't accept any more dispatch.
273 fn dispatch_task(&mut self) -> bool {
274 let nr_waiting = self.nr_tasks_waiting() + 1;
275
276 if let Some(task) = self.tasks.pop_first() {
277 // Scale time slice based on the amount of tasks that are waiting in the
278 // scheduler's queue and the previously unused time slice budget, but make sure
279 // to assign at least slice_us_min.
280 let slice_ns = (self.slice_ns / nr_waiting).max(self.slice_ns_min);
281
282 // Create a new task to dispatch.
283 let mut dispatched_task = DispatchedTask::new(&task.qtask);
284
285 // Assign the time slice to the task and propagate the vruntime.
286 dispatched_task.slice_ns = slice_ns;
287
288 // Propagate the evaluated task's deadline to the scx_rustland_core backend.
289 dispatched_task.vtime = task.deadline;
290
291 // Try to pick an idle CPU for the task.
292 let cpu = self
293 .bpf
294 .select_cpu(task.qtask.pid, task.qtask.cpu, task.qtask.flags);
295 dispatched_task.cpu = if cpu >= 0 {
296 // An idle CPU was found, dispatch the task there.
297 cpu
298 } else if self.opts.percpu_local && task.qtask.nr_cpus_allowed == 1 {
299 // Task is restricted to run on a single CPU, dispatch it to that one.
300 task.qtask.cpu
301 } else {
302 // No idle CPU found, dispatch to the first CPU available.
303 RL_CPU_ANY
304 };
305
306 // Send task to the BPF dispatcher.
307 if self.bpf.dispatch_task(&dispatched_task).is_err() {
308 // If dispatching fails, re-add the task to the pool and skip further dispatching.
309 self.tasks.insert(task);
310
311 return false;
312 }
313 }
314
315 return true;
316 }
317
318 // Drain all the tasks from the queued list, update their vruntime (Self::update_enqueued()),
319 // then push them all to the task pool (doing so will sort them by their vruntime).
320 fn drain_queued_tasks(&mut self) {
321 loop {
322 match self.bpf.dequeue_task() {
323 Ok(Some(mut task)) => {
324 // Update task information and determine vruntime.
325 let deadline = self.update_enqueued(&mut task);
326 let timestamp = Self::now();
327
328 // Insert task in the task pool (ordered by vruntime).
329 self.tasks.insert(Task {
330 qtask: task,
331 deadline,
332 timestamp,
333 });
334 }
335 Ok(None) => {
336 break;
337 }
338 Err(err) => {
339 warn!("Error: {}", err);
340 break;
341 }
342 }
343 }
344
345 // Dispatch the first task from the task pool.
346 self.dispatch_task();
347 }
348
349 // Main scheduling function (called in a loop to periodically drain tasks from the queued list
350 // and dispatch them to the BPF part via the dispatched list).
351 fn schedule(&mut self) {
352 self.drain_queued_tasks();
353
354 // Notify the dispatcher if there are still pending tasks to be processed,
355 self.bpf.notify_complete(self.tasks.len() as u64);
356 }
357
358 // Get total page faults from the process.
359 fn get_page_faults() -> Result<u64, io::Error> {
360 let myself = Process::myself().map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
361 let stat = myself
362 .stat()
363 .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
364
365 Ok(stat.minflt + stat.majflt)
366 }
367
368 fn run(&mut self) -> Result<UserExitInfo> {
369 let (res_ch, req_ch) = self.stats_server.channels();
370
371 while !self.bpf.exited() {
372 // Call the main scheduler body.
373 self.schedule();
374
375 // Handle monitor requests asynchronously.
376 if req_ch.try_recv().is_ok() {
377 res_ch.send(self.get_metrics())?;
378 }
379 }
380
381 self.bpf.shutdown_and_report()
382 }
383}
384
// Unregister the scheduler: log the teardown when the Scheduler object is dropped
// (the BPF side is detached by BpfScheduler's own Drop).
impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}
391
392fn main() -> Result<()> {
393 let opts = Opts::parse();
394
395 if opts.version {
396 println!(
397 "{} version {} - scx_rustland_core {}",
398 SCHEDULER_NAME,
399 build_id::full_version(env!("CARGO_PKG_VERSION")),
400 scx_rustland_core::VERSION
401 );
402 return Ok(());
403 }
404
405 if opts.help_stats {
406 stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
407 return Ok(());
408 }
409
410 let loglevel = simplelog::LevelFilter::Info;
411
412 let mut lcfg = simplelog::ConfigBuilder::new();
413 lcfg.set_time_offset_to_local()
414 .expect("Failed to set local time offset")
415 .set_time_level(simplelog::LevelFilter::Error)
416 .set_location_level(simplelog::LevelFilter::Off)
417 .set_target_level(simplelog::LevelFilter::Off)
418 .set_thread_level(simplelog::LevelFilter::Off);
419 simplelog::TermLogger::init(
420 loglevel,
421 lcfg.build(),
422 simplelog::TerminalMode::Stderr,
423 simplelog::ColorChoice::Auto,
424 )?;
425
426 if let Some(intv) = opts.monitor.or(opts.stats) {
427 let jh = std::thread::spawn(move || stats::monitor(Duration::from_secs_f64(intv)).unwrap());
428 if opts.monitor.is_some() {
429 let _ = jh.join();
430 return Ok(());
431 }
432 }
433
434 let mut open_object = MaybeUninit::uninit();
435 loop {
436 let mut sched = Scheduler::init(&opts, &mut open_object)?;
437 if !sched.run()?.should_restart() {
438 break;
439 }
440 }
441
442 Ok(())
443}