scx_rustland/
bpf.rs

// Copyright (c) Andrea Righi <andrea.righi@linux.dev>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

use std::mem::MaybeUninit;

use crate::bpf_intf;
use crate::bpf_intf::*;
use crate::bpf_skel::*;

use std::ffi::c_int;
use std::ffi::c_ulong;

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Once;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;

use plain::Plain;
use procfs::process::all_processes;

use libbpf_rs::libbpf_sys::bpf_object_open_opts;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;

use libc::{c_char, pthread_self, pthread_setschedparam, sched_param};

#[cfg(target_env = "musl")]
use libc::timespec;

use scx_utils::compat;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;

use scx_rustland_core::ALLOCATOR;

// Defined in UAPI
const SCHED_EXT: i32 = 7;
const TASK_COMM_LEN: usize = 16;

// Allow dispatching the task on any CPU.
//
// The task will be dispatched to the global shared DSQ and it will run on the first CPU available.
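//
// For example (illustrative sketch, not part of this crate's code), a scheduler that does not
// care about CPU placement can simply do:
//
//     dispatched.cpu = RL_CPU_ANY;
//
// on a DispatchedTask before passing it to dispatch_task().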
#[allow(dead_code)]
pub const RL_CPU_ANY: i32 = bpf_intf::RL_CPU_ANY as i32;

/// High-level Rust abstraction to interact with a generic sched-ext BPF component.
///
/// Overview
/// ========
///
/// The main BPF interface is provided by the BpfScheduler() struct. When this object is
/// initialized, it takes care of registering and initializing the BPF component.
///
/// The scheduler can then use the BpfScheduler() instance to receive tasks (in the form of
/// QueuedTask objects) and dispatch tasks (in the form of DispatchedTask objects), using the
/// methods dequeue_task() and dispatch_task() respectively.
///
/// BPF counters and statistics can be accessed using the nr_*_mut() methods; in particular,
/// nr_queued_mut() and nr_scheduled_mut() can be updated to notify the BPF component whether
/// the user-space scheduler still has pending work to do.
///
/// Finally, the methods exited() and shutdown_and_report() can be used respectively to test
/// whether the BPF component exited, and to shut down and report the exit message.
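///
/// Example
/// =======
///
/// A minimal scheduling loop, shown as an illustrative sketch only: the init() arguments below
/// are arbitrary example values and error handling is reduced to unwrap().
///
/// ```ignore
/// let mut open_object = MaybeUninit::uninit();
/// let mut bpf = BpfScheduler::init(
///     &mut open_object,
///     None,      // open_opts
///     0,         // exit_dump_len
///     false,     // partial
///     false,     // debug
///     false,     // builtin_idle
///     5_000_000, // slice_ns (example value: 5ms)
///     "example", // name
/// )
/// .unwrap();
///
/// while !bpf.exited() {
///     // Drain the tasks queued by the BPF component and dispatch them back.
///     while let Ok(Some(task)) = bpf.dequeue_task() {
///         let mut dispatched = DispatchedTask::new(&task);
///         dispatched.cpu = RL_CPU_ANY; // run on the first CPU available
///         dispatched.vtime = task.vtime;
///         bpf.dispatch_task(&dispatched).unwrap();
///     }
///     // This example keeps no local queue, so no tasks are left pending.
///     bpf.notify_complete(0);
/// }
/// let _ = bpf.shutdown_and_report();
/// ```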

// Task queued for scheduling from the BPF component (see bpf_intf::queued_task_ctx).
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct QueuedTask {
    pub pid: i32,             // pid that uniquely identifies a task
    pub cpu: i32,             // CPU previously used by the task
    pub nr_cpus_allowed: u64, // Number of CPUs that the task can use
    pub flags: u64,           // task's enqueue flags
    pub start_ts: u64,        // Timestamp since last time the task ran on a CPU (in ns)
    pub stop_ts: u64,         // Timestamp since last time the task released a CPU (in ns)
    pub exec_runtime: u64,    // Total cpu time since last sleep (in ns)
    pub weight: u64,          // Task priority in the range [1..10000] (default is 100)
    pub vtime: u64,           // Current task vruntime / deadline (set by the scheduler)
    pub enq_cnt: u64,         // Task enqueue counter
    pub comm: [c_char; TASK_COMM_LEN], // Task's executable name
}

impl QueuedTask {
    /// Convert the task's comm field (C char array) into a Rust String.
    #[allow(dead_code)]
    pub fn comm_str(&self) -> String {
        let bytes: &[u8] =
            unsafe { std::slice::from_raw_parts(self.comm.as_ptr() as *const u8, self.comm.len()) };

        // Find the first NUL byte, or take the whole array.
        let nul_pos = bytes.iter().position(|&c| c == 0).unwrap_or(bytes.len());

        // Convert to String (handle invalid UTF-8 gracefully).
        String::from_utf8_lossy(&bytes[..nul_pos]).into_owned()
    }
}

// Task queued for dispatching to the BPF component (see bpf_intf::dispatched_task_ctx).
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct DispatchedTask {
    pub pid: i32,      // pid that uniquely identifies a task
    pub cpu: i32, // target CPU selected by the scheduler (RL_CPU_ANY = dispatch on the first CPU available)
    pub flags: u64, // task's enqueue flags
    pub slice_ns: u64, // time slice in nanoseconds assigned to the task (0 = use default time slice)
    pub vtime: u64, // this value can be used to send the task's vruntime or deadline directly to the underlying BPF dispatcher
    pub enq_cnt: u64, // enqueue counter, copied from the originating QueuedTask
}

impl DispatchedTask {
    // Create a DispatchedTask from a QueuedTask.
    //
    // A dispatched task should always originate from a QueuedTask (there is no reason to
    // dispatch a task that wasn't previously queued to the scheduler).
    pub fn new(task: &QueuedTask) -> Self {
        DispatchedTask {
            pid: task.pid,
            cpu: task.cpu,
            flags: task.flags,
            slice_ns: 0, // use default time slice
            vtime: 0,
            enq_cnt: task.enq_cnt,
        }
    }
}

// Helpers used to submit tasks to the BPF user ring buffer.
unsafe impl Plain for bpf_intf::dispatched_task_ctx {}

impl AsMut<bpf_intf::dispatched_task_ctx> for bpf_intf::dispatched_task_ctx {
    fn as_mut(&mut self) -> &mut bpf_intf::dispatched_task_ctx {
        self
    }
}

// Message received from the dispatcher (see bpf_intf::queued_task_ctx for details).
//
// NOTE: eventually libbpf-rs will provide a better abstraction for this.
struct EnqueuedMessage {
    inner: bpf_intf::queued_task_ctx,
}

impl EnqueuedMessage {
    fn from_bytes(bytes: &[u8]) -> Self {
        let queued_task_struct = unsafe { *(bytes.as_ptr() as *const bpf_intf::queued_task_ctx) };
        EnqueuedMessage {
            inner: queued_task_struct,
        }
    }

    fn to_queued_task(&self) -> QueuedTask {
        QueuedTask {
            pid: self.inner.pid,
            cpu: self.inner.cpu,
            nr_cpus_allowed: self.inner.nr_cpus_allowed,
            flags: self.inner.flags,
            start_ts: self.inner.start_ts,
            stop_ts: self.inner.stop_ts,
            exec_runtime: self.inner.exec_runtime,
            weight: self.inner.weight,
            vtime: self.inner.vtime,
            enq_cnt: self.inner.enq_cnt,
            comm: self.inner.comm,
        }
    }
}

pub struct BpfScheduler<'cb> {
    pub skel: BpfSkel<'cb>,                // Low-level BPF connector
    shutdown: Arc<AtomicBool>,             // Determine scheduler shutdown
    queued: libbpf_rs::RingBuffer<'cb>,    // Ring buffer of queued tasks
    dispatched: libbpf_rs::UserRingBuffer, // User Ring buffer of dispatched tasks
    struct_ops: Option<libbpf_rs::Link>,   // Low-level BPF methods
}

// Buffer to store a task read from the ring buffer.
//
// NOTE: make the buffer aligned to 64-bits to prevent misaligned dereferences when accessing the
// buffer using a pointer.
const BUFSIZE: usize = std::mem::size_of::<QueuedTask>();

#[repr(align(8))]
struct AlignedBuffer([u8; BUFSIZE]);

static mut BUF: AlignedBuffer = AlignedBuffer([0; BUFSIZE]);

static SET_HANDLER: Once = Once::new();

fn set_ctrlc_handler(shutdown: Arc<AtomicBool>) -> Result<(), anyhow::Error> {
    SET_HANDLER.call_once(|| {
        let shutdown_clone = shutdown.clone();
        ctrlc::set_handler(move || {
            shutdown_clone.store(true, Ordering::Relaxed);
        })
        .expect("Error setting Ctrl-C handler");
    });
    Ok(())
}

impl<'cb> BpfScheduler<'cb> {
    pub fn init(
        open_object: &'cb mut MaybeUninit<OpenObject>,
        open_opts: Option<bpf_object_open_opts>,
        exit_dump_len: u32,
        partial: bool,
        debug: bool,
        builtin_idle: bool,
        slice_ns: u64,
        name: &str,
    ) -> Result<Self> {
        let shutdown = Arc::new(AtomicBool::new(false));
        set_ctrlc_handler(shutdown.clone()).context("Error setting Ctrl-C handler")?;

        // Open the BPF prog first for verification.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug);
        let mut skel = scx_ops_open!(skel_builder, open_object, rustland, open_opts)?;

        // Copy one item from the ring buffer.
        //
        // # Safety
        //
        // Each invocation of the callback will trigger the copy of exactly one QueuedTask item to
        // BUF. The caller must synchronize to ensure that multiple invocations of the callback
        // do not happen at the same time, but this is implicitly guaranteed by the fact that the
        // caller is a single-threaded process (for now).
        //
        // Use of a `str` whose contents are not valid UTF-8 is undefined behavior.
        fn callback(data: &[u8]) -> i32 {
            #[allow(static_mut_refs)]
            unsafe {
                // SAFETY: copying from the BPF ring buffer to BUF is safe, since the size of BUF
                // is exactly the size of QueuedTask and the callback operates in chunks of
                // QueuedTask items. It also copies exactly one QueuedTask at a time; this is
                // guaranteed by the return code of this callback (see below). From a
                // thread-safety perspective this is also correct, assuming the caller is a
                // single-threaded process (as it is for now).
                BUF.0.copy_from_slice(data);
            }

            // Return 0 to indicate successful completion of the copy.
            0
        }

        // Check host topology to determine if we need to enable SMT capabilities.
        let topo = Topology::new().unwrap();
        skel.maps.rodata_data.as_mut().unwrap().smt_enabled = topo.smt_enabled;

        // Enable scheduler flags.
        skel.struct_ops.rustland_mut().flags =
            *compat::SCX_OPS_ENQ_LAST | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        if partial {
            skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.rustland_mut().exit_dump_len = exit_dump_len;
        skel.maps.rodata_data.as_mut().unwrap().usersched_pid = std::process::id();
        skel.maps.rodata_data.as_mut().unwrap().khugepaged_pid = Self::khugepaged_pid();
        skel.maps.rodata_data.as_mut().unwrap().builtin_idle = builtin_idle;
        skel.maps.rodata_data.as_mut().unwrap().slice_ns = slice_ns;
        skel.maps.rodata_data.as_mut().unwrap().debug = debug;
        let _ = Self::set_scx_ops_name(&mut skel.struct_ops.rustland_mut().name, name);

        // Load and attach the BPF scheduler.
        let mut skel = scx_ops_load!(skel, rustland, uei)?;

        let struct_ops = Some(scx_ops_attach!(skel, rustland)?);

        // Build the ring buffer of queued tasks.
        let maps = &skel.maps;
        let queued_ring_buffer = &maps.queued;
        let mut rbb = libbpf_rs::RingBufferBuilder::new();
        rbb.add(queued_ring_buffer, callback)
            .expect("failed to add ringbuf callback");
        let queued = rbb.build().expect("failed to build ringbuf");

        // Build the user ring buffer of dispatched tasks.
        let dispatched = libbpf_rs::UserRingBuffer::new(&maps.dispatched)
            .expect("failed to create user ringbuf");

        // Lock all the memory to prevent page faults that could trigger potential deadlocks during
        // scheduling.
        ALLOCATOR.lock_memory();
        ALLOCATOR.disable_mmap().expect("Failed to disable mmap");

        // Make sure to use the SCHED_EXT class at least for the scheduler itself.
        if partial {
            let err = Self::use_sched_ext();
            if err < 0 {
                return Err(anyhow::Error::msg(format!(
                    "sched_setscheduler error: {err}"
                )));
            }
        }

        Ok(Self {
            skel,
            shutdown,
            queued,
            dispatched,
            struct_ops,
        })
    }

    // Set the name of the scx ops, appending the version suffix.
    fn set_scx_ops_name(name_field: &mut [i8], src: &str) -> Result<()> {
        if !src.is_ascii() {
            bail!("name must be an ASCII string");
        }

        let bytes = src.as_bytes();
        let n = bytes.len().min(name_field.len().saturating_sub(1));

        name_field.fill(0);
        for i in 0..n {
            name_field[i] = bytes[i] as i8;
        }

        let version_suffix = ::scx_utils::build_id::ops_version_suffix(env!("CARGO_PKG_VERSION"));
        let bytes = version_suffix.as_bytes();
        let mut i = 0;
        let mut bytes_idx = 0;
        let mut found_null = false;

        while i < name_field.len() - 1 {
            found_null |= name_field[i] == 0;
            if !found_null {
                i += 1;
                continue;
            }

            if bytes_idx < bytes.len() {
                name_field[i] = bytes[bytes_idx] as i8;
                bytes_idx += 1;
            } else {
                break;
            }
            i += 1;
        }
        name_field[i] = 0;

        Ok(())
    }

    // Return the PID of khugepaged, if present, otherwise return 0.
    fn khugepaged_pid() -> u32 {
        let procs = match all_processes() {
            Ok(p) => p,
            Err(_) => return 0,
        };

        for proc in procs {
            let proc = match proc {
                Ok(p) => p,
                Err(_) => continue,
            };

            if let Ok(stat) = proc.stat() {
                if proc.exe().is_err() && stat.comm == "khugepaged" {
                    return proc.pid() as u32;
                }
            }
        }

        0
    }

    // Notify the BPF component that the user-space scheduler has completed its scheduling cycle,
    // updating the number of tasks that are still pending.
    //
    // NOTE: do not set allow(dead_code) for this method: any scheduler must call it at some
    // point, otherwise the BPF component will keep waking up the user-space scheduler in a busy
    // loop, causing unnecessarily high CPU consumption.
    pub fn notify_complete(&mut self, nr_pending: u64) {
        self.skel.maps.bss_data.as_mut().unwrap().nr_scheduled = nr_pending;
        std::thread::yield_now();
    }
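
    // Example (illustrative sketch, not part of this crate): a scheduler that keeps its
    // not-yet-dispatched tasks in a hypothetical local queue `pending`, owned alongside a
    // `bpf: BpfScheduler` instance, would typically end each scheduling cycle with:
    //
    //     bpf.notify_complete(pending.len() as u64);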

    // Counter of the online CPUs.
    #[allow(dead_code)]
    pub fn nr_online_cpus_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_online_cpus
    }

    // Counter of currently running tasks.
    #[allow(dead_code)]
    pub fn nr_running_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_running
    }

    // Counter of queued tasks.
    #[allow(dead_code)]
    pub fn nr_queued_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_queued
    }

    // Counter of scheduled tasks.
    #[allow(dead_code)]
    pub fn nr_scheduled_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_scheduled
    }

    // Counter of user dispatch events.
    #[allow(dead_code)]
    pub fn nr_user_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_user_dispatches
    }

    // Counter of kernel dispatch events.
    #[allow(dead_code)]
    pub fn nr_kernel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_kernel_dispatches
    }

    // Counter of cancel dispatch events.
    #[allow(dead_code)]
    pub fn nr_cancel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_cancel_dispatches
    }

    // Counter of dispatches bounced to the shared DSQ.
    #[allow(dead_code)]
    pub fn nr_bounce_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_bounce_dispatches
    }

    // Counter of failed dispatch events.
    #[allow(dead_code)]
    pub fn nr_failed_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_failed_dispatches
    }

    // Counter of scheduler congestion events.
    #[allow(dead_code)]
    pub fn nr_sched_congested_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_sched_congested
    }

    // Set scheduling class for the scheduler itself to SCHED_EXT
    fn use_sched_ext() -> i32 {
        #[cfg(target_env = "gnu")]
        let param: sched_param = sched_param { sched_priority: 0 };
        #[cfg(target_env = "musl")]
        let param: sched_param = sched_param {
            sched_priority: 0,
            sched_ss_low_priority: 0,
            sched_ss_repl_period: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_init_budget: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_max_repl: 0,
        };

        unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
    }

    // Pick an idle CPU for the target PID.
    #[allow(dead_code)]
    pub fn select_cpu(&mut self, pid: i32, cpu: i32, flags: u64) -> i32 {
        let prog = &mut self.skel.progs.rs_select_cpu;
        let mut args = task_cpu_arg {
            pid: pid as c_int,
            cpu: cpu as c_int,
            flags: flags as c_ulong,
        };
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();

        out.return_value as i32
    }
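
    // Example (illustrative sketch, not part of this crate): a scheduler may query the BPF
    // component for an idle CPU before dispatching a task, falling back to RL_CPU_ANY when no
    // idle CPU is found (negative return value). Here `bpf`, `task` and `dispatched` are
    // hypothetical variables:
    //
    //     let cpu = bpf.select_cpu(task.pid, task.cpu, task.flags);
    //     dispatched.cpu = if cpu >= 0 { cpu } else { RL_CPU_ANY };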

    // Receive a task to be scheduled from the BPF dispatcher.
    #[allow(static_mut_refs)]
    pub fn dequeue_task(&mut self) -> Result<Option<QueuedTask>, i32> {
        // Try to consume the first task from the ring buffer.
        match self.queued.consume_raw_n(1) {
            0 => {
                // Ring buffer is empty.
                self.skel.maps.bss_data.as_mut().unwrap().nr_queued = 0;
                Ok(None)
            }
            1 => {
                // A valid task was received; convert the data into a proper task struct.
                let task = unsafe { EnqueuedMessage::from_bytes(&BUF.0).to_queued_task() };
                self.skel.maps.bss_data.as_mut().unwrap().nr_queued = self
                    .skel
                    .maps
                    .bss_data
                    .as_ref()
                    .unwrap()
                    .nr_queued
                    .saturating_sub(1);

                Ok(Some(task))
            }
            res if res < 0 => Err(res),
            res => panic!("Unexpected return value from libbpf-rs::consume_raw_n(): {res}"),
        }
    }

    // Send a task to the dispatcher.
    pub fn dispatch_task(&mut self, task: &DispatchedTask) -> Result<(), libbpf_rs::Error> {
        // Reserve a slot in the user ring buffer.
        let mut urb_sample = self
            .dispatched
            .reserve(std::mem::size_of::<bpf_intf::dispatched_task_ctx>())?;
        let bytes = urb_sample.as_mut();
        let dispatched_task = plain::from_mut_bytes::<bpf_intf::dispatched_task_ctx>(bytes)
            .expect("failed to convert bytes");

        // Convert the dispatched task into the low-level dispatched task context.
        let bpf_intf::dispatched_task_ctx {
            pid,
            cpu,
            flags,
            slice_ns,
            vtime,
            enq_cnt,
            ..
        } = &mut dispatched_task.as_mut();

        *pid = task.pid;
        *cpu = task.cpu;
        *flags = task.flags;
        *slice_ns = task.slice_ns;
        *vtime = task.vtime;
        *enq_cnt = task.enq_cnt;

        // Store the task in the user ring buffer.
        //
        // NOTE: submit() only updates the reserved slot in the user ring buffer, so it is not
        // expected to fail.
        self.dispatched
            .submit(urb_sample)
            .expect("failed to submit task");

        Ok(())
    }

    // Check whether the BPF component exited or a shutdown was requested (Ctrl-C).
    pub fn exited(&mut self) -> bool {
        self.shutdown.load(Ordering::Relaxed) || uei_exited!(&self.skel, uei)
    }

    // Called on exit to shut down the BPF component and report its exit message.
    pub fn shutdown_and_report(&mut self) -> Result<UserExitInfo> {
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

// Disconnect the low-level BPF scheduler.
impl Drop for BpfScheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
        ALLOCATOR.unlock_memory();
    }
}