1pub mod stats;
6use stats::Metrics;
7
8use std::mem::MaybeUninit;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use anyhow::Context;
16use anyhow::Result;
17use clap::Parser;
18use crossbeam::channel::RecvTimeoutError;
19use libbpf_rs::skel::Skel;
20use libbpf_rs::AsRawLibbpf;
21use libbpf_rs::MapCore as _;
22use libbpf_rs::OpenObject;
23use libbpf_rs::ProgramInput;
24use scx_arena::ArenaLib;
25use scx_stats::prelude::*;
26use scx_utils::build_id;
27use scx_utils::compat;
28use scx_utils::init_libbpf_logging;
29use scx_utils::libbpf_clap_opts::LibbpfOpts;
30use scx_utils::pm::{
31 cpu_idle_resume_latency_supported, epp_supported, for_each_uncore_domain, get_epp,
32 get_turbo_enabled, get_uncore_max_freq_khz, get_uncore_min_freq_khz, set_epp,
33 set_turbo_enabled, set_uncore_max_freq_khz, turbo_supported, uncore_freq_supported,
34 update_cpu_idle_resume_latency,
35};
36use scx_utils::scx_ops_attach;
37use scx_utils::scx_ops_load;
38use scx_utils::scx_ops_open;
39use scx_utils::uei_exited;
40use scx_utils::uei_report;
41use scx_utils::Topology;
42use scx_utils::UserExitInfo;
43use scx_utils::NR_CPU_IDS;
44use tracing::{debug, info, warn};
45use tracing_subscriber::filter::EnvFilter;
46
47use bpf_intf::stat_idx_P2DQ_NR_STATS;
48use bpf_intf::stat_idx_P2DQ_STAT_ATQ_ENQ;
49use bpf_intf::stat_idx_P2DQ_STAT_ATQ_REENQ;
50use bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
51use bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
52use bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
53use bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
54use bpf_intf::stat_idx_P2DQ_STAT_EAS_BIG_SELECT;
55use bpf_intf::stat_idx_P2DQ_STAT_EAS_FALLBACK;
56use bpf_intf::stat_idx_P2DQ_STAT_EAS_LITTLE_SELECT;
57use bpf_intf::stat_idx_P2DQ_STAT_ENQ_CPU;
58use bpf_intf::stat_idx_P2DQ_STAT_ENQ_INTR;
59use bpf_intf::stat_idx_P2DQ_STAT_ENQ_LLC;
60use bpf_intf::stat_idx_P2DQ_STAT_ENQ_MIG;
61use bpf_intf::stat_idx_P2DQ_STAT_EXEC_BALANCE;
62use bpf_intf::stat_idx_P2DQ_STAT_EXEC_SAME_LLC;
63use bpf_intf::stat_idx_P2DQ_STAT_FORK_BALANCE;
64use bpf_intf::stat_idx_P2DQ_STAT_FORK_SAME_LLC;
65use bpf_intf::stat_idx_P2DQ_STAT_IDLE;
66use bpf_intf::stat_idx_P2DQ_STAT_KEEP;
67use bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
68use bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
69use bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
70use bpf_intf::stat_idx_P2DQ_STAT_THERMAL_AVOID;
71use bpf_intf::stat_idx_P2DQ_STAT_THERMAL_KICK;
72use bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
73use bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
74use bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
75use scx_p2dq::bpf_intf;
76use scx_p2dq::bpf_skel::*;
77use scx_p2dq::SchedulerOpts;
78use scx_p2dq::TOPO;
79
// Name used when logging scheduler registration/unregistration.
const SCHEDULER_NAME: &str = "scx_p2dq";

// NOTE(review): plain `//` comments are used on the clap fields below on
// purpose — `///` doc comments would become `--help` text via clap-derive and
// change the CLI's runtime output.
#[derive(Debug, Parser)]
struct CliOpts {
    // Deprecated verbosity counter (-v / -vv); only triggers a deprecation
    // warning in main(), logging is controlled by --log-level instead.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    // Log filter string handed to tracing_subscriber's EnvFilter (e.g.
    // "info", "debug", "trace"). Also selects the BPF debug level in
    // Scheduler::init ("trace" -> 2, "debug" -> 1, else 0).
    #[clap(long, default_value = "info")]
    pub log_level: String,

    // Interval in seconds for the background stats-printing thread while the
    // scheduler runs.
    #[clap(long)]
    pub stats: Option<f64>,

    // Interval in seconds for monitor-only mode: print stats and exit without
    // loading the scheduler. Takes precedence over --stats.
    #[clap(long)]
    pub monitor: Option<f64>,

    // Print the build version and exit.
    #[clap(long)]
    pub version: bool,

    // Optional identifier echoed to the log, useful for correlating runs.
    #[clap(long)]
    pub run_id: Option<u64>,

    // Scheduler tunables shared with the scx_p2dq library crate.
    #[clap(flatten)]
    pub sched: SchedulerOpts,

    // Low-level libbpf open options (verbosity, paths, ...).
    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
120
/// Userspace side of the p2dq scheduler: owns the loaded BPF skeleton, the
/// attached struct_ops link, and the stats server that answers
/// `--stats`/`--monitor` requests.
struct Scheduler<'a> {
    /// Loaded BPF skeleton; lifetime tied to the caller-owned `OpenObject`.
    skel: BpfSkel<'a>,
    /// Link returned by `scx_ops_attach!`; `None` until `start()` runs.
    /// Dropping the link detaches the scheduler from the kernel.
    struct_ops: Option<libbpf_rs::Link>,
    /// 0 = normal, 1 = debug (prints topology), 2 = trace (libbpf obj debug).
    debug_level: u8,

    /// Serves `Metrics` snapshots to stats/monitor clients.
    stats_server: StatsServer<(), Metrics>,
}
128
impl<'a> Scheduler<'a> {
    /// Opens, configures, and loads the p2dq BPF scheduler object and
    /// launches the stats server. Does NOT attach the scheduler — see
    /// [`Scheduler::start`].
    ///
    /// `log_level` doubles as the BPF-side debug selector: a filter string
    /// containing "trace" maps to level 2, "debug" to 1, anything else to 0.
    ///
    /// # Errors
    /// Fails if topology detection, opening/loading the BPF object, or
    /// launching the stats server fails.
    fn init(
        opts: &SchedulerOpts,
        libbpf_ops: &LibbpfOpts,
        open_object: &'a mut MaybeUninit<OpenObject>,
        log_level: &str,
    ) -> Result<Self> {
        // Derive the numeric debug level from the textual log filter.
        let debug_level = if log_level.contains("trace") {
            2
        } else if log_level.contains("debug") {
            1
        } else {
            0
        };
        let mut skel_builder = BpfSkelBuilder::default();
        // Verbose libbpf object output only at trace level (>1).
        skel_builder.obj_builder.debug(debug_level > 1);
        init_libbpf_logging(None);
        info!(
            "Running scx_p2dq (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        // Virtual-LLC mode builds topology from user-supplied arguments;
        // otherwise detect the host topology.
        let topo = if opts.virt_llc_enabled {
            Topology::with_args(&opts.topo)?
        } else {
            Topology::new()?
        };
        let open_opts = libbpf_ops.clone().into_bpf_open_opts();
        let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq, open_opts).context(
            "Failed to open BPF object. This can be caused by a mismatch between the kernel \
            version and the BPF object, permission or other libbpf issues. Try running `dmesg \
            | grep bpf` to see if there are any error messages related to the BPF object. See \
            the LibbpfOptions section in the help for more information on configuration related \
            to this issue or file an issue on the scx repo if the problem persists. \
            https://github.com/sched-ext/scx/issues/new?labels=scx_p2dq&title=scx_p2dq:%20New%20Issue&assignees=hodgesds&body=Kernel%20version:%20(fill%20me%20out)%0ADistribution:%20(fill%20me%20out)%0AHardware:%20(fill%20me%20out)%0A%0AIssue:%20(fill%20me%20out)"
        )?;

        // On ARM, default the thermal-pressure program off; it is re-enabled
        // below only if the kernel exposes the hw_pressure_update tracepoint.
        #[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
        open_skel.progs.on_thermal_pressure.set_autoload(false);

        let hw_profile = scx_p2dq::HardwareProfile::detect();
        // Work on a copy of the options so hardware auto-tuning never mutates
        // the caller's settings.
        let mut opts_optimized = opts.clone();
        if opts.hw_auto_optimize {
            hw_profile.optimize_scheduler_opts(&mut opts_optimized);
        }

        scx_p2dq::init_open_skel!(
            &mut open_skel,
            topo,
            &opts_optimized,
            debug_level,
            &hw_profile
        )?;

        // ARM-only: probe (both tracefs mount points) for the thermal
        // pressure tracepoint and enable the BPF program if it exists.
        #[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
        {
            let thermal_enabled = std::path::Path::new(
                "/sys/kernel/tracing/events/thermal_pressure/hw_pressure_update",
            )
            .exists()
                || std::path::Path::new(
                    "/sys/kernel/debug/tracing/events/thermal_pressure/hw_pressure_update",
                )
                .exists();

            if thermal_enabled {
                debug!(
                    "Kernel supports thermal pressure tracking, enabling hw_pressure_update tracepoint"
                );
                open_skel.progs.on_thermal_pressure.set_autoload(true);
                stats::set_thermal_tracking_enabled(true);

                // Flip the read-only config flag before load so the BPF side
                // sees thermal tracking as enabled.
                open_skel
                    .maps
                    .rodata_data
                    .as_mut()
                    .unwrap()
                    .p2dq_config
                    .thermal_enabled = std::mem::MaybeUninit::new(true);
            } else {
                debug!("Kernel does not support thermal pressure tracking (CONFIG_SCHED_HW_PRESSURE not enabled)");
            }
        }

        if opts_optimized.enable_eas {
            stats::set_eas_enabled(true);
        }

        // ATQ stats only make sense when the kernel provides bpf_spin_unlock
        // (best-effort probe; errors are treated as "not available").
        if opts_optimized.atq_enabled && compat::ksym_exists("bpf_spin_unlock").unwrap_or(false) {
            stats::set_atq_enabled(true);
        }

        if opts.queued_wakeup {
            open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        }
        open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_KEEP_BUILTIN_IDLE;

        // Disable autoattach for the struct_ops map so attachment happens
        // explicitly via scx_ops_attach! in start(), not at load time.
        // SAFETY: as_libbpf_object() yields a valid, non-null bpf_map pointer
        // for the open skeleton, which outlives this call.
        unsafe {
            libbpf_rs::libbpf_sys::bpf_map__set_autoattach(
                open_skel.maps.p2dq.as_libbpf_object().as_ptr(),
                false,
            );
        }

        let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
        scx_p2dq::init_skel!(&mut skel, topo);

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            struct_ops: None,
            debug_level,
            stats_server,
        })
    }

    /// Reads the per-CPU stats map from BPF and sums each counter across all
    /// CPUs into a `Metrics` snapshot.
    fn get_metrics(&self) -> Metrics {
        let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
        let stats_map = &self.skel.maps.stats;
        for stat in 0..stat_idx_P2DQ_NR_STATS {
            // Unwraps: lookup on the fixed-size stats map with an in-range
            // index is expected to always succeed — TODO(review) confirm the
            // map is a per-CPU array sized to P2DQ_NR_STATS.
            let cpu_stat_vec: Vec<Vec<u8>> = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .unwrap()
                .unwrap();
            let sum: u64 = cpu_stat_vec
                .iter()
                .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
                .sum();
            stats[stat as usize] = sum;
        }
        Metrics {
            atq_enq: stats[stat_idx_P2DQ_STAT_ATQ_ENQ as usize],
            atq_reenq: stats[stat_idx_P2DQ_STAT_ATQ_REENQ as usize],
            direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
            idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
            dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
            same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
            keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
            enq_cpu: stats[stat_idx_P2DQ_STAT_ENQ_CPU as usize],
            enq_intr: stats[stat_idx_P2DQ_STAT_ENQ_INTR as usize],
            enq_llc: stats[stat_idx_P2DQ_STAT_ENQ_LLC as usize],
            enq_mig: stats[stat_idx_P2DQ_STAT_ENQ_MIG as usize],
            select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
            dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
            llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
            node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
            wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
            wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
            wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
            fork_balance: stats[stat_idx_P2DQ_STAT_FORK_BALANCE as usize],
            exec_balance: stats[stat_idx_P2DQ_STAT_EXEC_BALANCE as usize],
            fork_same_llc: stats[stat_idx_P2DQ_STAT_FORK_SAME_LLC as usize],
            exec_same_llc: stats[stat_idx_P2DQ_STAT_EXEC_SAME_LLC as usize],
            thermal_kick: stats[stat_idx_P2DQ_STAT_THERMAL_KICK as usize],
            thermal_avoid: stats[stat_idx_P2DQ_STAT_THERMAL_AVOID as usize],
            eas_little_select: stats[stat_idx_P2DQ_STAT_EAS_LITTLE_SELECT as usize],
            eas_big_select: stats[stat_idx_P2DQ_STAT_EAS_BIG_SELECT as usize],
            eas_fallback: stats[stat_idx_P2DQ_STAT_EAS_FALLBACK as usize],
        }
    }

    /// Service loop: answers stats requests (1s poll) until shutdown is
    /// requested or the BPF side reports exit, then detaches the scheduler
    /// and returns the user exit info.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            match req_ch.recv_timeout(Duration::from_secs(1)) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                // Timeout is the normal idle path; keep polling shutdown/uei.
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        // Dropping the struct_ops link detaches the scheduler before the
        // exit info is collected.
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }

    /// Runs the BPF-side `arena_topology_print` program (via test_run) to
    /// dump the arena topology; fails if it returns non-zero.
    fn print_topology(&mut self) -> Result<()> {
        let input = ProgramInput {
            ..Default::default()
        };

        let output = self.skel.progs.arena_topology_print.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize arenas, topo_print returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

    /// Attaches the struct_ops scheduler to the kernel; at debug level the
    /// arena topology is printed as a sanity check.
    fn start(&mut self) -> Result<()> {
        self.struct_ops = Some(scx_ops_attach!(self.skel, p2dq)?);

        if self.debug_level > 0 {
            self.print_topology()?;
        }

        info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");

        Ok(())
    }
}
342
343impl Drop for Scheduler<'_> {
344 fn drop(&mut self) {
345 info!("Unregister {SCHEDULER_NAME} scheduler");
346
347 if let Some(struct_ops) = self.struct_ops.take() {
348 drop(struct_ops);
349 }
350 }
351}
352
#[clap_main::clap_main]
fn main(opts: CliOpts) -> Result<()> {
    // --version: print the build id and exit immediately.
    if opts.version {
        println!(
            "scx_p2dq: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    // Log filter precedence: RUST_LOG env var, then --log-level, then a
    // final fallback to "info" if both are invalid.
    let env_filter = EnvFilter::try_from_default_env()
        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
            Ok(filter) => Ok(filter),
            Err(e) => {
                eprintln!(
                    "invalid log envvar: {}, using info, err is: {}",
                    opts.log_level, e
                );
                EnvFilter::try_new("info")
            }
        })
        .unwrap_or_else(|_| EnvFilter::new("info"));

    // Best-effort logger init: failure (e.g. a logger already installed) is
    // reported but not fatal.
    match tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(true)
        .with_thread_ids(true)
        .with_file(true)
        .with_line_number(true)
        .try_init()
    {
        Ok(()) => {}
        Err(e) => eprintln!("failed to init logger: {}", e),
    }

    if opts.verbose > 0 {
        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
    }

    if let Some(run_id) = opts.run_id {
        info!("scx_p2dq run_id: {}", run_id);
    }

    // Ctrl-C flips the shared shutdown flag polled by Scheduler::run.
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // Spawn the stats thread when either --monitor or --stats was given
    // (--monitor wins when both are set). In monitor-only mode we wait for
    // the thread and return without loading the scheduler at all.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!("stats monitor thread finished because of an error {error_object}")
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    // Optionally pin per-CPU idle resume latency (idle QoS) before starting.
    if let Some(idle_resume_us) = opts.sched.idle_resume_us {
        if !cpu_idle_resume_latency_supported() {
            warn!("idle resume latency not supported");
        } else if idle_resume_us > 0 {
            info!("Setting idle QoS to {idle_resume_us}us");
            for cpu in TOPO.all_cpus.values() {
                update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
            }
        }
    }

    let is_efficiency = opts.sched.sched_mode == scx_p2dq::SchedMode::Efficiency;
    let is_performance = opts.sched.sched_mode == scx_p2dq::SchedMode::Performance;

    // Uncore frequency: an explicit --uncore-max-freq-mhz wins; otherwise
    // efficiency mode clamps to the domain minimum and performance mode to
    // the maximum. Original values are recorded so they can be restored on
    // exit.
    let mut orig_uncore_freqs: Vec<(u32, u32, u32)> = Vec::new();
    if opts.sched.uncore_max_freq_mhz.is_some() || is_efficiency || is_performance {
        if !uncore_freq_supported() {
            if opts.sched.uncore_max_freq_mhz.is_some() {
                warn!("uncore frequency control not supported");
            }
        } else {
            let _ = for_each_uncore_domain(|pkg, die| {
                let freq_khz = if let Some(mhz) = opts.sched.uncore_max_freq_mhz {
                    mhz * 1000
                } else if is_efficiency {
                    get_uncore_min_freq_khz(pkg, die)?
                } else {
                    get_uncore_max_freq_khz(pkg, die)?
                };
                if let Ok(orig) = get_uncore_max_freq_khz(pkg, die) {
                    if orig != freq_khz {
                        info!(
                            "Setting max uncore frequency for package {} die {} to {} MHz",
                            pkg,
                            die,
                            freq_khz / 1000
                        );
                        orig_uncore_freqs.push((pkg, die, orig));
                        set_uncore_max_freq_khz(pkg, die, freq_khz)?;
                    }
                }
                Ok(())
            });
        }
    }

    // Energy Performance Preference: efficiency -> "power", performance ->
    // "performance". Only CPUs whose EPP actually changes are recorded for
    // restore; set_epp failures are tolerated (best-effort).
    let mut orig_epps: Vec<(usize, String)> = Vec::new();
    if (is_efficiency || is_performance) && epp_supported() {
        let target_epp = if is_efficiency {
            "power"
        } else {
            "performance"
        };
        for cpu in TOPO.all_cpus.values() {
            if let Ok(orig) = get_epp(cpu.id) {
                if orig != target_epp {
                    if orig_epps.is_empty() {
                        info!("Setting EPP to {} for all CPUs", target_epp);
                    }
                    orig_epps.push((cpu.id, orig));
                    let _ = set_epp(cpu.id, target_epp);
                }
            }
        }
    }

    // Turbo: an explicit --turbo flag overrides the mode-derived default
    // (efficiency -> off, performance -> on). `orig_turbo` holds the prior
    // state only when we actually changed it, so restore is a no-op
    // otherwise.
    let orig_turbo = if turbo_supported() {
        let target_turbo = opts.sched.turbo.or(if is_efficiency {
            Some(false)
        } else if is_performance {
            Some(true)
        } else {
            None
        });
        if let Some(want_enabled) = target_turbo {
            if let Ok(current) = get_turbo_enabled() {
                if current != want_enabled {
                    let mode_suffix = if opts.sched.turbo.is_none() {
                        if is_efficiency {
                            " for efficiency mode"
                        } else {
                            " for performance mode"
                        }
                    } else {
                        ""
                    };
                    info!(
                        "{} turbo{}",
                        if want_enabled {
                            "Enabling"
                        } else {
                            "Disabling"
                        },
                        mode_suffix
                    );
                    let _ = set_turbo_enabled(want_enabled);
                    Some(current)
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        }
    } else {
        if opts.sched.turbo.is_some() {
            warn!("turbo control not supported");
        }
        None
    };

    // Main scheduler lifecycle: init, set up the arena allocator, attach, and
    // run until exit. Loops to re-initialize when the exit info requests a
    // restart — presumably a BPF-side soft restart; TODO(review) confirm
    // should_restart semantics.
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched =
            Scheduler::init(&opts.sched, &opts.libbpf, &mut open_object, &opts.log_level)?;
        let task_size = std::mem::size_of::<types::task_p2dq>();
        let arenalib = ArenaLib::init(sched.skel.object_mut(), task_size, *NR_CPU_IDS)?;
        arenalib.setup()?;

        sched.start()?;

        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    // Restore any power-management state we changed above, in best-effort
    // fashion (errors ignored on shutdown).
    if let Some(was_enabled) = orig_turbo {
        info!(
            "Restoring turbo to {}",
            if was_enabled { "enabled" } else { "disabled" }
        );
        let _ = set_turbo_enabled(was_enabled);
    }

    if !orig_epps.is_empty() {
        info!("Restoring EPP settings");
        for (cpu, epp) in orig_epps {
            let _ = set_epp(cpu, &epp);
        }
    }

    for (pkg, die, orig_khz) in orig_uncore_freqs {
        info!(
            "Restoring uncore frequency for package {} die {} to {} MHz",
            pkg,
            die,
            orig_khz / 1000
        );
        let _ = set_uncore_max_freq_khz(pkg, die, orig_khz);
    }

    Ok(())
}