use std::mem::MaybeUninit;

use crate::bpf_intf;
use crate::bpf_intf::*;
use crate::bpf_skel::*;

use std::ffi::c_int;
use std::ffi::c_ulong;

use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Once;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;

use plain::Plain;
use procfs::process::all_processes;

use libbpf_rs::libbpf_sys::bpf_object_open_opts;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;

use libc::{c_char, pthread_self, pthread_setschedparam, sched_param};

#[cfg(target_env = "musl")]
use libc::timespec;

use scx_utils::compat;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;

use scx_rustland_core::ALLOCATOR;

// Scheduling class for sched_ext tasks, as defined in the kernel UAPI headers.
const SCHED_EXT: i32 = 7;

// Maximum length of a task's comm, including the terminating NUL byte.
const TASK_COMM_LEN: usize = 16;

// Special CPU value: let the BPF component dispatch the task on any CPU
// available.
#[allow(dead_code)]
pub const RL_CPU_ANY: i32 = bpf_intf::RL_CPU_ANY as i32;
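
// Task queued for scheduling from the BPF component (counterpart of
// bpf_intf::queued_task_ctx).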
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct QueuedTask {
    pub pid: i32,                      // pid that uniquely identifies the task
    pub cpu: i32,                      // CPU the task was running on
    pub nr_cpus_allowed: u64,          // number of CPUs the task is allowed to run on
    pub flags: u64,                    // task enqueue flags
    pub start_ts: u64,                 // timestamp of the last time the task started running
    pub stop_ts: u64,                  // timestamp of the last time the task released a CPU
    pub exec_runtime: u64,             // total runtime accumulated since the last sleep
    pub weight: u64,                   // task static priority (weight)
    pub vtime: u64,                    // current task vruntime
    pub enq_cnt: u64,                  // enqueue counter
    pub comm: [c_char; TASK_COMM_LEN], // task name (comm)
}

impl QueuedTask {
    // Return the task's comm as a String, truncated at the first NUL byte.
    #[allow(dead_code)]
    pub fn comm_str(&self) -> String {
        let bytes: &[u8] =
            unsafe { std::slice::from_raw_parts(self.comm.as_ptr() as *const u8, self.comm.len()) };

        // Find the first NUL terminator, or take the whole buffer if none is
        // present.
        let nul_pos = bytes.iter().position(|&c| c == 0).unwrap_or(bytes.len());

        String::from_utf8_lossy(&bytes[..nul_pos]).into_owned()
    }
}
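
// Task dispatched to the BPF component (counterpart of
// bpf_intf::dispatched_task_ctx).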
#[derive(Debug, PartialEq, Eq, PartialOrd, Clone)]
pub struct DispatchedTask {
    pub pid: i32,      // pid that uniquely identifies the task
    pub cpu: i32,      // target CPU chosen by the scheduler
    pub flags: u64,    // special dispatch flags
    pub slice_ns: u64, // time slice assigned to the task (0 = default)
    pub vtime: u64,    // task vruntime / deadline (0 = default)
    pub enq_cnt: u64,  // enqueue counter
}

impl DispatchedTask {
    // Create a DispatchedTask from a QueuedTask.
    //
    // slice_ns and vtime are initialized to 0, letting the BPF component apply
    // its defaults; the scheduler can override them before dispatching.
    pub fn new(task: &QueuedTask) -> Self {
        DispatchedTask {
            pid: task.pid,
            cpu: task.cpu,
            flags: task.flags,
            slice_ns: 0,
            vtime: 0,
            enq_cnt: task.enq_cnt,
        }
    }
}
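
// Helpers to access the dispatched task context as a plain chunk of bytes, so
// that it can be written directly into the user ring buffer.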
unsafe impl Plain for bpf_intf::dispatched_task_ctx {}

impl AsMut<bpf_intf::dispatched_task_ctx> for bpf_intf::dispatched_task_ctx {
    fn as_mut(&mut self) -> &mut bpf_intf::dispatched_task_ctx {
        self
    }
}
struct EnqueuedMessage {
    inner: bpf_intf::queued_task_ctx,
}

impl EnqueuedMessage {
    // Reinterpret the raw bytes received from the ring buffer as a
    // queued_task_ctx.
    fn from_bytes(bytes: &[u8]) -> Self {
        let queued_task_struct = unsafe { *(bytes.as_ptr() as *const bpf_intf::queued_task_ctx) };
        EnqueuedMessage {
            inner: queued_task_struct,
        }
    }

    // Convert the raw BPF context to the higher-level QueuedTask.
    fn to_queued_task(&self) -> QueuedTask {
        QueuedTask {
            pid: self.inner.pid,
            cpu: self.inner.cpu,
            nr_cpus_allowed: self.inner.nr_cpus_allowed,
            flags: self.inner.flags,
            start_ts: self.inner.start_ts,
            stop_ts: self.inner.stop_ts,
            exec_runtime: self.inner.exec_runtime,
            weight: self.inner.weight,
            vtime: self.inner.vtime,
            enq_cnt: self.inner.enq_cnt,
            comm: self.inner.comm,
        }
    }
}
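
// Main communication channel with the BPF component.
//
// A typical scheduling loop built on top of this API looks like the following
// sketch (policy details and error handling are up to the actual scheduler):
//
//   while !bpf.exited() {
//       // Drain all the tasks queued by the BPF component.
//       while let Ok(Some(task)) = bpf.dequeue_task() {
//           let mut dispatched = DispatchedTask::new(&task);
//           dispatched.cpu = RL_CPU_ANY;
//           bpf.dispatch_task(&dispatched).unwrap();
//       }
//       // Notify the BPF component that the scheduling cycle is complete.
//       bpf.notify_complete(0);
//   }
//   bpf.shutdown_and_report()?;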
pub struct BpfScheduler<'cb> {
    pub skel: BpfSkel<'cb>,                // BPF skeleton
    shutdown: Arc<AtomicBool>,             // true when a shutdown has been requested (Ctrl-C)
    queued: libbpf_rs::RingBuffer<'cb>,    // ring buffer of tasks queued by the BPF component
    dispatched: libbpf_rs::UserRingBuffer, // user ring buffer of tasks dispatched to the BPF component
    struct_ops: Option<libbpf_rs::Link>,   // link to the attached struct_ops scheduler
}
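
// Buffer used by the ring buffer callback to stage a task received from the
// BPF component.
//
// BUFSIZE is the size of QueuedTask, which is assumed to match the size of
// bpf_intf::queued_task_ctx (copy_from_slice() panics on any mismatch), and
// the buffer is 8-byte aligned to prevent misaligned dereferences when its
// bytes are reinterpreted as a queued_task_ctx.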
const BUFSIZE: usize = std::mem::size_of::<QueuedTask>();

#[repr(align(8))]
struct AlignedBuffer([u8; BUFSIZE]);

static mut BUF: AlignedBuffer = AlignedBuffer([0; BUFSIZE]);
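
// Shutdown handling: install a Ctrl-C handler that requests a clean exit by
// setting the shared flag checked by BpfScheduler::exited(); the Once guard
// ensures the handler is registered only once per process.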
static SET_HANDLER: Once = Once::new();

fn set_ctrlc_handler(shutdown: Arc<AtomicBool>) -> Result<(), anyhow::Error> {
    SET_HANDLER.call_once(|| {
        let shutdown_clone = shutdown.clone();
        ctrlc::set_handler(move || {
            shutdown_clone.store(true, Ordering::Relaxed);
        })
        .expect("Error setting Ctrl-C handler");
    });
    Ok(())
}

impl<'cb> BpfScheduler<'cb> {
    pub fn init(
        open_object: &'cb mut MaybeUninit<OpenObject>,
        open_opts: Option<bpf_object_open_opts>,
        exit_dump_len: u32,
        partial: bool,
        debug: bool,
        builtin_idle: bool,
        slice_ns: u64,
        name: &str,
    ) -> Result<Self> {
        let shutdown = Arc::new(AtomicBool::new(false));
        set_ctrlc_handler(shutdown.clone()).context("Error setting Ctrl-C handler")?;

        // Open the BPF program.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug);
        let mut skel = scx_ops_open!(skel_builder, open_object, rustland, open_opts)?;

        // Callback invoked when a task is received from the queued ring buffer:
        // copy the raw bytes into BUF, where dequeue_task() picks them up.
        fn callback(data: &[u8]) -> i32 {
            #[allow(static_mut_refs)]
            unsafe {
                // SAFETY: BUF is only accessed from the scheduler thread and
                // its size matches the size of the received sample.
                BUF.0.copy_from_slice(data);
            }

            // Return 0 to indicate that the sample was consumed correctly.
            0
        }

        // Initialize the read-only topology information.
        let topo = Topology::new().unwrap();
        skel.maps.rodata_data.as_mut().unwrap().smt_enabled = topo.smt_enabled;

        // Set the scheduler flags.
        skel.struct_ops.rustland_mut().flags =
            *compat::SCX_OPS_ENQ_LAST | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        if partial {
            skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
        }
        skel.struct_ops.rustland_mut().exit_dump_len = exit_dump_len;
        skel.maps.rodata_data.as_mut().unwrap().usersched_pid = std::process::id();
        skel.maps.rodata_data.as_mut().unwrap().khugepaged_pid = Self::khugepaged_pid();
        skel.maps.rodata_data.as_mut().unwrap().builtin_idle = builtin_idle;
        skel.maps.rodata_data.as_mut().unwrap().slice_ns = slice_ns;
        skel.maps.rodata_data.as_mut().unwrap().debug = debug;
        let _ = Self::set_scx_ops_name(&mut skel.struct_ops.rustland_mut().name, name);

        // Load the BPF program for validation.
        let mut skel = scx_ops_load!(skel, rustland, uei)?;

        // Attach the scheduler.
        let struct_ops = Some(scx_ops_attach!(skel, rustland)?);

        // Build the ring buffer of queued tasks.
        let maps = &skel.maps;
        let queued_ring_buffer = &maps.queued;
        let mut rbb = libbpf_rs::RingBufferBuilder::new();
        rbb.add(queued_ring_buffer, callback)
            .expect("failed to add ringbuf callback");
        let queued = rbb.build().expect("failed to build ringbuf");

        // Build the user ring buffer of dispatched tasks.
        let dispatched = libbpf_rs::UserRingBuffer::new(&maps.dispatched)
            .expect("failed to create user ringbuf");

        // Lock all the memory to prevent page faults that could trigger
        // unexpected deadlocks during scheduling.
        ALLOCATOR.lock_memory();
        ALLOCATOR.disable_mmap().expect("Failed to disable mmap");

        // In partial mode make sure that at least the scheduler itself runs in
        // the SCHED_EXT class.
        if partial {
            let err = Self::use_sched_ext();
            if err < 0 {
                return Err(anyhow::Error::msg(format!(
                    "sched_setscheduler error: {err}"
                )));
            }
        }

        Ok(Self {
            skel,
            shutdown,
            queued,
            dispatched,
            struct_ops,
        })
    }

    // Copy `src` into the fixed-size struct_ops name field (NUL-terminated)
    // and append the version suffix generated from the crate version.
    fn set_scx_ops_name(name_field: &mut [i8], src: &str) -> Result<()> {
        if !src.is_ascii() {
            bail!("name must be an ASCII string");
        }

        let bytes = src.as_bytes();
        let n = bytes.len().min(name_field.len().saturating_sub(1));

        name_field.fill(0);
        for i in 0..n {
            name_field[i] = bytes[i] as i8;
        }

        // Append the version suffix right after the name, starting at the
        // first NUL byte and always leaving room for a final terminator.
        let version_suffix = ::scx_utils::build_id::ops_version_suffix(env!("CARGO_PKG_VERSION"));
        let bytes = version_suffix.as_bytes();
        let mut i = 0;
        let mut bytes_idx = 0;
        let mut found_null = false;

        while i < name_field.len() - 1 {
            found_null |= name_field[i] == 0;
            if !found_null {
                i += 1;
                continue;
            }

            if bytes_idx < bytes.len() {
                name_field[i] = bytes[bytes_idx] as i8;
                bytes_idx += 1;
            } else {
                break;
            }
            i += 1;
        }
        name_field[i] = 0;

        Ok(())
    }

    // Return the PID of the khugepaged kernel thread, or 0 if it's not running.
    //
    // The PID is stored in the read-only data of the BPF component, so that
    // khugepaged can be recognized and handled separately.
    fn khugepaged_pid() -> u32 {
        let procs = match all_processes() {
            Ok(p) => p,
            Err(_) => return 0,
        };

        for proc in procs {
            let proc = match proc {
                Ok(p) => p,
                Err(_) => continue,
            };

            if let Ok(stat) = proc.stat() {
                // Kernel threads have no executable path, so require a failing
                // exe() in addition to the matching comm name.
                if proc.exe().is_err() && stat.comm == "khugepaged" {
                    return proc.pid() as u32;
                }
            }
        }

        0
    }

    // Notify the BPF component that the user-space scheduler has completed its
    // scheduling cycle, updating the number of tasks that are still pending.
    //
    // NOTE: every scheduler must call this method at some point, otherwise the
    // BPF component may keep waking up the user-space scheduler in a busy
    // loop, wasting CPU time.
    pub fn notify_complete(&mut self, nr_pending: u64) {
        self.skel.maps.bss_data.as_mut().unwrap().nr_scheduled = nr_pending;
        std::thread::yield_now();
    }

    // Counters of scheduling events, shared with the BPF component (they live
    // in the BPF .bss section and can be read and updated by both sides).

    #[allow(dead_code)]
    pub fn nr_online_cpus_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_online_cpus
    }

    #[allow(dead_code)]
    pub fn nr_running_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_running
    }

    #[allow(dead_code)]
    pub fn nr_queued_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_queued
    }

    #[allow(dead_code)]
    pub fn nr_scheduled_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_scheduled
    }

    #[allow(dead_code)]
    pub fn nr_user_dispatches_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_user_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_kernel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_kernel_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_cancel_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_cancel_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_bounce_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_bounce_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_failed_dispatches_mut(&mut self) -> &mut u64 {
        &mut self
            .skel
            .maps
            .bss_data
            .as_mut()
            .unwrap()
            .nr_failed_dispatches
    }

    #[allow(dead_code)]
    pub fn nr_sched_congested_mut(&mut self) -> &mut u64 {
        &mut self.skel.maps.bss_data.as_mut().unwrap().nr_sched_congested
    }

    // Switch the current thread to the SCHED_EXT scheduling class.
    fn use_sched_ext() -> i32 {
        #[cfg(target_env = "gnu")]
        let param: sched_param = sched_param { sched_priority: 0 };
        // musl's sched_param includes extra sporadic-scheduling fields that
        // must be zero-initialized.
        #[cfg(target_env = "musl")]
        let param: sched_param = sched_param {
            sched_priority: 0,
            sched_ss_low_priority: 0,
            sched_ss_repl_period: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_init_budget: timespec {
                tv_sec: 0,
                tv_nsec: 0,
            },
            sched_ss_max_repl: 0,
        };

        unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
    }

    // Select a CPU for the target task by invoking the rs_select_cpu BPF
    // program through its test_run entry point.
    #[allow(dead_code)]
    pub fn select_cpu(&mut self, pid: i32, cpu: i32, flags: u64) -> i32 {
        let prog = &mut self.skel.progs.rs_select_cpu;
        let mut args = task_cpu_arg {
            pid: pid as c_int,
            cpu: cpu as c_int,
            flags: flags as c_ulong,
        };
        // Pass the arguments to the BPF program as its input context.
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();

        out.return_value as i32
    }
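
    // Example (a sketch): query a CPU for a queued task and fall back to
    // RL_CPU_ANY when no specific CPU is returned:
    //
    //   let cpu = bpf.select_cpu(task.pid, task.cpu, 0);
    //   dispatched.cpu = if cpu >= 0 { cpu } else { RL_CPU_ANY };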

    // Receive a task queued by the BPF component, if any.
    #[allow(static_mut_refs)]
    pub fn dequeue_task(&mut self) -> Result<Option<QueuedTask>, i32> {
        // Consume at most one sample from the queued ring buffer; the callback
        // registered in init() copies the raw bytes into BUF.
        match self.queued.consume_raw_n(1) {
            0 => {
                // No more tasks to process: reset the queued counter.
                self.skel.maps.bss_data.as_mut().unwrap().nr_queued = 0;
                Ok(None)
            }
            1 => {
                // A task was received: decode it and update the queued counter.
                let task = unsafe { EnqueuedMessage::from_bytes(&BUF.0).to_queued_task() };
                self.skel.maps.bss_data.as_mut().unwrap().nr_queued = self
                    .skel
                    .maps
                    .bss_data
                    .as_ref()
                    .unwrap()
                    .nr_queued
                    .saturating_sub(1);

                Ok(Some(task))
            }
            res if res < 0 => Err(res),
            res => panic!("Unexpected return value from libbpf-rs::consume_raw_n(): {res}"),
        }
    }

    // Send a task to be dispatched by the BPF component.
    pub fn dispatch_task(&mut self, task: &DispatchedTask) -> Result<(), libbpf_rs::Error> {
        // Reserve a slot in the user ring buffer.
        let mut urb_sample = self
            .dispatched
            .reserve(std::mem::size_of::<bpf_intf::dispatched_task_ctx>())?;
        let bytes = urb_sample.as_mut();
        let dispatched_task = plain::from_mut_bytes::<bpf_intf::dispatched_task_ctx>(bytes)
            .expect("failed to convert bytes");

        // Fill in the dispatched task context.
        let bpf_intf::dispatched_task_ctx {
            pid,
            cpu,
            flags,
            slice_ns,
            vtime,
            enq_cnt,
            ..
        } = &mut dispatched_task.as_mut();

        *pid = task.pid;
        *cpu = task.cpu;
        *flags = task.flags;
        *slice_ns = task.slice_ns;
        *vtime = task.vtime;
        *enq_cnt = task.enq_cnt;

        // Submit the task; the BPF component will drain it from the user ring
        // buffer and complete the dispatch.
        self.dispatched
            .submit(urb_sample)
            .expect("failed to submit task");

        Ok(())
    }

    // Return true when the scheduler should exit: either a shutdown was
    // requested (Ctrl-C) or the BPF component reported an exit event.
    pub fn exited(&mut self) -> bool {
        self.shutdown.load(Ordering::Relaxed) || uei_exited!(&self.skel, uei)
    }

    // Detach the struct_ops scheduler and report the exit information provided
    // by the BPF component.
    pub fn shutdown_and_report(&mut self) -> Result<UserExitInfo> {
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

// Detach the scheduler and unlock the memory pinned by the allocator when the
// BpfScheduler instance goes out of scope.
impl Drop for BpfScheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
        ALLOCATOR.unlock_memory();
    }
}