mod bpf_skel;
pub use bpf_skel::*;

pub mod bpf_intf;
pub mod stats;
use stats::Metrics;

use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::{debug, info, warn};
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;
use scx_utils::{Core, Llc};

use std::ffi::c_ulong;

use bpf_intf::stat_idx_P2DQ_NR_STATS;
use bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
use bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
use bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
use bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
use bpf_intf::stat_idx_P2DQ_STAT_IDLE;
use bpf_intf::stat_idx_P2DQ_STAT_KEEP;
use bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
use bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
use bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
use bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
use bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
use bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
use scx_p2dq::SchedulerOpts;
use scx_p2dq::TOPO;
/// Command line options for scx_p2dq.
#[derive(Debug, Parser)]
struct CliOpts {
    /// Increase verbosity. Can be specified multiple times.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    pub verbose: u8,

    /// Enable stats monitoring with the specified interval, in seconds.
    #[clap(long)]
    pub stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval, in seconds.
    /// The scheduler itself is not loaded.
    #[clap(long)]
    pub monitor: Option<f64>,

    /// Print version and exit.
    #[clap(long)]
    pub version: bool,

    #[clap(flatten)]
    pub sched: SchedulerOpts,
}

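/// Userspace side of the scheduler: the loaded BPF skeleton, the attached
/// struct_ops link (dropping it detaches the scheduler), and the stats server
/// that answers `--stats`/`--monitor` requests.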
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,
    verbose: u8,

    stats_server: StatsServer<(), Metrics>,
}

impl<'a> Scheduler<'a> {
    fn init(
        opts: &SchedulerOpts,
        open_object: &'a mut MaybeUninit<OpenObject>,
        verbose: u8,
    ) -> Result<Self> {
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(verbose > 1);
        init_libbpf_logging(None);
        info!(
            "Running scx_p2dq (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq)?;
        scx_p2dq::init_open_skel!(&mut open_skel, opts, verbose)?;

        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
            0 => info!("Kernel does not support queued wakeup optimization."),
            v => open_skel.struct_ops.p2dq_mut().flags |= v,
        };

        let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;

        scx_p2dq::init_skel!(&mut skel);

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            struct_ops: None,
            verbose,
            stats_server,
        })
    }

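    /// Sum the per-CPU counters in the BPF `stats` map into a single
    /// `Metrics` snapshot for the stats server.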
    fn get_metrics(&self) -> Metrics {
        let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
        let stats_map = &self.skel.maps.stats;
        for stat in 0..stat_idx_P2DQ_NR_STATS {
            let cpu_stat_vec: Vec<Vec<u8>> = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .unwrap()
                .unwrap();
            let sum: u64 = cpu_stat_vec
                .iter()
                .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
                .sum();
            stats[stat as usize] = sum;
        }
        Metrics {
            direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
            idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
            sched_mode: self.skel.maps.bss_data.sched_mode,
            dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
            same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
            keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
            select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
            dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
            llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
            node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
            wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
            wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
            wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
        }
    }

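    /// Run the `arena_init` BPF program once (via test_run) to initialize the
    /// arena allocator with the static page granularity and task context size.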
    fn setup_arenas(&mut self) -> Result<()> {
        let mut args = types::arena_init_args {
            static_pages: bpf_intf::consts_STATIC_ALLOC_PAGES_GRANULARITY as c_ulong,
            task_ctx_size: std::mem::size_of::<types::task_p2dq>() as c_ulong,
        };

        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };

        let output = self.skel.progs.arena_init.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize arenas, arena_init returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

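    /// Allocate an arena cpumask via `arena_alloc_mask`, copy `mask` into it,
    /// and initialize a topology node from it via `arena_topology_node_init`.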
    fn setup_topology_node(&mut self, mask: &[u64]) -> Result<()> {
        let mut args = types::arena_alloc_mask_args {
            bitmap: 0 as c_ulong,
        };

        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };

        let output = self.skel.progs.arena_alloc_mask.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not allocate arena cpumask, arena_alloc_mask returned {}",
                output.return_value as i32
            );
        }

        // The BPF program returns the arena address of the allocated bitmap;
        // copy the cpumask into it before initializing the topology node.
        let ptr = unsafe { std::mem::transmute::<u64, &mut [u64; 10]>(args.bitmap) };

        let (valid_mask, _) = ptr.split_at_mut(mask.len());
        valid_mask.clone_from_slice(mask);

        let mut args = types::arena_topology_node_init_args {
            bitmap: args.bitmap as c_ulong,
            data_size: 0 as c_ulong,
            id: 0 as c_ulong,
        };

        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };

        let output = self.skel.progs.arena_topology_node_init.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "arena_topology_node_init returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

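    /// Initialize arena topology nodes for the full system span, then each
    /// NUMA node, LLC, core, and individual CPU.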
    fn setup_topology(&mut self) -> Result<()> {
        let topo = Topology::new().expect("Failed to build host topology");

        self.setup_topology_node(topo.span.as_raw_slice())?;

        for (_, node) in topo.nodes {
            self.setup_topology_node(node.span.as_raw_slice())?;
        }

        for (_, llc) in topo.all_llcs {
            self.setup_topology_node(
                Arc::<Llc>::into_inner(llc)
                    .expect("missing llc")
                    .span
                    .as_raw_slice(),
            )?;
        }

        for (_, core) in topo.all_cores {
            self.setup_topology_node(
                Arc::<Core>::into_inner(core)
                    .expect("missing core")
                    .span
                    .as_raw_slice(),
            )?;
        }

        for (_, cpu) in topo.all_cpus {
            // Build a single-CPU mask: word cpu.id / 64, bit cpu.id % 64.
            let mut mask = [0; 9];
            mask[cpu.id / 64] |= 1 << (cpu.id % 64);
            self.setup_topology_node(&mask)?;
        }

        Ok(())
    }

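    /// Main loop: serve stats requests until shutdown is signaled or the BPF
    /// scheduler exits, then detach struct_ops and report the exit info.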
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            match req_ch.recv_timeout(Duration::from_secs(1)) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }

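    /// Print the arena topology from the BPF side via `arena_topology_print`.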
    fn print_topology(&mut self) -> Result<()> {
        let input = ProgramInput {
            ..Default::default()
        };

        let output = self.skel.progs.arena_topology_print.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not print topology, arena_topology_print returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

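    /// Set up arenas and topology, then attach the struct_ops scheduler.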
    fn start(&mut self) -> Result<()> {
        self.setup_arenas()?;
        self.setup_topology()?;

        self.struct_ops = Some(scx_ops_attach!(self.skel, p2dq)?);

        if self.verbose > 1 {
            self.print_topology()?;
        }

        info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");

        Ok(())
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        if let Some(struct_ops) = self.struct_ops.take() {
            drop(struct_ops);
        }
    }
}

fn main() -> Result<()> {
    let opts = CliOpts::parse();

    if opts.version {
        println!(
            "scx_p2dq: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    let llv = match opts.verbose {
        0 => simplelog::LevelFilter::Info,
        1 => simplelog::LevelFilter::Debug,
        _ => simplelog::LevelFilter::Trace,
    };
    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        llv,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => debug!("stats monitor thread finished successfully"),
                Err(e) => warn!("stats monitor thread failed: {}", e),
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    if let Some(idle_resume_us) = opts.sched.idle_resume_us {
        if !cpu_idle_resume_latency_supported() {
            warn!("idle resume latency not supported");
        } else if idle_resume_us > 0 {
            info!("Setting idle QoS to {}us", idle_resume_us);
            for cpu in TOPO.all_cpus.values() {
                update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
            }
        }
    }

    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts.sched, &mut open_object, opts.verbose)?;
        sched.start()?;

        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }
    Ok(())
}