use std::mem::MaybeUninit;

use crate::bpf_intf;
use crate::bpf_intf::*;
use crate::bpf_skel::*;

use std::ffi::c_int;
use std::ffi::c_ulong;
use std::ffi::CStr;

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Once;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;

use plain::Plain;
use procfs::process::all_processes;

use libbpf_rs::libbpf_sys::bpf_object_open_opts;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;

use libc::{c_char, pthread_self, pthread_setschedparam, sched_param};

#[cfg(target_env = "musl")]
use libc::timespec;

use scx_utils::compat;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;

use scx_rustland_core::ALLOCATOR;

// SCHED_EXT scheduling class, as defined in the kernel UAPI headers.
const SCHED_EXT: i32 = 7;

// Maximum length of a task's comm, including the NUL terminator.
const TASK_COMM_LEN: usize = 16;

// Special CPU value: let the BPF component dispatch the task on any idle CPU.
#[allow(dead_code)]
pub const RL_CPU_ANY: i32 = bpf_intf::RL_CPU_ANY as i32;

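/// Task queued for scheduling by the BPF component, consumed by the
/// user-space scheduler via [`BpfScheduler::dequeue_task`].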
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct QueuedTask {
    pub pid: i32,                      // pid that uniquely identifies the task
    pub cpu: i32,                      // CPU where the task is running
    pub nr_cpus_allowed: u64,          // number of CPUs the task can run on
    pub flags: u64,                    // task enqueue flags
    pub start_ts: u64,                 // timestamp of when the task last started running
    pub stop_ts: u64,                  // timestamp of when the task last released a CPU
    pub exec_runtime: u64,             // total CPU time used since the last sleep
    pub weight: u64,                   // task priority (weight)
    pub vtime: u64,                    // task virtual time / deadline
    pub enq_cnt: u64,                  // task enqueue counter
    pub comm: [c_char; TASK_COMM_LEN], // task comm (NUL-terminated)
}

impl QueuedTask {
    /// Return the task's comm as a String (lossy UTF-8 conversion).
    #[allow(dead_code)]
    pub fn comm_str(&self) -> String {
        // SAFETY: comm is always NUL-terminated within TASK_COMM_LEN.
        let c_str = unsafe { CStr::from_ptr(self.comm.as_ptr()) };

        c_str.to_string_lossy().into_owned()
    }
}

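/// Task dispatched by the user-space scheduler, consumed by the BPF
/// component via [`BpfScheduler::dispatch_task`].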
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct DispatchedTask {
    pub pid: i32,      // pid that uniquely identifies the task
    pub cpu: i32,      // target CPU chosen by the scheduler
    pub flags: u64,    // special dispatch flags
    pub slice_ns: u64, // time slice assigned to the task (0 = default)
    pub vtime: u64,    // task deadline / virtual time
    pub enq_cnt: u64,  // task enqueue counter (copied from the QueuedTask)
}

impl DispatchedTask {
    /// Create a DispatchedTask from a QueuedTask.
    pub fn new(task: &QueuedTask) -> Self {
        DispatchedTask {
            pid: task.pid,
            cpu: task.cpu,
            flags: task.flags,
            slice_ns: 0, // use the default time slice
            vtime: 0,
            enq_cnt: task.enq_cnt,
        }
    }
}

unsafe impl Plain for bpf_intf::dispatched_task_ctx {}

impl AsMut<bpf_intf::dispatched_task_ctx> for bpf_intf::dispatched_task_ctx {
    fn as_mut(&mut self) -> &mut bpf_intf::dispatched_task_ctx {
        self
    }
}

// Message received from the BPF component (see bpf_intf::queued_task_ctx).
struct EnqueuedMessage {
    inner: bpf_intf::queued_task_ctx,
}

impl EnqueuedMessage {
    fn from_bytes(bytes: &[u8]) -> Self {
        // Use an unaligned read: the slice itself carries no alignment
        // guarantee, even though the source buffer is 8-byte aligned.
        let queued_task_struct = unsafe {
            std::ptr::read_unaligned(bytes.as_ptr() as *const bpf_intf::queued_task_ctx)
        };
        EnqueuedMessage {
            inner: queued_task_struct,
        }
    }

    fn to_queued_task(&self) -> QueuedTask {
        QueuedTask {
            pid: self.inner.pid,
            cpu: self.inner.cpu,
            nr_cpus_allowed: self.inner.nr_cpus_allowed,
            flags: self.inner.flags,
            start_ts: self.inner.start_ts,
            stop_ts: self.inner.stop_ts,
            exec_runtime: self.inner.exec_runtime,
            weight: self.inner.weight,
            vtime: self.inner.vtime,
            enq_cnt: self.inner.enq_cnt,
            comm: self.inner.comm,
        }
    }
}

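/// Main interface between the user-space scheduler and the BPF component.
///
/// A typical scheduling loop, sketched under the assumption that `bpf` was
/// created with [`BpfScheduler::init`]; CPU selection and time-slice policy
/// are up to the scheduler:
///
/// ```ignore
/// while !bpf.exited() {
///     while let Ok(Some(task)) = bpf.dequeue_task() {
///         let mut dispatched = DispatchedTask::new(&task);
///         dispatched.cpu = RL_CPU_ANY; // let the BPF component pick an idle CPU
///         bpf.dispatch_task(&dispatched).unwrap();
///     }
///     bpf.notify_complete(0);
/// }
/// bpf.shutdown_and_report()?;
/// ```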
pub struct BpfScheduler<'cb> {
    pub skel: BpfSkel<'cb>,                // BPF skeleton / connector
    shutdown: Arc<AtomicBool>,             // asserted on Ctrl-C
    queued: libbpf_rs::RingBuffer<'cb>,    // ring buffer of queued tasks (BPF -> user space)
    dispatched: libbpf_rs::UserRingBuffer, // ring buffer of dispatched tasks (user space -> BPF)
    struct_ops: Option<libbpf_rs::Link>,   // link to the attached struct_ops
}

// Size of a single message in the queued ring buffer.
const BUFSIZE: usize = size_of::<queued_task_ctx>();

// Statically allocated, 8-byte aligned buffer used to copy one message at a
// time out of the queued ring buffer.
#[repr(align(8))]
struct AlignedBuffer([u8; BUFSIZE]);

static mut BUF: AlignedBuffer = AlignedBuffer([0; BUFSIZE]);

// Ensure the Ctrl-C handler is registered only once per process.
static SET_HANDLER: Once = Once::new();

fn set_ctrlc_handler(shutdown: Arc<AtomicBool>) -> Result<(), anyhow::Error> {
    SET_HANDLER.call_once(|| {
        let shutdown_clone = shutdown.clone();
        ctrlc::set_handler(move || {
            shutdown_clone.store(true, Ordering::Relaxed);
        })
        .expect("Error setting Ctrl-C handler");
    });
    Ok(())
}

impl<'cb> BpfScheduler<'cb> {
    #[allow(clippy::too_many_arguments)]
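    /// Open, load and attach the BPF scheduler.
    ///
    /// A minimal invocation sketch; the argument values below are
    /// illustrative, not prescriptive:
    ///
    /// ```ignore
    /// let mut open_object = MaybeUninit::uninit();
    /// let mut bpf = BpfScheduler::init(
    ///     &mut open_object,
    ///     None,           // open_opts: default BPF open options
    ///     0,              // exit_dump_len: default exit dump buffer size
    ///     false,          // partial: schedule all tasks through sched_ext
    ///     false,          // debug: no verbose BPF output
    ///     true,           // builtin_idle: use the kernel's idle CPU selection
    ///     false,          // numa_local: no per-node idle cpumasks
    ///     20_000_000,     // slice_ns: 20 ms default time slice
    ///     "my_scheduler", // name: scheduler name reported by sched_ext
    /// )?;
    /// ```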
    pub fn init(
        open_object: &'cb mut MaybeUninit<OpenObject>,
        open_opts: Option<bpf_object_open_opts>,
        exit_dump_len: u32,
        partial: bool,
        debug: bool,
        builtin_idle: bool,
        numa_local: bool,
        slice_ns: u64,
        name: &str,
    ) -> Result<Self> {
        let shutdown = Arc::new(AtomicBool::new(false));
        set_ctrlc_handler(shutdown.clone()).context("Error setting Ctrl-C handler")?;

        // Open the BPF program.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug);
        let mut skel = scx_ops_open!(skel_builder, open_object, rustland, open_opts)?;

        // Callback invoked on every message received from the BPF ring
        // buffer: copy the message into the static aligned buffer.
        // dequeue_task() consumes at most one message per call, so the
        // buffer cannot be overwritten before it is read.
        fn callback(data: &[u8]) -> i32 {
            #[allow(static_mut_refs)]
            unsafe {
                BUF.0.copy_from_slice(data);
            }

            // Return 0 to mark the message as consumed.
            0
        }

        // Initialize the scheduling topology information.
        let topo = Topology::new().unwrap();
        skel.maps.rodata_data.as_mut().unwrap().smt_enabled = topo.smt_enabled;

        // Set the scheduler flags.
        skel.struct_ops.rustland_mut().flags =
            *compat::SCX_OPS_ENQ_LAST | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        if partial {
            skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        if numa_local {
            skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_BUILTIN_IDLE_PER_NODE;
        }
        skel.struct_ops.rustland_mut().exit_dump_len = exit_dump_len;

        // Initialize the read-only scheduler parameters.
        skel.maps.rodata_data.as_mut().unwrap().usersched_pid = std::process::id();
        skel.maps.rodata_data.as_mut().unwrap().khugepaged_pid = Self::khugepaged_pid();
        skel.maps.rodata_data.as_mut().unwrap().builtin_idle = builtin_idle;
        skel.maps.rodata_data.as_mut().unwrap().numa_local = numa_local;
        skel.maps.rodata_data.as_mut().unwrap().slice_ns = slice_ns;
        skel.maps.rodata_data.as_mut().unwrap().debug = debug;
        let _ = Self::set_scx_ops_name(&mut skel.struct_ops.rustland_mut().name, name);

        // Load the BPF program for validation.
        let mut skel = scx_ops_load!(skel, rustland, uei)?;

        // Attach the scheduler.
        let struct_ops = Some(scx_ops_attach!(skel, rustland)?);

        // Build the ring buffer of queued tasks.
        let maps = &skel.maps;
        let queued_ring_buffer = &maps.queued;
        let mut rbb = libbpf_rs::RingBufferBuilder::new();
        rbb.add(queued_ring_buffer, callback)
            .expect("failed to add ringbuf callback");
        let queued = rbb.build().expect("failed to build ringbuf");

        // Build the user ring buffer of dispatched tasks.
        let dispatched = libbpf_rs::UserRingBuffer::new(&maps.dispatched)
            .expect("failed to create user ringbuf");

        // Lock all memory to prevent page faults in the user-space scheduler,
        // which could introduce unexpected latency, and prevent the allocator
        // from falling back to mmap().
        ALLOCATOR.lock_memory();
        ALLOCATOR.disable_mmap().expect("Failed to disable mmap");

        // With SCX_OPS_SWITCH_PARTIAL tasks are not automatically moved to
        // SCHED_EXT, so explicitly switch the user-space scheduler itself to
        // the SCHED_EXT class.
        if partial {
            let err = Self::use_sched_ext();
            if err < 0 {
                return Err(anyhow::Error::msg(format!(
                    "sched_setscheduler error: {err}"
                )));
            }
        }

        Ok(Self {
            skel,
            shutdown,
            queued,
            dispatched,
            struct_ops,
        })
    }

    // Copy an ASCII scheduler name into the fixed-size struct_ops name field,
    // then append the version suffix produced by scx_utils directly after the
    // name, keeping the result NUL-terminated.
    fn set_scx_ops_name(name_field: &mut [i8], src: &str) -> Result<()> {
        if !src.is_ascii() {
            bail!("name must be an ASCII string");
        }

        // Copy the name, truncating it if needed to leave room for the
        // trailing NUL terminator.
        let bytes = src.as_bytes();
        let n = bytes.len().min(name_field.len().saturating_sub(1));

        name_field.fill(0);
        for i in 0..n {
            name_field[i] = bytes[i] as i8;
        }

        let version_suffix = ::scx_utils::build_id::ops_version_suffix(env!("CARGO_PKG_VERSION"));
        let bytes = version_suffix.as_bytes();
        let mut i = 0;
        let mut bytes_idx = 0;
        let mut found_null = false;

        while i < name_field.len() - 1 {
            // Skip ahead to the NUL that terminates the name, then start
            // writing the version suffix from that position.
            found_null |= name_field[i] == 0;
            if !found_null {
                i += 1;
                continue;
            }

            if bytes_idx < bytes.len() {
                name_field[i] = bytes[bytes_idx] as i8;
                bytes_idx += 1;
            } else {
                break;
            }
            i += 1;
        }
        name_field[i] = 0;

        Ok(())
    }

    // Return the PID of the khugepaged kernel thread, or 0 if not found.
    fn khugepaged_pid() -> u32 {
        let procs = match all_processes() {
            Ok(p) => p,
            Err(_) => return 0,
        };

        for proc in procs {
            let proc = match proc {
                Ok(p) => p,
                Err(_) => continue,
            };

            if let Ok(stat) = proc.stat() {
                // Kernel threads have no executable path, which filters out
                // user-space processes that happen to be named "khugepaged".
                if proc.exe().is_err() && stat.comm == "khugepaged" {
                    return proc.pid() as u32;
                }
            }
        }

        0
    }
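
    /// Notify the BPF component that the user-space scheduler has completed
    /// a scheduling cycle, publishing the number of tasks still pending
    /// (`nr_scheduled`) and yielding the CPU to give queued tasks a chance
    /// to run.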
    pub fn notify_complete(&mut self, nr_pending: u64) {
        self.skel.maps.bss_data.as_mut().unwrap().nr_scheduled = nr_pending;
        std::thread::yield_now();
    }

    // Mutable accessors to the global counters shared with the BPF component
    // through the .bss map.

    #[allow(dead_code)]
    pub fn nr_online_cpus_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_online_cpus
    }

    #[allow(dead_code)]
    pub fn nr_running_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_running
    }

    #[allow(dead_code)]
    pub fn nr_queued_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_queued
    }

    #[allow(dead_code)]
    pub fn nr_scheduled_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_scheduled
    }

    #[allow(dead_code)]
    pub fn nr_user_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_user_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_kernel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_kernel_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_cancel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_cancel_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_bounce_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_bounce_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_failed_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_failed_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_sched_congested_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_sched_congested
    }

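    // Switch the current thread to the SCHED_EXT scheduling class with
    // sched_priority 0. The sched_param layout differs between glibc and
    // musl, hence the cfg-gated initializers below.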
    fn use_sched_ext() -> i32 {
        #[cfg(target_env = "gnu")]
        let param: sched_param = sched_param { sched_priority: 0 };
        #[cfg(target_env = "musl")]
        let param: sched_param = sched_param {
            sched_priority: 0,
            sched_ss_low_priority: 0,
            sched_ss_repl_period: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_init_budget: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_max_repl: 0,
        };

        unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
    }

    #[allow(dead_code)]
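    /// Pick an idle CPU for the task by invoking the `rs_select_cpu` BPF
    /// program via `test_run`. Returns the selected CPU, or a negative
    /// value if an idle CPU cannot be assigned.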
    pub fn select_cpu(&mut self, pid: i32, cpu: i32, flags: u64) -> i32 {
        let prog = &mut self.skel.progs.rs_select_cpu;
        let mut args = task_cpu_arg {
            pid: pid as c_int,
            cpu: cpu as c_int,
            flags: flags as c_ulong,
        };
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();

        out.return_value as i32
    }

    #[allow(static_mut_refs)]
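    /// Receive a task to be scheduled from the BPF ring buffer.
    ///
    /// Returns `Ok(Some(task))` when a task is available, `Ok(None)` when
    /// the queue is empty, and `Err(err)` on a ring buffer error. A typical
    /// drain loop (a sketch; error handling is up to the caller):
    ///
    /// ```ignore
    /// while let Ok(Some(task)) = bpf.dequeue_task() {
    ///     // schedule the task...
    /// }
    /// ```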
    pub fn dequeue_task(&mut self) -> Result<Option<QueuedTask>, i32> {
        let bss_data = self.skel.maps.bss_data.as_mut().unwrap();

        // Consume a single message from the queued ring buffer: the callback
        // copies it into the static buffer BUF.
        match self.queued.consume_raw_n(1) {
            0 => {
                // Queue is empty.
                bss_data.nr_queued = 0;
                Ok(None)
            }
            1 => {
                // A valid message is available in BUF.
                let task = unsafe { EnqueuedMessage::from_bytes(&BUF.0).to_queued_task() };
                bss_data.nr_queued = bss_data.nr_queued.saturating_sub(1);

                Ok(Some(task))
            }
            res if res < 0 => Err(res),
            res => panic!("Unexpected return value from libbpf-rs::consume_raw_n(): {res}"),
        }
    }

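    /// Send a task to the BPF dispatcher through the user ring buffer.
    ///
    /// The fields of `task` are copied into a `dispatched_task_ctx` sample
    /// reserved from the user ring buffer and then submitted to the BPF
    /// component.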
    pub fn dispatch_task(&mut self, task: &DispatchedTask) -> Result<(), libbpf_rs::Error> {
        // Reserve a sample in the user ring buffer.
        let mut urb_sample = self
            .dispatched
            .reserve(std::mem::size_of::<bpf_intf::dispatched_task_ctx>())?;
        let bytes = urb_sample.as_mut();
        let dispatched_task = plain::from_mut_bytes::<bpf_intf::dispatched_task_ctx>(bytes)
            .expect("failed to convert bytes");

        // Write the dispatched task into the reserved sample.
        let bpf_intf::dispatched_task_ctx {
            pid,
            cpu,
            flags,
            slice_ns,
            vtime,
            enq_cnt,
            ..
        } = dispatched_task;

        *pid = task.pid;
        *cpu = task.cpu;
        *flags = task.flags;
        *slice_ns = task.slice_ns;
        *vtime = task.vtime;
        *enq_cnt = task.enq_cnt;

        // Submit the sample to the BPF component.
        self.dispatched
            .submit(urb_sample)
            .expect("failed to submit task");

        Ok(())
    }

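    /// Return true when the scheduler should exit: either Ctrl-C was
    /// received or the BPF component reported an exit event.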
    pub fn exited(&mut self) -> bool {
        self.shutdown.load(Ordering::Relaxed) || uei_exited!(&self.skel, uei)
    }

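    /// Detach the BPF scheduler and report the exit information collected
    /// by the BPF component.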
    pub fn shutdown_and_report(&mut self) -> Result<UserExitInfo> {
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

// Detach the BPF scheduler and unlock the allocator's memory on drop.
impl Drop for BpfScheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
        ALLOCATOR.unlock_memory();
    }
}