1mod bpf_skel;
6pub use bpf_skel::*;
7
8pub mod bpf_intf;
9pub mod stats;
10use stats::Metrics;
11
12use scx_p2dq::SchedulerOpts;
13use scx_p2dq::TOPO;
14
15use std::mem::MaybeUninit;
16use std::sync::Arc;
17use std::sync::atomic::AtomicBool;
18use std::sync::atomic::Ordering;
19use std::time::Duration;
20
21use anyhow::Context;
22use anyhow::Result;
23use clap::Parser;
24use crossbeam::channel::RecvTimeoutError;
25use libbpf_rs::MapCore as _;
26use libbpf_rs::OpenObject;
27use log::{debug, info, warn};
28use scx_stats::prelude::*;
29use scx_utils::UserExitInfo;
30use scx_utils::build_id;
31use scx_utils::compat;
32use scx_utils::init_libbpf_logging;
33use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
34use scx_utils::scx_ops_attach;
35use scx_utils::scx_ops_load;
36use scx_utils::scx_ops_open;
37use scx_utils::uei_exited;
38use scx_utils::uei_report;
39
40use crate::bpf_intf::stat_idx_P2DQ_NR_STATS;
41use crate::bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
42use crate::bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
43use crate::bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
44use crate::bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
45use crate::bpf_intf::stat_idx_P2DQ_STAT_IDLE;
46use crate::bpf_intf::stat_idx_P2DQ_STAT_KEEP;
47use crate::bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
48use crate::bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
49use crate::bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
50use crate::bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
51use crate::bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
52use crate::bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
53
54#[derive(Debug, Parser)]
60struct CliOpts {
61 #[clap(short = 'v', long, action = clap::ArgAction::Count)]
64 pub verbose: u8,
65
66 #[clap(long)]
68 pub stats: Option<f64>,
69
70 #[clap(long)]
73 pub monitor: Option<f64>,
74
75 #[clap(long)]
77 pub version: bool,
78
79 #[clap(flatten)]
80 pub sched: SchedulerOpts,
81}
82
/// A loaded and attached p2dq BPF scheduler together with its stats server.
///
/// Field order matters: Rust drops fields in declaration order after
/// `Drop::drop` runs, so reordering these fields changes teardown order.
struct Scheduler<'a> {
    // Loaded BPF skeleton; its maps (`stats`, `bss_data`) and `uei` exit info
    // are read by `get_metrics` and `run`.
    skel: BpfSkel<'a>,
    // Attached struct_ops link. `run` and `Drop` set it to `None` via `take()`
    // to drop the link before the skeleton goes away.
    struct_ops: Option<libbpf_rs::Link>,

    // Stats server; `run` answers each request on its channel with a
    // `Metrics` snapshot.
    stats_server: StatsServer<(), Metrics>,
}
89
90impl<'a> Scheduler<'a> {
91 fn init(
92 opts: &SchedulerOpts,
93 open_object: &'a mut MaybeUninit<OpenObject>,
94 verbose: u8,
95 ) -> Result<Self> {
96 let mut skel_builder = BpfSkelBuilder::default();
98 skel_builder.obj_builder.debug(verbose > 1);
99 init_libbpf_logging(None);
100 info!(
101 "Running scx_p2dq (build ID: {})",
102 build_id::full_version(env!("CARGO_PKG_VERSION"))
103 );
104 let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq).unwrap();
105 scx_p2dq::init_open_skel!(&mut open_skel, opts, verbose)?;
106
107 match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
108 0 => info!("Kernel does not support queued wakeup optimization."),
109 v => open_skel.struct_ops.p2dq_mut().flags |= v,
110 };
111
112 let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
113 scx_p2dq::init_skel!(&mut skel);
114
115 let struct_ops = Some(scx_ops_attach!(skel, p2dq)?);
116
117 let stats_server = StatsServer::new(stats::server_data()).launch()?;
118
119 info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");
120
121 Ok(Self {
122 skel,
123 struct_ops,
124 stats_server,
125 })
126 }
127
128 fn get_metrics(&self) -> Metrics {
129 let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
130 let stats_map = &self.skel.maps.stats;
131 for stat in 0..stat_idx_P2DQ_NR_STATS {
132 let cpu_stat_vec: Vec<Vec<u8>> = stats_map
133 .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
134 .unwrap()
135 .unwrap();
136 let sum: u64 = cpu_stat_vec
137 .iter()
138 .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
139 .sum();
140 stats[stat as usize] = sum;
141 }
142 Metrics {
143 direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
144 idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
145 sched_mode: self.skel.maps.bss_data.sched_mode,
146 dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
147 same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
148 keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
149 select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
150 dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
151 llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
152 node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
153 wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
154 wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
155 wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
156 }
157 }
158
159 fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
160 let (res_ch, req_ch) = self.stats_server.channels();
161
162 while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
163 match req_ch.recv_timeout(Duration::from_secs(1)) {
164 Ok(()) => res_ch.send(self.get_metrics())?,
165 Err(RecvTimeoutError::Timeout) => {}
166 Err(e) => Err(e)?,
167 }
168 }
169
170 self.struct_ops.take();
171 uei_report!(&self.skel, uei)
172 }
173}
174
175impl Drop for Scheduler<'_> {
176 fn drop(&mut self) {
177 if let Some(struct_ops) = self.struct_ops.take() {
178 drop(struct_ops);
179 }
180 }
181}
182
183fn main() -> Result<()> {
184 let opts = CliOpts::parse();
185
186 if opts.version {
187 println!(
188 "scx_p2dq: {}",
189 build_id::full_version(env!("CARGO_PKG_VERSION"))
190 );
191 return Ok(());
192 }
193
194 let llv = match opts.verbose {
195 0 => simplelog::LevelFilter::Info,
196 1 => simplelog::LevelFilter::Debug,
197 _ => simplelog::LevelFilter::Trace,
198 };
199 let mut lcfg = simplelog::ConfigBuilder::new();
200 lcfg.set_time_level(simplelog::LevelFilter::Error)
201 .set_location_level(simplelog::LevelFilter::Off)
202 .set_target_level(simplelog::LevelFilter::Off)
203 .set_thread_level(simplelog::LevelFilter::Off);
204 simplelog::TermLogger::init(
205 llv,
206 lcfg.build(),
207 simplelog::TerminalMode::Stderr,
208 simplelog::ColorChoice::Auto,
209 )?;
210
211 let shutdown = Arc::new(AtomicBool::new(false));
212 let shutdown_clone = shutdown.clone();
213 ctrlc::set_handler(move || {
214 shutdown_clone.store(true, Ordering::Relaxed);
215 })
216 .context("Error setting Ctrl-C handler")?;
217
218 if let Some(intv) = opts.monitor.or(opts.stats) {
219 let shutdown_copy = shutdown.clone();
220 let jh = std::thread::spawn(move || {
221 match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
222 Ok(_) => {
223 debug!("stats monitor thread finished successfully")
224 }
225 Err(error_object) => {
226 warn!(
227 "stats monitor thread finished because of an error {}",
228 error_object
229 )
230 }
231 }
232 });
233 if opts.monitor.is_some() {
234 let _ = jh.join();
235 return Ok(());
236 }
237 }
238
239 if let Some(idle_resume_us) = opts.sched.idle_resume_us {
240 if !cpu_idle_resume_latency_supported() {
241 warn!("idle resume latency not supported");
242 } else {
243 if idle_resume_us > 0 {
244 info!("Setting idle QoS to {}us", idle_resume_us);
245 for cpu in TOPO.all_cpus.values() {
246 update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
247 }
248 }
249 }
250 }
251
252 let mut open_object = MaybeUninit::uninit();
253 loop {
254 let mut sched = Scheduler::init(&opts.sched, &mut open_object, opts.verbose)?;
255 if !sched.run(shutdown.clone())?.should_restart() {
256 break;
257 }
258 }
259 Ok(())
260}