use std::io::{self, ErrorKind};
use std::mem::MaybeUninit;

use crate::bpf_intf;
use crate::bpf_intf::*;
use crate::bpf_skel::*;

use std::ffi::c_int;
use std::ffi::c_ulong;

use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Once;

use anyhow::Context;
use anyhow::Result;

use plain::Plain;
use procfs::process::all_processes;

use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;

use libc::{pthread_self, pthread_setschedparam, sched_param};

#[cfg(target_env = "musl")]
use libc::timespec;

use scx_utils::compat;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;

use scx_rustland_core::ALLOCATOR;

// Scheduling class ID used by sched_ext (SCHED_EXT), as defined by the kernel UAPI.
const SCHED_EXT: i32 = 7;

// Special CPU value: the task can be dispatched on any CPU chosen by the BPF component.
#[allow(dead_code)]
pub const RL_CPU_ANY: i32 = bpf_intf::RL_CPU_ANY as i32;

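/// Task queued for scheduling from the BPF component.
///
/// An instance of this struct is produced for each message received on the `queued` ring
/// buffer; the field layout mirrors `bpf_intf::queued_task_ctx`.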
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct QueuedTask {
    pub pid: i32,             // pid that uniquely identifies the task
    pub cpu: i32,             // CPU previously used by the task
    pub nr_cpus_allowed: u64, // number of CPUs the task can run on
    pub flags: u64,           // enqueue flags
    pub start_ts: u64,        // timestamp of the last time the task started running
    pub stop_ts: u64,         // timestamp of the last time the task released a CPU
    pub exec_runtime: u64,    // execution time accumulated by the task
    pub weight: u64,          // task priority weight
    pub vtime: u64,           // task virtual runtime
}

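/// Task to be dispatched to the BPF component.
///
/// A `DispatchedTask` is always created from a previously received [`QueuedTask`]; the
/// user-space scheduler fills in the target CPU, flags, time slice and virtual time before
/// handing it back via [`BpfScheduler::dispatch_task`].
///
/// A minimal sketch, assuming `queued` is a [`QueuedTask`] received from
/// [`BpfScheduler::dequeue_task`] and `sched` is a [`BpfScheduler`] instance (the policy of
/// keeping the previous CPU is just an illustrative assumption):
///
/// ```ignore
/// let mut dispatched = DispatchedTask::new(&queued);
/// dispatched.cpu = queued.cpu; // keep the task on the CPU it was previously using
/// sched.dispatch_task(&dispatched).expect("failed to dispatch task");
/// ```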
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct DispatchedTask {
    pub pid: i32,      // pid that uniquely identifies the task
    pub cpu: i32,      // target CPU selected by the user-space scheduler
    pub flags: u64,    // dispatch flags
    pub slice_ns: u64, // time slice assigned to the task
    pub vtime: u64,    // task virtual time
}

impl DispatchedTask {
    // Create a DispatchedTask from a QueuedTask: a task can be dispatched only after it has
    // been received from the BPF component as a QueuedTask.
    pub fn new(task: &QueuedTask) -> Self {
        DispatchedTask {
            pid: task.pid,
            cpu: task.cpu,
            flags: task.flags,
            slice_ns: 0, // use the default time slice
            vtime: 0,
        }
    }
}
114
115unsafe impl Plain for bpf_intf::dispatched_task_ctx {}
117
118impl AsMut<bpf_intf::dispatched_task_ctx> for bpf_intf::dispatched_task_ctx {
119 fn as_mut(&mut self) -> &mut bpf_intf::dispatched_task_ctx {
120 self
121 }
122}
123
124struct EnqueuedMessage {
128 inner: bpf_intf::queued_task_ctx,
129}
130
impl EnqueuedMessage {
    fn from_bytes(bytes: &[u8]) -> Self {
        // Reinterpret the raw ring-buffer bytes as a queued_task_ctx.
        let queued_task_struct = unsafe { *(bytes.as_ptr() as *const bpf_intf::queued_task_ctx) };
        EnqueuedMessage {
            inner: queued_task_struct,
        }
    }

    fn to_queued_task(&self) -> QueuedTask {
        QueuedTask {
            pid: self.inner.pid,
            cpu: self.inner.cpu,
            nr_cpus_allowed: self.inner.nr_cpus_allowed,
            flags: self.inner.flags,
            start_ts: self.inner.start_ts,
            stop_ts: self.inner.stop_ts,
            exec_runtime: self.inner.exec_runtime,
            weight: self.inner.weight,
            vtime: self.inner.vtime,
        }
    }
}

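/// Main connector between the user-space scheduler and the BPF component.
///
/// Typical usage is a loop that drains queued tasks, applies the scheduling policy and sends
/// the result back. The sketch below is only illustrative: it assumes a trivial policy that
/// dispatches every task on its previously used CPU and it handles errors with plain
/// `unwrap()` / `expect()` for brevity.
///
/// ```ignore
/// let mut open_object = MaybeUninit::uninit();
/// let mut sched = BpfScheduler::init(&mut open_object, 0, false, false, false)
///     .expect("failed to initialize the scheduler");
///
/// while !sched.exited() {
///     // Drain all the tasks queued by the BPF component.
///     while let Some(task) = sched.dequeue_task().unwrap() {
///         let mut dispatched = DispatchedTask::new(&task);
///         dispatched.cpu = task.cpu;
///         sched.dispatch_task(&dispatched).unwrap();
///     }
///     // Report that no tasks are left pending in user-space.
///     sched.notify_complete(0);
/// }
/// sched.shutdown_and_report().unwrap();
/// ```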
pub struct BpfScheduler<'cb> {
    pub skel: BpfSkel<'cb>,                // BPF skeleton
    shutdown: Arc<AtomicBool>,             // shutdown request (Ctrl-C)
    queued: libbpf_rs::RingBuffer<'cb>,    // tasks queued by the BPF component
    dispatched: libbpf_rs::UserRingBuffer, // tasks dispatched to the BPF component
    struct_ops: Option<libbpf_rs::Link>,   // link to the attached struct_ops
}

// Size of a message received through the `queued` ring buffer (matches QueuedTask).
const BUFSIZE: usize = std::mem::size_of::<QueuedTask>();

#[repr(align(8))]
struct AlignedBuffer([u8; BUFSIZE]);

// Static buffer used by the ring buffer callback to store the last received message.
static mut BUF: AlignedBuffer = AlignedBuffer([0; BUFSIZE]);

// Special negative value returned by the ring buffer callback to make libbpf stop after
// consuming a single item.
const LIBBPF_STOP: i32 = -255;

static SET_HANDLER: Once = Once::new();

fn set_ctrlc_handler(shutdown: Arc<AtomicBool>) -> Result<(), anyhow::Error> {
    SET_HANDLER.call_once(|| {
        let shutdown_clone = shutdown.clone();
        ctrlc::set_handler(move || {
            shutdown_clone.store(true, Ordering::Relaxed);
        })
        .expect("Error setting Ctrl-C handler");
    });
    Ok(())
}

impl<'cb> BpfScheduler<'cb> {
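    /// Initialize the scheduler: open, load and attach the BPF component and set up the ring
    /// buffers used to exchange tasks with it.
    ///
    /// `exit_dump_len` sets the size of the exit dump buffer, `partial` only switches tasks
    /// that explicitly opt in to SCHED_EXT (instead of all tasks), `debug` enables verbose
    /// output, and `builtin_idle` is propagated to the BPF component to control use of the
    /// built-in idle CPU selection logic.
    ///
    /// Minimal sketch, assuming a surrounding function that returns `anyhow::Result`:
    ///
    /// ```ignore
    /// let mut open_object = MaybeUninit::uninit();
    /// let mut sched = BpfScheduler::init(&mut open_object, 0, false, false, false)?;
    /// ```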
    pub fn init(
        open_object: &'cb mut MaybeUninit<OpenObject>,
        exit_dump_len: u32,
        partial: bool,
        debug: bool,
        builtin_idle: bool,
    ) -> Result<Self> {
        let shutdown = Arc::new(AtomicBool::new(false));
        set_ctrlc_handler(shutdown.clone()).context("Error setting Ctrl-C handler")?;

        // Open the BPF program.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug);
        let mut skel = scx_ops_open!(skel_builder, open_object, rustland)?;

        // Callback executed when a task is queued by the BPF component: copy the message into
        // the static buffer and return LIBBPF_STOP to stop consuming after a single item.
        fn callback(data: &[u8]) -> i32 {
            #[allow(static_mut_refs)]
            unsafe {
                BUF.0.copy_from_slice(data);
            }

            LIBBPF_STOP
        }

        // Initialize the CPU topology and pass the relevant information to the BPF component.
        let topo = Topology::new().unwrap();
        skel.maps.rodata_data.smt_enabled = topo.smt_enabled;

        skel.struct_ops.rustland_mut().flags = *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        if partial {
            skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.rustland_mut().exit_dump_len = exit_dump_len;
        skel.maps.rodata_data.usersched_pid = std::process::id();
        skel.maps.rodata_data.khugepaged_pid = Self::khugepaged_pid();
        skel.maps.rodata_data.builtin_idle = builtin_idle;
        skel.maps.rodata_data.debug = debug;

        // Load the BPF program for validation.
        let mut skel = scx_ops_load!(skel, rustland, uei)?;

        // Initialize the cache domains used for sibling-CPU selection.
        Self::init_l2_cache_domains(&mut skel, &topo)?;
        Self::init_l3_cache_domains(&mut skel, &topo)?;

        // Attach the scheduler.
        let struct_ops = Some(scx_ops_attach!(skel, rustland)?);

        // Build the ring buffer of queued tasks.
        let maps = &skel.maps;
        let queued_ring_buffer = &maps.queued;
        let mut rbb = libbpf_rs::RingBufferBuilder::new();
        rbb.add(queued_ring_buffer, callback)
            .expect("failed to add ringbuf callback");
        let queued = rbb.build().expect("failed to build ringbuf");

        // Build the user ring buffer of dispatched tasks.
        let dispatched = libbpf_rs::UserRingBuffer::new(&maps.dispatched)
            .expect("failed to create user ringbuf");

        // Lock all the memory to prevent page faults in the user-space scheduler itself.
        ALLOCATOR.lock_memory();
        ALLOCATOR.disable_mmap().expect("Failed to disable mmap");

        // In partial mode, the scheduler process must explicitly switch itself to the
        // SCHED_EXT class to be managed by the BPF component.
        if partial {
            let err = Self::use_sched_ext();
            if err < 0 {
                return Err(anyhow::Error::msg(format!(
                    "sched_setscheduler error: {}",
                    err
                )));
            }
        }

        Ok(Self {
            skel,
            shutdown,
            queued,
            dispatched,
            struct_ops,
        })
    }

    // Return the pid of the khugepaged kernel thread, or 0 if it cannot be determined.
    fn khugepaged_pid() -> u32 {
        let procs = match all_processes() {
            Ok(p) => p,
            Err(_) => return 0,
        };

        for proc in procs {
            let proc = match proc {
                Ok(p) => p,
                Err(_) => continue,
            };

            // Kernel threads have no executable, so match on the comm name only for those.
            if let Ok(stat) = proc.stat() {
                if proc.exe().is_err() && stat.comm == "khugepaged" {
                    return proc.pid() as u32;
                }
            }
        }

        0
    }

    // Notify the BPF component that two CPUs are siblings at the given cache level, by
    // invoking the enable_sibling_cpu BPF program via test_run().
    fn enable_sibling_cpu(
        skel: &mut BpfSkel<'_>,
        lvl: usize,
        cpu: usize,
        sibling_cpu: usize,
    ) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_sibling_cpu;
        let mut args = domain_arg {
            lvl_id: lvl as c_int,
            cpu_id: cpu as c_int,
            sibling_cpu_id: sibling_cpu as c_int,
        };
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }

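    /// Group CPUs by their cache ID at the given cache level (L2 or L3) and register every
    /// (cpu, sibling_cpu) pair of each group with the BPF component through
    /// `enable_sibling_cpu_fn`.
    ///
    /// The grouping step alone looks roughly like this standalone sketch (with made-up IDs):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let cpu_to_l3 = [(0usize, 0usize), (1, 0), (2, 1), (3, 1)];
    /// let mut cache_id_map: HashMap<usize, Vec<usize>> = HashMap::new();
    /// for (cpu_id, cache_id) in cpu_to_l3 {
    ///     cache_id_map.entry(cache_id).or_default().push(cpu_id);
    /// }
    /// // cache_id_map == {0: [0, 1], 1: [2, 3]}
    /// ```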
    fn init_cache_domains<SiblingCpuFn>(
        skel: &mut BpfSkel<'_>,
        topo: &Topology,
        cache_lvl: usize,
        enable_sibling_cpu_fn: &SiblingCpuFn,
    ) -> Result<(), std::io::Error>
    where
        SiblingCpuFn: Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>,
    {
        let mut cache_id_map: HashMap<usize, Vec<usize>> = HashMap::new();
        for core in topo.all_cores.values() {
            for (cpu_id, cpu) in &core.cpus {
                let cache_id = match cache_lvl {
                    2 => cpu.l2_id,
                    3 => cpu.l3_id,
                    _ => panic!("invalid cache level {}", cache_lvl),
                };
                cache_id_map
                    .entry(cache_id)
                    .or_insert_with(Vec::new)
                    .push(*cpu_id);
            }
        }

        for (_cache_id, cpus) in cache_id_map {
            for cpu in &cpus {
                for sibling_cpu in &cpus {
                    enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu).map_err(|e| {
                        io::Error::new(
                            ErrorKind::Other,
                            format!(
                                "enable_sibling_cpu_fn failed for cpu {} sibling {}: err {}",
                                cpu, sibling_cpu, e
                            ),
                        )
                    })?;
                }
            }
        }

        Ok(())
    }

    fn init_l2_cache_domains(
        skel: &mut BpfSkel<'_>,
        topo: &Topology,
    ) -> Result<(), std::io::Error> {
        Self::init_cache_domains(skel, topo, 2, &|skel, lvl, cpu, sibling_cpu| {
            Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
        })
    }

    fn init_l3_cache_domains(
        skel: &mut BpfSkel<'_>,
        topo: &Topology,
    ) -> Result<(), std::io::Error> {
        Self::init_cache_domains(skel, topo, 3, &|skel, lvl, cpu, sibling_cpu| {
            Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
        })
    }

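    /// Notify the BPF component that the user-space scheduler has completed a scheduling
    /// cycle, reporting the number of tasks still pending in user-space, and yield the CPU.
    ///
    /// Typically called at the end of each iteration of the scheduling loop, for example
    /// (assuming `pending` is the scheduler's own backlog of tasks not yet dispatched):
    ///
    /// ```ignore
    /// sched.notify_complete(pending.len() as u64);
    /// ```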
    pub fn notify_complete(&mut self, nr_pending: u64) {
        self.skel.maps.bss_data.nr_scheduled = nr_pending;
        std::thread::yield_now();
    }

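    // Counters shared with the BPF component (stored in its BSS section). Each accessor
    // returns a mutable reference, so the user-space scheduler can read or reset them,
    // e.g. `*sched.nr_user_dispatches_mut()` to read the number of user-space dispatches.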
    #[allow(dead_code)]
    pub fn nr_online_cpus_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_online_cpus
    }

    #[allow(dead_code)]
    pub fn nr_running_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_running
    }

    #[allow(dead_code)]
    pub fn nr_queued_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_queued
    }

    #[allow(dead_code)]
    pub fn nr_scheduled_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_scheduled
    }

    #[allow(dead_code)]
    pub fn nr_user_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_user_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_kernel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_kernel_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_cancel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_cancel_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_bounce_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_bounce_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_failed_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_failed_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_sched_congested_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.nr_sched_congested
    }

    // Switch the current thread to the SCHED_EXT scheduling class.
    fn use_sched_ext() -> i32 {
        #[cfg(target_env = "gnu")]
        let param: sched_param = sched_param { sched_priority: 0 };
        #[cfg(target_env = "musl")]
        let param: sched_param = sched_param {
            sched_priority: 0,
            sched_ss_low_priority: 0,
            sched_ss_repl_period: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_init_budget: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_max_repl: 0,
        };

        unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
    }

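    /// Ask the BPF component to pick an idle CPU for the given task, by invoking the
    /// `rs_select_cpu` BPF program via `test_run()`. A negative return value means no
    /// suitable CPU was found.
    ///
    /// Illustrative sketch (falling back to the previously used CPU is an assumption of the
    /// example, not a requirement of the API):
    ///
    /// ```ignore
    /// let cpu = sched.select_cpu(task.pid, task.cpu, task.flags);
    /// let cpu = if cpu >= 0 { cpu } else { task.cpu };
    /// ```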
    pub fn select_cpu(&mut self, pid: i32, cpu: i32, flags: u64) -> i32 {
        let prog = &mut self.skel.progs.rs_select_cpu;
        let mut args = task_cpu_arg {
            pid: pid as c_int,
            cpu: cpu as c_int,
            flags: flags as c_ulong,
        };
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();

        out.return_value as i32
    }

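    /// Receive the next task queued by the BPF component.
    ///
    /// Returns `Ok(Some(task))` when a task was received, `Ok(None)` when no more tasks are
    /// queued and `Err(err)` with a negative error code on ring buffer failures. A typical
    /// drain loop looks like this sketch (the error handling policy is an assumption of the
    /// example):
    ///
    /// ```ignore
    /// loop {
    ///     match sched.dequeue_task() {
    ///         Ok(Some(task)) => {
    ///             // Apply the scheduling policy and dispatch the task.
    ///         }
    ///         Ok(None) => break, // no more tasks queued
    ///         Err(err) => panic!("dequeue_task failed: {}", err),
    ///     }
    /// }
    /// ```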
    #[allow(static_mut_refs)]
    pub fn dequeue_task(&mut self) -> Result<Option<QueuedTask>, i32> {
        match self.queued.consume_raw() {
            0 => {
                // No tasks are queued: reset the counter shared with the BPF component.
                self.skel.maps.bss_data.nr_queued = 0;
                Ok(None)
            }
            LIBBPF_STOP => {
                // A task has been copied into BUF by the ring buffer callback.
                let task = unsafe { EnqueuedMessage::from_bytes(&BUF.0).to_queued_task() };
                self.skel.maps.bss_data.nr_queued =
                    self.skel.maps.bss_data.nr_queued.saturating_sub(1);

                Ok(Some(task))
            }
            res if res < 0 => Err(res),
            res => panic!(
                "Unexpected return value from libbpf-rs::consume_raw(): {}",
                res
            ),
        }
    }

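    /// Send a task to the BPF dispatcher through the `dispatched` user ring buffer.
    ///
    /// Sketch of a typical use, dispatching a previously dequeued task (the choice of target
    /// CPU is an assumption of the example):
    ///
    /// ```ignore
    /// let mut dispatched = DispatchedTask::new(&task);
    /// dispatched.cpu = RL_CPU_ANY; // let the BPF component pick any available CPU
    /// sched.dispatch_task(&dispatched).expect("failed to dispatch task");
    /// ```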
    pub fn dispatch_task(&mut self, task: &DispatchedTask) -> Result<(), libbpf_rs::Error> {
        // Reserve a slot in the user ring buffer.
        let mut urb_sample = self
            .dispatched
            .reserve(std::mem::size_of::<bpf_intf::dispatched_task_ctx>())?;
        let bytes = urb_sample.as_mut();
        let dispatched_task = plain::from_mut_bytes::<bpf_intf::dispatched_task_ctx>(bytes)
            .expect("failed to convert bytes");

        // Fill in the dispatched task's information.
        let bpf_intf::dispatched_task_ctx {
            pid,
            cpu,
            flags,
            slice_ns,
            vtime,
            ..
        } = &mut dispatched_task.as_mut();

        *pid = task.pid;
        *cpu = task.cpu;
        *flags = task.flags;
        *slice_ns = task.slice_ns;
        *vtime = task.vtime;

        // Submit the task to the BPF component through the user ring buffer.
        self.dispatched
            .submit(urb_sample)
            .expect("failed to submit task");

        Ok(())
    }

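    /// Return true when the scheduler should stop, either because Ctrl-C was pressed or
    /// because the BPF component reported an exit event. Typically used as the condition of
    /// the main scheduling loop, as in this sketch:
    ///
    /// ```ignore
    /// while !sched.exited() {
    ///     // run one scheduling cycle
    /// }
    /// ```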
    pub fn exited(&mut self) -> bool {
        self.shutdown.load(Ordering::Relaxed) || uei_exited!(&self.skel, uei)
    }

    // Detach the struct_ops and report the exit information from the BPF component.
    pub fn shutdown_and_report(&mut self) -> Result<UserExitInfo> {
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for BpfScheduler<'_> {
    fn drop(&mut self) {
        // Detach the struct_ops (if still attached) and unlock the memory locked at init time.
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
        ALLOCATOR.unlock_memory();
    }
}