1pub mod stats;
6use stats::Metrics;
7
8use std::mem::MaybeUninit;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use anyhow::Context;
16use anyhow::Result;
17use clap::Parser;
18use crossbeam::channel::RecvTimeoutError;
19use libbpf_rs::skel::Skel;
20use libbpf_rs::AsRawLibbpf;
21use libbpf_rs::MapCore as _;
22use libbpf_rs::OpenObject;
23use libbpf_rs::ProgramInput;
24use scx_arena::ArenaLib;
25use scx_stats::prelude::*;
26use scx_utils::build_id;
27use scx_utils::compat;
28use scx_utils::init_libbpf_logging;
29use scx_utils::libbpf_clap_opts::LibbpfOpts;
30use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
31use scx_utils::scx_ops_attach;
32use scx_utils::scx_ops_load;
33use scx_utils::scx_ops_open;
34use scx_utils::uei_exited;
35use scx_utils::uei_report;
36use scx_utils::Topology;
37use scx_utils::UserExitInfo;
38use scx_utils::NR_CPU_IDS;
39use tracing::{debug, info, warn};
40use tracing_subscriber::filter::EnvFilter;
41
42use bpf_intf::stat_idx_P2DQ_NR_STATS;
43use bpf_intf::stat_idx_P2DQ_STAT_ATQ_ENQ;
44use bpf_intf::stat_idx_P2DQ_STAT_ATQ_REENQ;
45use bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
46use bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
47use bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
48use bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
49use bpf_intf::stat_idx_P2DQ_STAT_ENQ_CPU;
50use bpf_intf::stat_idx_P2DQ_STAT_ENQ_INTR;
51use bpf_intf::stat_idx_P2DQ_STAT_ENQ_LLC;
52use bpf_intf::stat_idx_P2DQ_STAT_ENQ_MIG;
53use bpf_intf::stat_idx_P2DQ_STAT_EXEC_BALANCE;
54use bpf_intf::stat_idx_P2DQ_STAT_EXEC_SAME_LLC;
55use bpf_intf::stat_idx_P2DQ_STAT_FORK_BALANCE;
56use bpf_intf::stat_idx_P2DQ_STAT_FORK_SAME_LLC;
57use bpf_intf::stat_idx_P2DQ_STAT_IDLE;
58use bpf_intf::stat_idx_P2DQ_STAT_KEEP;
59use bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
60use bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
61use bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
62use bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
63use bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
64use bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
65use scx_p2dq::bpf_intf;
66use scx_p2dq::bpf_skel::*;
67use scx_p2dq::SchedulerOpts;
68use scx_p2dq::TOPO;
69
/// Scheduler name used in log messages (see the `Drop` implementation of `Scheduler`).
const SCHEDULER_NAME: &str = "scx_p2dq";
71#[derive(Debug, Parser)]
77struct CliOpts {
78 #[clap(short = 'v', long, action = clap::ArgAction::Count)]
80 verbose: u8,
81
82 #[clap(long, default_value = "info")]
85 pub log_level: String,
86
87 #[clap(long)]
89 pub stats: Option<f64>,
90
91 #[clap(long)]
94 pub monitor: Option<f64>,
95
96 #[clap(long)]
98 pub version: bool,
99
100 #[clap(long)]
102 pub run_id: Option<u64>,
103
104 #[clap(flatten)]
105 pub sched: SchedulerOpts,
106
107 #[clap(flatten, next_help_heading = "Libbpf Options")]
108 pub libbpf: LibbpfOpts,
109}
110
/// Userspace side of the scheduler: owns the BPF skeleton, the struct_ops link while the
/// scheduler is attached, and the stats server used to publish metrics.
struct Scheduler<'a> {
    /// BPF skeleton produced by `init`.
    skel: BpfSkel<'a>,
    /// struct_ops link; `None` until `start` attaches, and taken again by `run` / `Drop`.
    struct_ops: Option<libbpf_rs::Link>,
    /// Debug level derived from the log level in `init`: 0, 1 ("debug") or 2 ("trace").
    debug_level: u8,

    /// Stats server whose request/response channels are serviced by `run`.
    stats_server: StatsServer<(), Metrics>,
}
118
119impl<'a> Scheduler<'a> {
    /// Opens, configures and loads the p2dq BPF skeleton and launches the stats server.
    ///
    /// * `opts` - scheduler options; a clone may be adjusted by the detected hardware profile
    ///   when `hw_auto_optimize` is set.
    /// * `libbpf_ops` - libbpf options converted into the BPF open options.
    /// * `open_object` - caller-provided storage the opened BPF object lives in.
    /// * `log_level` - log filter string; only inspected for the substrings "trace" / "debug".
    ///
    /// The returned scheduler is not attached yet (`struct_ops` is `None`); see `start`.
    ///
    /// # Errors
    /// Propagates failures from topology discovery, opening/initialising/loading the skeleton
    /// and launching the stats server.
    fn init(
        opts: &SchedulerOpts,
        libbpf_ops: &LibbpfOpts,
        open_object: &'a mut MaybeUninit<OpenObject>,
        log_level: &str,
    ) -> Result<Self> {
        // Derive a numeric debug level from the textual log level: trace => 2, debug => 1.
        let debug_level = if log_level.contains("trace") {
            2
        } else if log_level.contains("debug") {
            1
        } else {
            0
        };
        let mut skel_builder = BpfSkelBuilder::default();
        // Object-level debug output is only enabled at the highest debug level.
        skel_builder.obj_builder.debug(debug_level > 1);
        init_libbpf_logging(None);
        info!(
            "Running scx_p2dq (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        // Topology is built from the user-supplied topology args only when virtual LLCs are
        // enabled; otherwise the default topology is used.
        let topo = if opts.virt_llc_enabled {
            Topology::with_args(&opts.topo)?
        } else {
            Topology::new()?
        };
        let open_opts = libbpf_ops.clone().into_bpf_open_opts();
        let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq, open_opts).context(
            "Failed to open BPF object. This can be caused by a mismatch between the kernel \
            version and the BPF object, permission or other libbpf issues. Try running `dmesg \
            | grep bpf` to see if there are any error messages related to the BPF object. See \
            the LibbpfOptions section in the help for more information on configuration related \
            to this issue or file an issue on the scx repo if the problem persists. \
            https://github.com/sched-ext/scx/issues/new?labels=scx_p2dq&title=scx_p2dq:%20New%20Issue&assignees=hodgesds&body=Kernel%20version:%20(fill%20me%20out)%0ADistribution:%20(fill%20me%20out)%0AHardware:%20(fill%20me%20out)%0A%0AIssue:%20(fill%20me%20out)"
        )?;

        // Work on a copy of the options so hardware-specific tuning never mutates the caller's.
        let hw_profile = scx_p2dq::HardwareProfile::detect();
        let mut opts_optimized = opts.clone();
        if opts.hw_auto_optimize {
            hw_profile.optimize_scheduler_opts(&mut opts_optimized);
        }

        scx_p2dq::init_open_skel!(
            &mut open_skel,
            topo,
            &opts_optimized,
            debug_level,
            &hw_profile
        )?;

        // Queued wakeups are opt-in; keeping the builtin idle tracking is unconditional.
        if opts.queued_wakeup {
            open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        }
        open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_KEEP_BUILTIN_IDLE;

        // SAFETY: the raw map pointer is obtained from `open_skel`, which is alive for the
        // duration of the call.
        // NOTE(review): auto-attach is presumably disabled so the struct_ops map is attached
        // explicitly later (see `start`) — confirm.
        unsafe {
            libbpf_rs::libbpf_sys::bpf_map__set_autoattach(
                open_skel.maps.p2dq.as_libbpf_object().as_ptr(),
                false,
            );
        }

        let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
        scx_p2dq::init_skel!(&mut skel, topo);

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            struct_ops: None,
            debug_level,
            stats_server,
        })
    }
198
199 fn get_metrics(&self) -> Metrics {
200 let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
201 let stats_map = &self.skel.maps.stats;
202 for stat in 0..stat_idx_P2DQ_NR_STATS {
203 let cpu_stat_vec: Vec<Vec<u8>> = stats_map
204 .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
205 .unwrap()
206 .unwrap();
207 let sum: u64 = cpu_stat_vec
208 .iter()
209 .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
210 .sum();
211 stats[stat as usize] = sum;
212 }
213 Metrics {
214 atq_enq: stats[stat_idx_P2DQ_STAT_ATQ_ENQ as usize],
215 atq_reenq: stats[stat_idx_P2DQ_STAT_ATQ_REENQ as usize],
216 direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
217 idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
218 dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
219 same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
220 keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
221 enq_cpu: stats[stat_idx_P2DQ_STAT_ENQ_CPU as usize],
222 enq_intr: stats[stat_idx_P2DQ_STAT_ENQ_INTR as usize],
223 enq_llc: stats[stat_idx_P2DQ_STAT_ENQ_LLC as usize],
224 enq_mig: stats[stat_idx_P2DQ_STAT_ENQ_MIG as usize],
225 select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
226 dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
227 llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
228 node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
229 wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
230 wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
231 wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
232 fork_balance: stats[stat_idx_P2DQ_STAT_FORK_BALANCE as usize],
233 exec_balance: stats[stat_idx_P2DQ_STAT_EXEC_BALANCE as usize],
234 fork_same_llc: stats[stat_idx_P2DQ_STAT_FORK_SAME_LLC as usize],
235 exec_same_llc: stats[stat_idx_P2DQ_STAT_EXEC_SAME_LLC as usize],
236 }
237 }
238
239 fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
240 let (res_ch, req_ch) = self.stats_server.channels();
241
242 while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
243 match req_ch.recv_timeout(Duration::from_secs(1)) {
244 Ok(()) => res_ch.send(self.get_metrics())?,
245 Err(RecvTimeoutError::Timeout) => {}
246 Err(e) => Err(e)?,
247 }
248 }
249
250 let _ = self.struct_ops.take();
251 uei_report!(&self.skel, uei)
252 }
253
254 fn print_topology(&mut self) -> Result<()> {
255 let input = ProgramInput {
256 ..Default::default()
257 };
258
259 let output = self.skel.progs.arena_topology_print.test_run(input)?;
260 if output.return_value != 0 {
261 bail!(
262 "Could not initialize arenas, topo_print returned {}",
263 output.return_value as i32
264 );
265 }
266
267 Ok(())
268 }
269
270 fn start(&mut self) -> Result<()> {
271 self.struct_ops = Some(scx_ops_attach!(self.skel, p2dq)?);
272
273 if self.debug_level > 0 {
274 self.print_topology()?;
275 }
276
277 info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");
278
279 Ok(())
280 }
281}
282
283impl Drop for Scheduler<'_> {
284 fn drop(&mut self) {
285 info!("Unregister {SCHEDULER_NAME} scheduler");
286
287 if let Some(struct_ops) = self.struct_ops.take() {
288 drop(struct_ops);
289 }
290 }
291}
292
293#[clap_main::clap_main]
294fn main(opts: CliOpts) -> Result<()> {
295 if opts.version {
296 println!(
297 "scx_p2dq: {}",
298 build_id::full_version(env!("CARGO_PKG_VERSION"))
299 );
300 return Ok(());
301 }
302
303 let env_filter = EnvFilter::try_from_default_env()
304 .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
305 Ok(filter) => Ok(filter),
306 Err(e) => {
307 eprintln!(
308 "invalid log envvar: {}, using info, err is: {}",
309 opts.log_level, e
310 );
311 EnvFilter::try_new("info")
312 }
313 })
314 .unwrap_or_else(|_| EnvFilter::new("info"));
315
316 match tracing_subscriber::fmt()
317 .with_env_filter(env_filter)
318 .with_target(true)
319 .with_thread_ids(true)
320 .with_file(true)
321 .with_line_number(true)
322 .try_init()
323 {
324 Ok(()) => {}
325 Err(e) => eprintln!("failed to init logger: {}", e),
326 }
327
328 if opts.verbose > 0 {
329 warn!("Setting verbose via -v is depricated and will be an error in future releases.");
330 }
331
332 if let Some(run_id) = opts.run_id {
333 info!("scx_p2dq run_id: {}", run_id);
334 }
335
336 let shutdown = Arc::new(AtomicBool::new(false));
337 let shutdown_clone = shutdown.clone();
338 ctrlc::set_handler(move || {
339 shutdown_clone.store(true, Ordering::Relaxed);
340 })
341 .context("Error setting Ctrl-C handler")?;
342
343 if let Some(intv) = opts.monitor.or(opts.stats) {
344 let shutdown_copy = shutdown.clone();
345 let jh = std::thread::spawn(move || {
346 match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
347 Ok(_) => {
348 debug!("stats monitor thread finished successfully")
349 }
350 Err(error_object) => {
351 warn!("stats monitor thread finished because of an error {error_object}")
352 }
353 }
354 });
355 if opts.monitor.is_some() {
356 let _ = jh.join();
357 return Ok(());
358 }
359 }
360
361 if let Some(idle_resume_us) = opts.sched.idle_resume_us {
362 if !cpu_idle_resume_latency_supported() {
363 warn!("idle resume latency not supported");
364 } else if idle_resume_us > 0 {
365 info!("Setting idle QoS to {idle_resume_us}us");
366 for cpu in TOPO.all_cpus.values() {
367 update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
368 }
369 }
370 }
371
372 let mut open_object = MaybeUninit::uninit();
373 loop {
374 let mut sched =
375 Scheduler::init(&opts.sched, &opts.libbpf, &mut open_object, &opts.log_level)?;
376 let task_size = std::mem::size_of::<types::task_p2dq>();
377 let arenalib = ArenaLib::init(sched.skel.object_mut(), task_size, *NR_CPU_IDS)?;
378 arenalib.setup()?;
379
380 sched.start()?;
381
382 if !sched.run(shutdown.clone())?.should_restart() {
383 break;
384 }
385 }
386 Ok(())
387}