scx_rlfifo/bpf.rs

// Copyright (c) Andrea Righi <andrea.righi@linux.dev>

// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

use std::io::{self, ErrorKind};
use std::mem::MaybeUninit;

use crate::bpf_intf;
use crate::bpf_intf::*;
use crate::bpf_skel::*;

use std::ffi::c_int;
use std::ffi::c_ulong;

use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Once;

use anyhow::Context;
use anyhow::Result;

use plain::Plain;
use procfs::process::all_processes;

use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;

use libc::{pthread_self, pthread_setschedparam, sched_param};

#[cfg(target_env = "musl")]
use libc::timespec;

use scx_utils::compat;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;

use scx_rustland_core::ALLOCATOR;

// Defined in UAPI
const SCHED_EXT: i32 = 7;

// Allow dispatching the task on any CPU.
//
// The task will be dispatched to the global shared DSQ and will run on the first available CPU.
#[allow(dead_code)]
pub const RL_CPU_ANY: i32 = bpf_intf::RL_CPU_ANY as i32;

/// High-level Rust abstraction to interact with a generic sched-ext BPF component.
///
/// Overview
/// ========
///
/// The main BPF interface is provided by the BpfScheduler struct. When this object is
/// initialized it takes care of registering and initializing the BPF component.
///
/// The scheduler can then use the BpfScheduler instance to receive tasks (in the form of
/// QueuedTask objects) and dispatch tasks (in the form of DispatchedTask objects), using the
/// methods dequeue_task() and dispatch_task() respectively.
///
/// BPF counters and statistics can be accessed using the nr_*_mut() methods; in particular,
/// nr_queued_mut() and nr_scheduled_mut() can be updated to notify the BPF component whether the
/// user-space scheduler still has pending work to do or not.
///
/// Finally, the methods exited() and shutdown_and_report() can be used respectively to test
/// whether the BPF component exited, and to shut down and report the exit message.
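///
/// The sketch below shows how these pieces are typically combined. It is a hypothetical,
/// minimal FIFO-style loop (a real scheduler would apply its own policy when filling in each
/// DispatchedTask), using only the methods defined in this file:
///
/// ```ignore
/// let mut open_object = MaybeUninit::uninit();
/// let mut bpf = BpfScheduler::init(&mut open_object, 0, false, false, false)?;
///
/// while !bpf.exited() {
///     // Drain all the tasks queued by the BPF component.
///     while let Ok(Some(task)) = bpf.dequeue_task() {
///         let mut dispatched = DispatchedTask::new(&task);
///         dispatched.cpu = RL_CPU_ANY; // let the BPF side pick the first available CPU
///         bpf.dispatch_task(&dispatched)?;
///     }
///     // Tell the BPF component that no more tasks are pending in user space.
///     bpf.notify_complete(0);
/// }
///
/// bpf.shutdown_and_report()?;
/// ```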

// Task queued for scheduling from the BPF component (see bpf_intf::queued_task_ctx).
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct QueuedTask {
    pub pid: i32,             // pid that uniquely identifies a task
    pub cpu: i32,             // CPU where the task is running
    pub nr_cpus_allowed: u64, // Number of CPUs that the task can use
    pub flags: u64,           // task enqueue flags
    pub start_ts: u64,        // Timestamp of when the task last started running on a CPU
    pub stop_ts: u64,         // Timestamp of when the task last released a CPU
    pub exec_runtime: u64,    // Total CPU time used since the last sleep
    pub weight: u64,          // Task static priority
    pub vtime: u64,           // Current vruntime
}

// Task queued for dispatching to the BPF component (see bpf_intf::dispatched_task_ctx).
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct DispatchedTask {
    pub pid: i32,      // pid that uniquely identifies a task
    pub cpu: i32,      // target CPU selected by the scheduler
    pub flags: u64,    // special dispatch flags
    pub slice_ns: u64, // time slice assigned to the task (0 = default)
    pub vtime: u64,    // task deadline / vruntime
}

impl DispatchedTask {
    // Create a DispatchedTask from a QueuedTask.
    //
    // A dispatched task should always originate from a QueuedTask (there is no reason to
    // dispatch a task that wasn't previously queued to the scheduler).
    pub fn new(task: &QueuedTask) -> Self {
        DispatchedTask {
            pid: task.pid,
            cpu: task.cpu,
            flags: task.flags,
            slice_ns: 0, // use default time slice
            vtime: 0,
        }
    }
}
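
// A scheduler typically tweaks the fields of a DispatchedTask before handing it to
// dispatch_task(). Hypothetical example (values chosen for illustration only):
//
//     let mut dispatched = DispatchedTask::new(&task);
//     dispatched.cpu = RL_CPU_ANY;     // no CPU preference
//     dispatched.slice_ns = 5_000_000; // assign a custom 5ms time slice
//     dispatched.vtime = task.vtime;   // preserve the task's vruntime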

// Helpers used to submit tasks to the BPF user ring buffer.
unsafe impl Plain for bpf_intf::dispatched_task_ctx {}

impl AsMut<bpf_intf::dispatched_task_ctx> for bpf_intf::dispatched_task_ctx {
    fn as_mut(&mut self) -> &mut bpf_intf::dispatched_task_ctx {
        self
    }
}

// Message received from the dispatcher (see bpf_intf::queued_task_ctx for details).
//
// NOTE: eventually libbpf-rs will provide a better abstraction for this.
struct EnqueuedMessage {
    inner: bpf_intf::queued_task_ctx,
}

impl EnqueuedMessage {
    fn from_bytes(bytes: &[u8]) -> Self {
        let queued_task_struct = unsafe { *(bytes.as_ptr() as *const bpf_intf::queued_task_ctx) };
        EnqueuedMessage {
            inner: queued_task_struct,
        }
    }

    fn to_queued_task(&self) -> QueuedTask {
        QueuedTask {
            pid: self.inner.pid,
            cpu: self.inner.cpu,
            nr_cpus_allowed: self.inner.nr_cpus_allowed,
            flags: self.inner.flags,
            start_ts: self.inner.start_ts,
            stop_ts: self.inner.stop_ts,
            exec_runtime: self.inner.exec_runtime,
            weight: self.inner.weight,
            vtime: self.inner.vtime,
        }
    }
}

pub struct BpfScheduler<'cb> {
    pub skel: BpfSkel<'cb>,                // Low-level BPF connector
    shutdown: Arc<AtomicBool>,             // Set when the scheduler should shut down (e.g., on Ctrl-C)
    queued: libbpf_rs::RingBuffer<'cb>,    // Ring buffer of queued tasks
    dispatched: libbpf_rs::UserRingBuffer, // User ring buffer of dispatched tasks
    struct_ops: Option<libbpf_rs::Link>,   // Low-level BPF methods
}

// Buffer to store a task read from the ring buffer.
//
// NOTE: make the buffer aligned to 64-bits to prevent misaligned dereferences when accessing the
// buffer using a pointer.
const BUFSIZE: usize = std::mem::size_of::<QueuedTask>();

#[repr(align(8))]
struct AlignedBuffer([u8; BUFSIZE]);

static mut BUF: AlignedBuffer = AlignedBuffer([0; BUFSIZE]);

// Special negative error code for libbpf to stop after consuming just one item from a BPF
// ring buffer.
const LIBBPF_STOP: i32 = -255;

static SET_HANDLER: Once = Once::new();

fn set_ctrlc_handler(shutdown: Arc<AtomicBool>) -> Result<(), anyhow::Error> {
    SET_HANDLER.call_once(|| {
        let shutdown_clone = shutdown.clone();
        ctrlc::set_handler(move || {
            shutdown_clone.store(true, Ordering::Relaxed);
        })
        .expect("Error setting Ctrl-C handler");
    });
    Ok(())
}

impl<'cb> BpfScheduler<'cb> {
    pub fn init(
        open_object: &'cb mut MaybeUninit<OpenObject>,
        exit_dump_len: u32,
        partial: bool,
        debug: bool,
        builtin_idle: bool,
    ) -> Result<Self> {
        let shutdown = Arc::new(AtomicBool::new(false));
        set_ctrlc_handler(shutdown.clone()).context("Error setting Ctrl-C handler")?;

        // Open the BPF prog first for verification.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug);
        let mut skel = scx_ops_open!(skel_builder, open_object, rustland)?;

        // Copy one item from the ring buffer.
        //
        // # Safety
        //
        // Each invocation of the callback will trigger the copy of exactly one QueuedTask item to
        // BUF. The caller must synchronize to ensure that multiple invocations of the callback
        // do not happen at the same time, but this is implicitly guaranteed by the fact that
        // the caller is a single-threaded process (for now).
        fn callback(data: &[u8]) -> i32 {
            #[allow(static_mut_refs)]
            unsafe {
                // SAFETY: copying from the BPF ring buffer to BUF is safe, since the size of BUF
                // is exactly the size of QueuedTask and the callback operates in chunks of
                // QueuedTask items. It also copies exactly one QueuedTask at a time; this is
                // guaranteed by the error code returned by this callback (see below). From a
                // thread-safety perspective this is also correct, assuming the caller is a
                // single-threaded process (as it is for now).
                BUF.0.copy_from_slice(data);
            }

            // Return an unsupported error to stop early and consume only one item.
            //
            // NOTE: this is quite a hack. I wish libbpf would honor stopping after the first item
            // is consumed, upon returning a non-zero positive value here, but it doesn't seem to
            // be the case:
            //
            // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/lib/bpf/ringbuf.c?h=v6.8-rc5#n260
            //
            // Maybe we should fix this to stop processing items from the ring buffer also when a
            // value > 0 is returned.
            //
            LIBBPF_STOP
        }

        // Check host topology to determine if we need to enable SMT capabilities.
        let topo = Topology::new().unwrap();
        skel.maps.rodata_data.smt_enabled = topo.smt_enabled;

        // Enable scheduler flags.
        skel.struct_ops.rustland_mut().flags = *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        if partial {
            skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.rustland_mut().exit_dump_len = exit_dump_len;
        skel.maps.rodata_data.usersched_pid = std::process::id();
        skel.maps.rodata_data.khugepaged_pid = Self::khugepaged_pid();
        skel.maps.rodata_data.builtin_idle = builtin_idle;
        skel.maps.rodata_data.debug = debug;

        // Load the BPF scheduler.
        let mut skel = scx_ops_load!(skel, rustland, uei)?;

        // Initialize cache domains.
        Self::init_l2_cache_domains(&mut skel, &topo)?;
        Self::init_l3_cache_domains(&mut skel, &topo)?;

        let struct_ops = Some(scx_ops_attach!(skel, rustland)?);

        // Build the ring buffer of queued tasks.
        let maps = &skel.maps;
        let queued_ring_buffer = &maps.queued;
        let mut rbb = libbpf_rs::RingBufferBuilder::new();
        rbb.add(queued_ring_buffer, callback)
            .expect("failed to add ringbuf callback");
        let queued = rbb.build().expect("failed to build ringbuf");

        // Build the user ring buffer of dispatched tasks.
        let dispatched = libbpf_rs::UserRingBuffer::new(&maps.dispatched)
            .expect("failed to create user ringbuf");

        // Lock all the memory to prevent page faults that could trigger potential deadlocks during
        // scheduling.
        ALLOCATOR.lock_memory();
        ALLOCATOR.disable_mmap().expect("Failed to disable mmap");

        // Make sure to use the SCHED_EXT class at least for the scheduler itself.
        if partial {
            let err = Self::use_sched_ext();
            if err < 0 {
                return Err(anyhow::Error::msg(format!(
                    "sched_setscheduler error: {}",
                    err
                )));
            }
        }

        Ok(Self {
            skel,
            shutdown,
            queued,
            dispatched,
            struct_ops,
        })
    }

    // Return the PID of khugepaged, if present, otherwise return 0.
    fn khugepaged_pid() -> u32 {
        let procs = match all_processes() {
            Ok(p) => p,
            Err(_) => return 0,
        };

        for proc in procs {
            let proc = match proc {
                Ok(p) => p,
                Err(_) => continue,
            };

            if let Ok(stat) = proc.stat() {
                if proc.exe().is_err() && stat.comm == "khugepaged" {
                    return proc.pid() as u32;
                }
            }
        }

        0
    }

    fn enable_sibling_cpu(
        skel: &mut BpfSkel<'_>,
        lvl: usize,
        cpu: usize,
        sibling_cpu: usize,
    ) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_sibling_cpu;
        let mut args = domain_arg {
            lvl_id: lvl as c_int,
            cpu_id: cpu as c_int,
            sibling_cpu_id: sibling_cpu as c_int,
        };
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }

    fn init_cache_domains<SiblingCpuFn>(
        skel: &mut BpfSkel<'_>,
        topo: &Topology,
        cache_lvl: usize,
        enable_sibling_cpu_fn: &SiblingCpuFn,
    ) -> Result<(), std::io::Error>
    where
        SiblingCpuFn: Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>,
    {
        // Determine the list of CPU IDs associated with each cache node.
        let mut cache_id_map: HashMap<usize, Vec<usize>> = HashMap::new();
        for core in topo.all_cores.values() {
            for (cpu_id, cpu) in &core.cpus {
                let cache_id = match cache_lvl {
                    2 => cpu.l2_id,
                    3 => cpu.l3_id,
                    _ => panic!("invalid cache level {}", cache_lvl),
                };
                cache_id_map
                    .entry(cache_id)
                    .or_insert_with(Vec::new)
                    .push(*cpu_id);
            }
        }

        // Update the BPF cpumasks for the cache domains.
        for (_cache_id, cpus) in cache_id_map {
            for cpu in &cpus {
                for sibling_cpu in &cpus {
                    enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu).map_err(|e| {
                        io::Error::new(
                            ErrorKind::Other,
                            format!(
                                "enable_sibling_cpu_fn failed for cpu {} sibling {}: err {}",
                                cpu, sibling_cpu, e
                            ),
                        )
                    })?;
                }
            }
        }

        Ok(())
    }

    fn init_l2_cache_domains(
        skel: &mut BpfSkel<'_>,
        topo: &Topology,
    ) -> Result<(), std::io::Error> {
        Self::init_cache_domains(skel, topo, 2, &|skel, lvl, cpu, sibling_cpu| {
            Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
        })
    }

    fn init_l3_cache_domains(
        skel: &mut BpfSkel<'_>,
        topo: &Topology,
    ) -> Result<(), std::io::Error> {
        Self::init_cache_domains(skel, topo, 3, &|skel, lvl, cpu, sibling_cpu| {
            Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
        })
    }

    // Notify the BPF component that the user-space scheduler has completed its scheduling cycle,
    // updating the number of tasks that are still pending.
    //
    // NOTE: do not set allow(dead_code) for this method: any scheduler must use it at some point,
    // otherwise the BPF component will keep waking up the user-space scheduler in a busy loop,
    // causing unnecessarily high CPU consumption.
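    //
    // For example, a scheduling loop would typically end each iteration with something like the
    // following (hypothetical sketch, where `nr_waiting` is the number of tasks the user-space
    // scheduler still holds in its own queues):
    //
    //     bpf.notify_complete(nr_waiting as u64);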
    pub fn notify_complete(&mut self, nr_pending: u64) {
        self.skel.maps.bss_data.nr_scheduled = nr_pending;
        std::thread::yield_now();
    }

    // Counter of the online CPUs.
    #[allow(dead_code)]
    pub fn nr_online_cpus_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_online_cpus
    }

    // Counter of currently running tasks.
    #[allow(dead_code)]
    pub fn nr_running_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_running
    }

    // Counter of queued tasks.
    #[allow(dead_code)]
    pub fn nr_queued_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_queued
    }

    // Counter of scheduled tasks.
    #[allow(dead_code)]
    pub fn nr_scheduled_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_scheduled
    }

    // Counter of user dispatch events.
    #[allow(dead_code)]
    pub fn nr_user_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_user_dispatches
    }

    // Counter of kernel dispatch events.
    #[allow(dead_code)]
    pub fn nr_kernel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_kernel_dispatches
    }

    // Counter of cancel dispatch events.
    #[allow(dead_code)]
    pub fn nr_cancel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_cancel_dispatches
    }

    // Counter of dispatches bounced to the shared DSQ.
    #[allow(dead_code)]
    pub fn nr_bounce_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_bounce_dispatches
    }

    // Counter of failed dispatch events.
    #[allow(dead_code)]
    pub fn nr_failed_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_failed_dispatches
    }

    // Counter of scheduler congestion events.
    #[allow(dead_code)]
    pub fn nr_sched_congested_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_sched_congested
    }

    // Set scheduling class for the scheduler itself to SCHED_EXT
    fn use_sched_ext() -> i32 {
        #[cfg(target_env = "gnu")]
        let param: sched_param = sched_param { sched_priority: 0 };
        #[cfg(target_env = "musl")]
        let param: sched_param = sched_param {
            sched_priority: 0,
            sched_ss_low_priority: 0,
            sched_ss_repl_period: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_init_budget: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_max_repl: 0,
        };

        unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
    }

    // Pick an idle CPU for the target PID.
    pub fn select_cpu(&mut self, pid: i32, cpu: i32, flags: u64) -> i32 {
        let prog = &mut self.skel.progs.rs_select_cpu;
        let mut args = task_cpu_arg {
            pid: pid as c_int,
            cpu: cpu as c_int,
            flags: flags as c_ulong,
        };
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();

        out.return_value as i32
    }
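
    // Usage sketch (hypothetical): a scheduler would typically try to grab an idle CPU right
    // before dispatching and fall back to RL_CPU_ANY when none is found, assuming a negative
    // return value means that no idle CPU is currently available:
    //
    //     let cpu = bpf.select_cpu(task.pid, task.cpu, task.flags);
    //     dispatched.cpu = if cpu >= 0 { cpu } else { RL_CPU_ANY };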

    // Receive a task to be scheduled from the BPF dispatcher.
    #[allow(static_mut_refs)]
    pub fn dequeue_task(&mut self) -> Result<Option<QueuedTask>, i32> {
        match self.queued.consume_raw() {
            0 => {
                self.skel.maps.bss_data.nr_queued = 0;
                Ok(None)
            }
            LIBBPF_STOP => {
                // A valid task was received; convert the data into a proper task struct.
                let task = unsafe { EnqueuedMessage::from_bytes(&BUF.0).to_queued_task() };
                self.skel.maps.bss_data.nr_queued =
                    self.skel.maps.bss_data.nr_queued.saturating_sub(1);

                Ok(Some(task))
            }
            res if res < 0 => Err(res),
            res => panic!(
                "Unexpected return value from libbpf-rs::consume_raw(): {}",
                res
            ),
        }
    }

    // Send a task to the dispatcher.
    pub fn dispatch_task(&mut self, task: &DispatchedTask) -> Result<(), libbpf_rs::Error> {
        // Reserve a slot in the user ring buffer.
        let mut urb_sample = self
            .dispatched
            .reserve(std::mem::size_of::<bpf_intf::dispatched_task_ctx>())?;
        let bytes = urb_sample.as_mut();
        let dispatched_task = plain::from_mut_bytes::<bpf_intf::dispatched_task_ctx>(bytes)
            .expect("failed to convert bytes");

        // Convert the dispatched task into the low-level dispatched task context.
        let bpf_intf::dispatched_task_ctx {
            pid,
            cpu,
            flags,
            slice_ns,
            vtime,
            ..
        } = &mut dispatched_task.as_mut();

        *pid = task.pid;
        *cpu = task.cpu;
        *flags = task.flags;
        *slice_ns = task.slice_ns;
        *vtime = task.vtime;

        // Store the task in the user ring buffer.
        //
        // NOTE: submit() only updates the reserved slot in the user ring buffer, so it is not
        // expected to fail.
        self.dispatched
            .submit(urb_sample)
            .expect("failed to submit task");

        Ok(())
    }

    // Check whether the BPF component has exited or a shutdown was requested.
    pub fn exited(&mut self) -> bool {
        self.shutdown.load(Ordering::Relaxed) || uei_exited!(&self.skel, uei)
    }

    // Called on exit to shut down and report the exit message from the BPF part.
    pub fn shutdown_and_report(&mut self) -> Result<UserExitInfo> {
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

// Disconnect the low-level BPF scheduler.
impl Drop for BpfScheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
        ALLOCATOR.unlock_memory();
    }
}