// scx_rustland/bpf.rs
1// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5
6use std::mem::MaybeUninit;
7
8use crate::bpf_intf;
9use crate::bpf_intf::*;
10use crate::bpf_skel::*;
11
12use std::ffi::c_int;
13use std::ffi::c_ulong;
14use std::ffi::CStr;
15
16use std::sync::atomic::AtomicBool;
17use std::sync::atomic::Ordering;
18use std::sync::Arc;
19use std::sync::Once;
20
21use anyhow::bail;
22use anyhow::Context;
23use anyhow::Result;
24
25use plain::Plain;
26use procfs::process::all_processes;
27
28use libbpf_rs::libbpf_sys::bpf_object_open_opts;
29use libbpf_rs::OpenObject;
30use libbpf_rs::ProgramInput;
31
32use libc::{c_char, pthread_self, pthread_setschedparam, sched_param};
33
34#[cfg(target_env = "musl")]
35use libc::timespec;
36
37use scx_utils::compat;
38use scx_utils::scx_ops_attach;
39use scx_utils::scx_ops_load;
40use scx_utils::scx_ops_open;
41use scx_utils::uei_exited;
42use scx_utils::uei_report;
43use scx_utils::Topology;
44use scx_utils::UserExitInfo;
45
46use scx_rustland_core::ALLOCATOR;
47
// Defined in UAPI.
//
// SCHED_EXT is the scheduling policy id used with pthread_setschedparam()
// (see use_sched_ext() below); TASK_COMM_LEN mirrors the kernel's fixed
// size of a task's comm (executable name) buffer.
const SCHED_EXT: i32 = 7;
const TASK_COMM_LEN: usize = 16;

// Allow to dispatch the task on any CPU.
//
// The task will be dispatched to the global shared DSQ and it will run on the first CPU available.
#[allow(dead_code)]
pub const RL_CPU_ANY: i32 = bpf_intf::RL_CPU_ANY as i32;
57
58/// High-level Rust abstraction to interact with a generic sched-ext BPF component.
59///
60/// Overview
61/// ========
62///
63/// The main BPF interface is provided by the BpfScheduler() struct. When this object is
64/// initialized it will take care of registering and initializing the BPF component.
65///
66/// The scheduler then can use BpfScheduler() instance to receive tasks (in the form of QueuedTask
67/// objects) and dispatch tasks (in the form of DispatchedTask objects), using respectively the
68/// methods dequeue_task() and dispatch_task().
69///
70/// BPF counters and statistics can be accessed using the methods nr_*_mut(), in particular
71/// nr_queued_mut() and nr_scheduled_mut() can be updated to notify the BPF component if the
72/// user-space scheduler has some pending work to do or not.
73///
74/// Finally the methods exited() and shutdown_and_report() can be used respectively to test
75/// whether the BPF component exited, and to shutdown and report the exit message.
76
// Task queued for scheduling from the BPF component (see bpf_intf::queued_task_ctx).
//
// Instances are produced by EnqueuedMessage::to_queued_task() when reading
// the "queued" ring buffer in dequeue_task().
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct QueuedTask {
    pub pid: i32,             // pid that uniquely identifies a task
    pub cpu: i32,             // CPU previously used by the task
    pub nr_cpus_allowed: u64, // Number of CPUs that the task can use
    pub flags: u64,           // task's enqueue flags
    pub start_ts: u64,        // Timestamp since last time the task ran on a CPU (in ns)
    pub stop_ts: u64,         // Timestamp since last time the task released a CPU (in ns)
    pub exec_runtime: u64,    // Total cpu time since last sleep (in ns)
    pub weight: u64,          // Task priority in the range [1..10000] (default is 100)
    pub vtime: u64,           // Current task vruntime / deadline (set by the scheduler)
    pub enq_cnt: u64,         // Enqueue counter, carried back to BPF on dispatch
    pub comm: [c_char; TASK_COMM_LEN], // Task's executable name
}
92
93impl QueuedTask {
94    /// Convert the task's comm field (C char array) into a Rust String.
95    #[allow(dead_code)]
96    pub fn comm_str(&self) -> String {
97        // Convert the C char array into a Rust String
98        let c_str = unsafe { CStr::from_ptr(self.comm.as_ptr()) };
99
100        // Handle potential invalid UTF-8
101        c_str.to_string_lossy().into_owned()
102    }
103}
104
// Task queued for dispatching to the BPF component (see bpf_intf::dispatched_task_ctx).
//
// Instances are created from a QueuedTask via DispatchedTask::new() and sent
// to the BPF side through dispatch_task().
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct DispatchedTask {
    pub pid: i32,      // pid that uniquely identifies a task
    pub cpu: i32, // target CPU selected by the scheduler (RL_CPU_ANY = dispatch on the first CPU available)
    pub flags: u64, // task's enqueue flags
    pub slice_ns: u64, // time slice in nanoseconds assigned to the task (0 = use default time slice)
    pub vtime: u64, // this value can be used to send the task's vruntime or deadline directly to the underlying BPF dispatcher
    pub enq_cnt: u64, // enqueue counter carried over from the originating QueuedTask
}
115
116impl DispatchedTask {
117    // Create a DispatchedTask from a QueuedTask.
118    //
119    // A dispatched task should be always originated from a QueuedTask (there is no reason to
120    // dispatch a task if it wasn't queued to the scheduler earlier).
121    pub fn new(task: &QueuedTask) -> Self {
122        DispatchedTask {
123            pid: task.pid,
124            cpu: task.cpu,
125            flags: task.flags,
126            slice_ns: 0, // use default time slice
127            vtime: 0,
128            enq_cnt: task.enq_cnt,
129        }
130    }
131}
132
// Helpers used to submit tasks to the BPF user ring buffer.
//
// SAFETY: implementing Plain asserts that dispatched_task_ctx is plain old
// data (no padding-sensitive invariants, no pointers), so it can be
// reinterpreted from raw ring buffer bytes.
unsafe impl Plain for bpf_intf::dispatched_task_ctx {}

// Identity AsMut impl, required so that plain::from_mut_bytes() can hand out
// a mutable reference to the struct laid over the reserved ring buffer slot.
impl AsMut<bpf_intf::dispatched_task_ctx> for bpf_intf::dispatched_task_ctx {
    fn as_mut(&mut self) -> &mut bpf_intf::dispatched_task_ctx {
        self
    }
}
141
142// Message received from the dispatcher (see bpf_intf::queued_task_ctx for details).
143//
144// NOTE: eventually libbpf-rs will provide a better abstraction for this.
145struct EnqueuedMessage {
146    inner: bpf_intf::queued_task_ctx,
147}
148
149impl EnqueuedMessage {
150    fn from_bytes(bytes: &[u8]) -> Self {
151        let queued_task_struct = unsafe { *(bytes.as_ptr() as *const bpf_intf::queued_task_ctx) };
152        EnqueuedMessage {
153            inner: queued_task_struct,
154        }
155    }
156
157    fn to_queued_task(&self) -> QueuedTask {
158        QueuedTask {
159            pid: self.inner.pid,
160            cpu: self.inner.cpu,
161            nr_cpus_allowed: self.inner.nr_cpus_allowed,
162            flags: self.inner.flags,
163            start_ts: self.inner.start_ts,
164            stop_ts: self.inner.stop_ts,
165            exec_runtime: self.inner.exec_runtime,
166            weight: self.inner.weight,
167            vtime: self.inner.vtime,
168            enq_cnt: self.inner.enq_cnt,
169            comm: self.inner.comm,
170        }
171    }
172}
173
// Main connector between the user-space scheduler and the BPF component.
//
// The 'cb lifetime ties the ring buffer callbacks and the skeleton to the
// open BPF object passed to init().
pub struct BpfScheduler<'cb> {
    pub skel: BpfSkel<'cb>,                // Low-level BPF connector
    shutdown: Arc<AtomicBool>,             // Determine scheduler shutdown
    queued: libbpf_rs::RingBuffer<'cb>,    // Ring buffer of queued tasks
    dispatched: libbpf_rs::UserRingBuffer, // User Ring buffer of dispatched tasks
    struct_ops: Option<libbpf_rs::Link>,   // Low-level BPF methods
}
181
// Buffer to store a task read from the ring buffer.
//
// NOTE: make the buffer aligned to 64-bits to prevent misaligned dereferences when accessing the
// buffer using a pointer.
const BUFSIZE: usize = size_of::<queued_task_ctx>();

#[repr(align(8))]
struct AlignedBuffer([u8; BUFSIZE]);

// Single-slot scratch buffer: filled by the ring buffer callback (see init())
// and read back in dequeue_task(). Only sound because the process consumes
// the ring buffer single-threaded (see the SAFETY comments at the use sites).
static mut BUF: AlignedBuffer = AlignedBuffer([0; BUFSIZE]);

// Guard ensuring the Ctrl-C handler is installed at most once per process.
static SET_HANDLER: Once = Once::new();
194
195fn set_ctrlc_handler(shutdown: Arc<AtomicBool>) -> Result<(), anyhow::Error> {
196    SET_HANDLER.call_once(|| {
197        let shutdown_clone = shutdown.clone();
198        ctrlc::set_handler(move || {
199            shutdown_clone.store(true, Ordering::Relaxed);
200        })
201        .expect("Error setting Ctrl-C handler");
202    });
203    Ok(())
204}
205
206impl<'cb> BpfScheduler<'cb> {
    /// Register and initialize the BPF scheduling component.
    ///
    /// On success the sched-ext struct_ops is loaded and attached, the
    /// queued/dispatched ring buffers are set up, and the user-space
    /// scheduler's memory is locked to prevent page faults during scheduling.
    ///
    /// Arguments:
    ///  - `open_object`: storage backing the libbpf open object
    ///  - `open_opts`: optional low-level bpf_object open options
    ///  - `exit_dump_len`: size of the BPF exit debug dump buffer
    ///  - `partial`: only schedule tasks that explicitly opted in (also makes
    ///    the scheduler itself switch to SCHED_EXT, see below)
    ///  - `debug`: enable libbpf and scheduler debug output
    ///  - `builtin_idle`: rely on the kernel's built-in idle CPU selection
    ///  - `numa_local`: enable per-NUMA-node idle CPU selection
    ///  - `slice_ns`: default time slice in nanoseconds
    ///  - `name`: scheduler name advertised to the kernel
    #[allow(clippy::too_many_arguments)]
    pub fn init(
        open_object: &'cb mut MaybeUninit<OpenObject>,
        open_opts: Option<bpf_object_open_opts>,
        exit_dump_len: u32,
        partial: bool,
        debug: bool,
        builtin_idle: bool,
        numa_local: bool,
        slice_ns: u64,
        name: &str,
    ) -> Result<Self> {
        let shutdown = Arc::new(AtomicBool::new(false));
        set_ctrlc_handler(shutdown.clone()).context("Error setting Ctrl-C handler")?;

        // Open the BPF prog first for verification.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug);
        let mut skel = scx_ops_open!(skel_builder, open_object, rustland, open_opts)?;

        // Copy one item from the ring buffer.
        //
        // # Safety
        //
        // Each invocation of the callback will trigger the copy of exactly one QueuedTask item to
        // BUF. The caller must be synchronize to ensure that multiple invocations of the callback
        // are not happening at the same time, but this is implicitly guaranteed by the fact that
        // the caller is a single-thread process (for now).
        //
        // Use of a `str` whose contents are not valid UTF-8 is undefined behavior.
        fn callback(data: &[u8]) -> i32 {
            #[allow(static_mut_refs)]
            unsafe {
                // SAFETY: copying from the BPF ring buffer to BUF is safe, since the size of BUF
                // is exactly the size of QueuedTask and the callback operates in chunks of
                // QueuedTask items. It also copies exactly one QueuedTask at a time, this is
                // guaranteed by the error code returned by this callback (see below). From a
                // thread-safety perspective this is also correct, assuming the caller is a
                // single-thread process (as it is for now).
                BUF.0.copy_from_slice(data);
            }

            // Return 0 to indicate successful completion of the copy.
            0
        }

        // Check host topology to determine if we need to enable SMT capabilities.
        let topo = Topology::new().unwrap();
        skel.maps.rodata_data.as_mut().unwrap().smt_enabled = topo.smt_enabled;

        // Enable scheduler flags.
        skel.struct_ops.rustland_mut().flags =
            *compat::SCX_OPS_ENQ_LAST | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        if partial {
            skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        if numa_local {
            skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_BUILTIN_IDLE_PER_NODE;
        }
        skel.struct_ops.rustland_mut().exit_dump_len = exit_dump_len;

        // Propagate the scheduler configuration to the BPF read-only data
        // section (must happen before scx_ops_load!).
        skel.maps.rodata_data.as_mut().unwrap().usersched_pid = std::process::id();
        skel.maps.rodata_data.as_mut().unwrap().khugepaged_pid = Self::khugepaged_pid();
        skel.maps.rodata_data.as_mut().unwrap().builtin_idle = builtin_idle;
        skel.maps.rodata_data.as_mut().unwrap().numa_local = numa_local;
        skel.maps.rodata_data.as_mut().unwrap().slice_ns = slice_ns;
        skel.maps.rodata_data.as_mut().unwrap().debug = debug;
        let _ = Self::set_scx_ops_name(&mut skel.struct_ops.rustland_mut().name, name);

        // Attach BPF scheduler.
        let mut skel = scx_ops_load!(skel, rustland, uei)?;

        let struct_ops = Some(scx_ops_attach!(skel, rustland)?);

        // Build the ring buffer of queued tasks.
        let maps = &skel.maps;
        let queued_ring_buffer = &maps.queued;
        let mut rbb = libbpf_rs::RingBufferBuilder::new();
        rbb.add(queued_ring_buffer, callback)
            .expect("failed to add ringbuf callback");
        let queued = rbb.build().expect("failed to build ringbuf");

        // Build the user ring buffer of dispatched tasks.
        let dispatched = libbpf_rs::UserRingBuffer::new(&maps.dispatched)
            .expect("failed to create user ringbuf");

        // Lock all the memory to prevent page faults that could trigger potential deadlocks during
        // scheduling.
        ALLOCATOR.lock_memory();
        ALLOCATOR.disable_mmap().expect("Failed to disable mmap");

        // Make sure to use the SCHED_EXT class at least for the scheduler itself.
        if partial {
            let err = Self::use_sched_ext();
            if err < 0 {
                return Err(anyhow::Error::msg(format!(
                    "sched_setscheduler error: {err}"
                )));
            }
        }

        Ok(Self {
            skel,
            shutdown,
            queued,
            dispatched,
            struct_ops,
        })
    }
315
    // Set the name of the scx ops.
    //
    // Copies `src` (truncated if needed) into the fixed-size, NUL-terminated
    // `name_field`, then appends a build version suffix immediately after the
    // base name, as long as it fits in the remaining space. The last byte of
    // the buffer is always reserved for the NUL terminator.
    fn set_scx_ops_name(name_field: &mut [i8], src: &str) -> Result<()> {
        if !src.is_ascii() {
            bail!("name must be an ASCII string");
        }

        let bytes = src.as_bytes();
        // Reserve the last byte for the NUL terminator.
        let n = bytes.len().min(name_field.len().saturating_sub(1));

        name_field.fill(0);
        for i in 0..n {
            name_field[i] = bytes[i] as i8;
        }

        let version_suffix = ::scx_utils::build_id::ops_version_suffix(env!("CARGO_PKG_VERSION"));
        let bytes = version_suffix.as_bytes();
        let mut i = 0;
        let mut bytes_idx = 0;
        let mut found_null = false;

        // Scan forward to the end of the base name (first NUL), then start
        // overwriting from that position with the version suffix.
        while i < name_field.len() - 1 {
            found_null |= name_field[i] == 0;
            if !found_null {
                i += 1;
                continue;
            }

            if bytes_idx < bytes.len() {
                name_field[i] = bytes[bytes_idx] as i8;
                bytes_idx += 1;
            } else {
                break;
            }
            i += 1;
        }
        // Always leave the buffer NUL-terminated.
        name_field[i] = 0;

        Ok(())
    }
355
356    // Return the PID of khugepaged, if present, otherwise return 0.
357    fn khugepaged_pid() -> u32 {
358        let procs = match all_processes() {
359            Ok(p) => p,
360            Err(_) => return 0,
361        };
362
363        for proc in procs {
364            let proc = match proc {
365                Ok(p) => p,
366                Err(_) => continue,
367            };
368
369            if let Ok(stat) = proc.stat() {
370                if proc.exe().is_err() && stat.comm == "khugepaged" {
371                    return proc.pid() as u32;
372                }
373            }
374        }
375
376        0
377    }
378
379    // Notify the BPF component that the user-space scheduler has completed its scheduling cycle,
380    // updating the amount tasks that are still pending.
381    //
382    // NOTE: do not set allow(dead_code) for this method, any scheduler must use this method at
383    // some point, otherwise the BPF component will keep waking-up the user-space scheduler in a
384    // busy loop, causing unnecessary high CPU consumption.
385    pub fn notify_complete(&mut self, nr_pending: u64) {
386        self.skel.maps.bss_data.as_mut().unwrap().nr_scheduled = nr_pending;
387        std::thread::yield_now();
388    }
389
390    // Counter of the online CPUs.
391    #[allow(dead_code)]
392    pub fn nr_online_cpus_mut(&mut self) -> &mut u64 {
393        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_online_cpus
394    }
395
396    // Counter of currently running tasks.
397    #[allow(dead_code)]
398    pub fn nr_running_mut(&mut self) -> &mut u64 {
399        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_running
400    }
401
402    // Counter of queued tasks.
403    #[allow(dead_code)]
404    pub fn nr_queued_mut(&mut self) -> &mut u64 {
405        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_queued
406    }
407
408    // Counter of scheduled tasks.
409    #[allow(dead_code)]
410    pub fn nr_scheduled_mut(&mut self) -> &mut u64 {
411        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_scheduled
412    }
413
414    // Counter of user dispatch events.
415    #[allow(dead_code)]
416    pub fn nr_user_dispatches_mut(&mut self) -> &mut u64 {
417        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_user_dispatches
418    }
419
420    // Counter of user kernel events.
421    #[allow(dead_code)]
422    pub fn nr_kernel_dispatches_mut(&mut self) -> &mut u64 {
423        &mut self
424            .skel
425            .maps
426            .bss_data
427            .as_mut()
428            .unwrap()
429            .nr_kernel_dispatches
430    }
431
432    // Counter of cancel dispatch events.
433    #[allow(dead_code)]
434    pub fn nr_cancel_dispatches_mut(&mut self) -> &mut u64 {
435        &mut self
436            .skel
437            .maps
438            .bss_data
439            .as_mut()
440            .unwrap()
441            .nr_cancel_dispatches
442    }
443
444    // Counter of dispatches bounced to the shared DSQ.
445    #[allow(dead_code)]
446    pub fn nr_bounce_dispatches_mut(&mut self) -> &mut u64 {
447        &mut self
448            .skel
449            .maps
450            .bss_data
451            .as_mut()
452            .unwrap()
453            .nr_bounce_dispatches
454    }
455
456    // Counter of failed dispatch events.
457    #[allow(dead_code)]
458    pub fn nr_failed_dispatches_mut(&mut self) -> &mut u64 {
459        &mut self
460            .skel
461            .maps
462            .bss_data
463            .as_mut()
464            .unwrap()
465            .nr_failed_dispatches
466    }
467
468    // Counter of scheduler congestion events.
469    #[allow(dead_code)]
470    pub fn nr_sched_congested_mut(&mut self) -> &mut u64 {
471        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_sched_congested
472    }
473
    // Set scheduling class for the scheduler itself to SCHED_EXT
    //
    // Returns the raw pthread_setschedparam() return value (0 on success).
    //
    // NOTE(review): per POSIX, pthread_setschedparam() reports failure with a
    // *positive* errno value, so the `err < 0` check at the call site in
    // init() may never trigger — verify the intended error handling.
    fn use_sched_ext() -> i32 {
        // The sched_param layout differs between glibc and musl, so each
        // target environment needs its own initializer.
        #[cfg(target_env = "gnu")]
        let param: sched_param = sched_param { sched_priority: 0 };
        #[cfg(target_env = "musl")]
        let param: sched_param = sched_param {
            sched_priority: 0,
            sched_ss_low_priority: 0,
            sched_ss_repl_period: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_init_budget: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_max_repl: 0,
        };

        // SAFETY: `param` is fully initialized and outlives the call, and
        // pthread_self() always returns a valid id for the calling thread.
        unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
    }
495
    // Pick an idle CPU for the target PID.
    //
    // Runs the rs_select_cpu BPF program via test_run(), passing pid/cpu/flags
    // through a task_cpu_arg struct, and returns the program's return value.
    #[allow(dead_code)]
    pub fn select_cpu(&mut self, pid: i32, cpu: i32, flags: u64) -> i32 {
        let prog = &mut self.skel.progs.rs_select_cpu;
        let mut args = task_cpu_arg {
            pid: pid as c_int,
            cpu: cpu as c_int,
            flags: flags as c_ulong,
        };
        // SAFETY: the raw slice covers exactly the memory of `args`, which
        // stays alive (and is not otherwise accessed) for the whole duration
        // of the test_run() call.
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();

        out.return_value as i32
    }
518
    // Receive a task to be scheduled from the BPF dispatcher.
    //
    // Returns:
    //  - Ok(Some(task)) when one task was consumed from the ring buffer,
    //  - Ok(None) when the ring buffer is empty,
    //  - Err(res) with the negative libbpf error code on failure.
    #[allow(static_mut_refs)]
    pub fn dequeue_task(&mut self) -> Result<Option<QueuedTask>, i32> {
        let bss_data = self.skel.maps.bss_data.as_mut().unwrap();

        // Try to consume the first task from the ring buffer.
        match self.queued.consume_raw_n(1) {
            0 => {
                // Ring buffer is empty.
                bss_data.nr_queued = 0;
                Ok(None)
            }
            1 => {
                // A valid task is received, convert data to a proper task struct.
                //
                // The ring buffer callback registered in init() has copied the
                // raw item into BUF; reading it here is only sound because the
                // process consumes the ring buffer single-threaded.
                let task = unsafe { EnqueuedMessage::from_bytes(&BUF.0).to_queued_task() };
                bss_data.nr_queued = bss_data.nr_queued.saturating_sub(1);

                Ok(Some(task))
            }
            res if res < 0 => Err(res),
            res => panic!("Unexpected return value from libbpf-rs::consume_raw(): {res}"),
        }
    }
542
543    // Send a task to the dispatcher.
544    pub fn dispatch_task(&mut self, task: &DispatchedTask) -> Result<(), libbpf_rs::Error> {
545        // Reserve a slot in the user ring buffer.
546        let mut urb_sample = self
547            .dispatched
548            .reserve(std::mem::size_of::<bpf_intf::dispatched_task_ctx>())?;
549        let bytes = urb_sample.as_mut();
550        let dispatched_task = plain::from_mut_bytes::<bpf_intf::dispatched_task_ctx>(bytes)
551            .expect("failed to convert bytes");
552
553        // Convert the dispatched task into the low-level dispatched task context.
554        let bpf_intf::dispatched_task_ctx {
555            pid,
556            cpu,
557            flags,
558            slice_ns,
559            vtime,
560            enq_cnt,
561            ..
562        } = dispatched_task;
563
564        *pid = task.pid;
565        *cpu = task.cpu;
566        *flags = task.flags;
567        *slice_ns = task.slice_ns;
568        *vtime = task.vtime;
569        *enq_cnt = task.enq_cnt;
570
571        // Store the task in the user ring buffer.
572        //
573        // NOTE: submit() only updates the reserved slot in the user ring buffer, so it is not
574        // expected to fail.
575        self.dispatched
576            .submit(urb_sample)
577            .expect("failed to submit task");
578
579        Ok(())
580    }
581
    // Read exit code from the BPF part.
    //
    // Returns true when either a Ctrl-C shutdown was requested or the BPF
    // component reported an exit through the user exit info area.
    pub fn exited(&mut self) -> bool {
        self.shutdown.load(Ordering::Relaxed) || uei_exited!(&self.skel, uei)
    }
586
    // Called on exit to shutdown and report exit message from the BPF part.
    //
    // Dropping the struct_ops link detaches the scheduler from the kernel;
    // uei_report!() then collects and returns the exit info.
    pub fn shutdown_and_report(&mut self) -> Result<UserExitInfo> {
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
592}
593
594// Disconnect the low-level BPF scheduler.
595impl Drop for BpfScheduler<'_> {
596    fn drop(&mut self) {
597        if let Some(struct_ops) = self.struct_ops.take() {
598            drop(struct_ops);
599        }
600        ALLOCATOR.unlock_memory();
601    }
602}