1pub mod stats;
6use stats::Metrics;
7
8use std::mem::MaybeUninit;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use anyhow::Context;
16use anyhow::Result;
17use arenalib::ArenaLib;
18use clap::Parser;
19use crossbeam::channel::RecvTimeoutError;
20use libbpf_rs::skel::Skel;
21use libbpf_rs::MapCore as _;
22use libbpf_rs::OpenObject;
23use libbpf_rs::ProgramInput;
24use log::{debug, info, warn};
25use scx_stats::prelude::*;
26use scx_utils::build_id;
27use scx_utils::compat;
28use scx_utils::init_libbpf_logging;
29use scx_utils::libbpf_clap_opts::LibbpfOpts;
30use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
31use scx_utils::scx_ops_attach;
32use scx_utils::scx_ops_load;
33use scx_utils::scx_ops_open;
34use scx_utils::uei_exited;
35use scx_utils::uei_report;
36use scx_utils::Topology;
37use scx_utils::UserExitInfo;
38use scx_utils::NR_CPU_IDS;
39
40use bpf_intf::stat_idx_P2DQ_NR_STATS;
41use bpf_intf::stat_idx_P2DQ_STAT_ATQ_ENQ;
42use bpf_intf::stat_idx_P2DQ_STAT_ATQ_REENQ;
43use bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
44use bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
45use bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
46use bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
47use bpf_intf::stat_idx_P2DQ_STAT_ENQ_CPU;
48use bpf_intf::stat_idx_P2DQ_STAT_ENQ_INTR;
49use bpf_intf::stat_idx_P2DQ_STAT_ENQ_LLC;
50use bpf_intf::stat_idx_P2DQ_STAT_ENQ_MIG;
51use bpf_intf::stat_idx_P2DQ_STAT_IDLE;
52use bpf_intf::stat_idx_P2DQ_STAT_KEEP;
53use bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
54use bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
55use bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
56use bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
57use bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
58use bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
59use scx_p2dq::bpf_intf;
60use scx_p2dq::bpf_skel::*;
61use scx_p2dq::SchedulerOpts;
62use scx_p2dq::TOPO;
63
/// Scheduler name used in log messages.
const SCHEDULER_NAME: &str = "scx_p2dq";
// Command-line options for the scx_p2dq binary.
//
// NOTE: only plain `//` comments are used on this struct and its fields on
// purpose: clap's derive turns `///` doc comments into `--help` text, so adding
// them would change the CLI's user-visible output. Field order also determines
// the order of options in `--help`.
#[derive(Debug, Parser)]
struct CliOpts {
    // Verbosity; repeat `-v` to raise it. `main` maps 0 => Info, 1 => Debug,
    // 2+ => Trace log levels, and 2+ also enables libbpf debug output and the
    // arena topology print in the scheduler.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    pub verbose: u8,

    // Interval in seconds: start the stats monitor thread AND keep running the
    // scheduler.
    #[clap(long)]
    pub stats: Option<f64>,

    // Interval in seconds: run only the stats monitor; `main` waits for it and
    // exits without loading the scheduler. Takes precedence over `--stats`
    // when both are given.
    #[clap(long)]
    pub monitor: Option<f64>,

    // Print the version/build id and exit.
    #[clap(long)]
    pub version: bool,

    // Scheduler tuning options shared with the `scx_p2dq` library crate.
    #[clap(flatten)]
    pub sched: SchedulerOpts,

    // libbpf open options, shown under their own help heading.
    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
96
/// Runtime state of one loaded instance of the p2dq BPF scheduler.
///
/// Fields are dropped in declaration order after `Drop::drop` runs; the `Drop`
/// impl takes `struct_ops` first so the link is released before `skel`.
struct Scheduler<'a> {
    /// Loaded BPF skeleton; borrows the caller-provided `OpenObject` storage for `'a`.
    skel: BpfSkel<'a>,
    /// struct_ops link; `None` until `start()` attaches the scheduler.
    struct_ops: Option<libbpf_rs::Link>,
    /// `-v` count; values above 1 make `start()` print the arena topology.
    verbose: u8,

    /// Stats server; `run()` answers each request on it with a `Metrics` snapshot.
    stats_server: StatsServer<(), Metrics>,
}
104
105impl<'a> Scheduler<'a> {
106 fn init(
107 opts: &SchedulerOpts,
108 libbpf_ops: &LibbpfOpts,
109 open_object: &'a mut MaybeUninit<OpenObject>,
110 verbose: u8,
111 ) -> Result<Self> {
112 let mut skel_builder = BpfSkelBuilder::default();
114 skel_builder.obj_builder.debug(verbose > 1);
115 init_libbpf_logging(None);
116 info!(
117 "Running scx_p2dq (build ID: {})",
118 build_id::full_version(env!("CARGO_PKG_VERSION"))
119 );
120 let topo = if opts.virt_llc_enabled {
121 Topology::with_args(&opts.topo)?
122 } else {
123 Topology::new()?
124 };
125 let open_opts = libbpf_ops.clone().into_bpf_open_opts();
126 let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq, open_opts).context(
127 "Failed to open BPF object. This can be caused by a mismatch between the kernel \
128 version and the BPF object, permission or other libbpf issues. Try running `dmesg \
129 | grep bpf` to see if there are any error messages related to the BPF object. See \
130 the LibbpfOptions section in the help for more information on configuration related \
131 to this issue or file an issue on the scx repo if the problem persists. \
132 https://github.com/sched-ext/scx/issues/new?labels=scx_p2dq&title=scx_p2dq:%20New%20Issue&assignees=hodgesds&body=Kernel%20version:%20(fill%20me%20out)%0ADistribution:%20(fill%20me%20out)%0AHardware:%20(fill%20me%20out)%0A%0AIssue:%20(fill%20me%20out)"
133 )?;
134 scx_p2dq::init_open_skel!(&mut open_skel, topo, opts, verbose)?;
135
136 if opts.queued_wakeup {
137 open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
138 }
139 open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_KEEP_BUILTIN_IDLE;
140
141 let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
142 scx_p2dq::init_skel!(&mut skel, topo);
143
144 let stats_server = StatsServer::new(stats::server_data()).launch()?;
145
146 Ok(Self {
147 skel,
148 struct_ops: None,
149 verbose,
150 stats_server,
151 })
152 }
153
154 fn get_metrics(&self) -> Metrics {
155 let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
156 let stats_map = &self.skel.maps.stats;
157 for stat in 0..stat_idx_P2DQ_NR_STATS {
158 let cpu_stat_vec: Vec<Vec<u8>> = stats_map
159 .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
160 .unwrap()
161 .unwrap();
162 let sum: u64 = cpu_stat_vec
163 .iter()
164 .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
165 .sum();
166 stats[stat as usize] = sum;
167 }
168 Metrics {
169 atq_enq: stats[stat_idx_P2DQ_STAT_ATQ_ENQ as usize],
170 atq_reenq: stats[stat_idx_P2DQ_STAT_ATQ_REENQ as usize],
171 direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
172 idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
173 dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
174 same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
175 keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
176 enq_cpu: stats[stat_idx_P2DQ_STAT_ENQ_CPU as usize],
177 enq_intr: stats[stat_idx_P2DQ_STAT_ENQ_INTR as usize],
178 enq_llc: stats[stat_idx_P2DQ_STAT_ENQ_LLC as usize],
179 enq_mig: stats[stat_idx_P2DQ_STAT_ENQ_MIG as usize],
180 select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
181 dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
182 llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
183 node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
184 wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
185 wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
186 wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
187 }
188 }
189
190 fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
191 let (res_ch, req_ch) = self.stats_server.channels();
192
193 while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
194 match req_ch.recv_timeout(Duration::from_secs(1)) {
195 Ok(()) => res_ch.send(self.get_metrics())?,
196 Err(RecvTimeoutError::Timeout) => {}
197 Err(e) => Err(e)?,
198 }
199 }
200
201 let _ = self.struct_ops.take();
202 uei_report!(&self.skel, uei)
203 }
204
205 fn print_topology(&mut self) -> Result<()> {
206 let input = ProgramInput {
207 ..Default::default()
208 };
209
210 let output = self.skel.progs.arena_topology_print.test_run(input)?;
211 if output.return_value != 0 {
212 bail!(
213 "Could not initialize arenas, topo_print returned {}",
214 output.return_value as i32
215 );
216 }
217
218 Ok(())
219 }
220
221 fn start(&mut self) -> Result<()> {
222 self.struct_ops = Some(scx_ops_attach!(self.skel, p2dq)?);
223
224 if self.verbose > 1 {
225 self.print_topology()?;
226 }
227
228 info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");
229
230 Ok(())
231 }
232}
233
234impl Drop for Scheduler<'_> {
235 fn drop(&mut self) {
236 info!("Unregister {SCHEDULER_NAME} scheduler");
237
238 if let Some(struct_ops) = self.struct_ops.take() {
239 drop(struct_ops);
240 }
241 }
242}
243
244fn main() -> Result<()> {
245 let opts = CliOpts::parse();
246
247 if opts.version {
248 println!(
249 "scx_p2dq: {}",
250 build_id::full_version(env!("CARGO_PKG_VERSION"))
251 );
252 return Ok(());
253 }
254
255 let llv = match opts.verbose {
256 0 => simplelog::LevelFilter::Info,
257 1 => simplelog::LevelFilter::Debug,
258 _ => simplelog::LevelFilter::Trace,
259 };
260 let mut lcfg = simplelog::ConfigBuilder::new();
261 lcfg.set_time_offset_to_local()
262 .expect("Failed to set local time offset")
263 .set_time_level(simplelog::LevelFilter::Error)
264 .set_location_level(simplelog::LevelFilter::Off)
265 .set_target_level(simplelog::LevelFilter::Off)
266 .set_thread_level(simplelog::LevelFilter::Off);
267 simplelog::TermLogger::init(
268 llv,
269 lcfg.build(),
270 simplelog::TerminalMode::Stderr,
271 simplelog::ColorChoice::Auto,
272 )?;
273
274 let shutdown = Arc::new(AtomicBool::new(false));
275 let shutdown_clone = shutdown.clone();
276 ctrlc::set_handler(move || {
277 shutdown_clone.store(true, Ordering::Relaxed);
278 })
279 .context("Error setting Ctrl-C handler")?;
280
281 if let Some(intv) = opts.monitor.or(opts.stats) {
282 let shutdown_copy = shutdown.clone();
283 let jh = std::thread::spawn(move || {
284 match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
285 Ok(_) => {
286 debug!("stats monitor thread finished successfully")
287 }
288 Err(error_object) => {
289 warn!("stats monitor thread finished because of an error {error_object}")
290 }
291 }
292 });
293 if opts.monitor.is_some() {
294 let _ = jh.join();
295 return Ok(());
296 }
297 }
298
299 if let Some(idle_resume_us) = opts.sched.idle_resume_us {
300 if !cpu_idle_resume_latency_supported() {
301 warn!("idle resume latency not supported");
302 } else if idle_resume_us > 0 {
303 info!("Setting idle QoS to {idle_resume_us}us");
304 for cpu in TOPO.all_cpus.values() {
305 update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
306 }
307 }
308 }
309
310 let mut open_object = MaybeUninit::uninit();
311 loop {
312 let mut sched = Scheduler::init(&opts.sched, &opts.libbpf, &mut open_object, opts.verbose)?;
313 let task_size = std::mem::size_of::<types::task_p2dq>();
314 let arenalib = ArenaLib::init(sched.skel.object_mut(), task_size, *NR_CPU_IDS)?;
315 arenalib.setup()?;
316
317 sched.start()?;
318
319 if !sched.run(shutdown.clone())?.should_restart() {
320 break;
321 }
322 }
323 Ok(())
324}