mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;

mod stats;
use std::ffi::c_int;
use std::fs;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use affinity::set_thread_affinity;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::warn;
use log::{debug, info};
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::libbpf_clap_opts::LibbpfOpts;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::try_set_rlimit_infinity;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Cpumask;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use scx_utils::NR_CPU_IDS;
use stats::Metrics;

const SCHEDULER_NAME: &str = "scx_tickless";

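// Command-line options.
//
// Hypothetical example invocation (values chosen only for illustration): dedicate
// CPUs 0-3 to processing scheduling events, keep the default 20ms slice and set a
// tick frequency of 100:
//
//   scx_tickless -m 0xf -s 20000 -f 100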
#[derive(Debug, Parser)]
struct Opts {
    /// Exit debug dump buffer length. 0 indicates default.
    #[clap(long, default_value = "0")]
    exit_dump_len: u32,

    /// Set of CPUs, expressed as a hex cpumask (e.g. 0xff), dedicated to processing
    /// scheduling events. With the default of 0 the lowest-capacity CPU is used.
    #[clap(short = 'm', long, default_value = "0")]
    primary_domain: String,

    /// Maximum time slice assigned to a task, in microseconds.
    #[clap(short = 's', long, default_value = "20000")]
    slice_us: u64,

    /// Frequency of the tick used to check for time slice expiration
    /// (0 = use the kernel's default tick frequency).
    #[clap(short = 'f', long, default_value = "0")]
    frequency: u64,

    /// Disable SMT topology awareness.
    #[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
    nosmt: bool,

    /// Enable stats monitoring with the specified interval, in seconds.
    #[clap(long)]
    stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval, in seconds;
    /// the scheduler itself is not loaded.
    #[clap(long)]
    monitor: Option<f64>,

    /// Enable verbose output, including libbpf details.
    #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
    verbose: bool,

    /// Print scheduler version and exit.
    #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
    version: bool,

    /// Show descriptions for statistics.
    #[clap(long)]
    help_stats: bool,

    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}

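/// Best-effort check for nohz_full: returns true if
/// /sys/devices/system/cpu/nohz_full lists at least one CPU (the file reads
/// "(null)" or is empty when nohz_full is not active).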
pub fn is_nohz_enabled() -> bool {
    if let Ok(contents) = fs::read_to_string("/sys/devices/system/cpu/nohz_full") {
        let trimmed = contents.trim();
        return trimmed != "(null)" && !trimmed.is_empty();
    }
    false
}

struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    struct_ops: Option<libbpf_rs::Link>,
    stats_server: StatsServer<(), Metrics>,
}

impl<'a> Scheduler<'a> {
    fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
        try_set_rlimit_infinity();

        // Initialize the CPU topology.
        let topo = Topology::new().unwrap();
        let smt_enabled = !opts.nosmt && topo.smt_enabled;
        info!(
            "{} {} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION")),
            if smt_enabled { "SMT on" } else { "SMT off" }
        );

        if !is_nohz_enabled() {
            warn!("nohz_full is not enabled in the kernel");
        }

        // Rank the CPUs by compute capacity, highest capacity first.
        let mut cpus: Vec<_> = topo.all_cpus.values().collect();
        cpus.sort_by_key(|cpu| std::cmp::Reverse(cpu.cpu_capacity));

        // Parse the primary domain; if it is empty, fall back to the lowest-capacity CPU.
        let mut domain = Cpumask::from_str(&opts.primary_domain)?;
        if domain.is_empty() {
            if let Some(cpu) = cpus.last() {
                domain = Cpumask::from_str(&format!("{:x}", 1 << cpu.id))?;
            }
        }
        info!("primary CPU domain = 0x{:x}", domain);

        // Open the BPF skeleton.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(opts.verbose);
        let open_opts = opts.libbpf.clone().into_bpf_open_opts();
        let mut skel = scx_ops_open!(skel_builder, open_object, tickless_ops, open_opts)?;
        skel.struct_ops.tickless_ops_mut().exit_dump_len = opts.exit_dump_len;

        let rodata = skel.maps.rodata_data.as_mut().unwrap();

        rodata.smt_enabled = smt_enabled;
        rodata.nr_cpu_ids = *NR_CPU_IDS as u32;

        rodata.slice_ns = opts.slice_us * 1000;
        rodata.tick_freq = opts.frequency;

        for (i, cpu) in cpus.iter().enumerate() {
            rodata.preferred_cpus[i] = cpu.id as u64;
        }

        // Set the scheduler flags.
        skel.struct_ops.tickless_ops_mut().flags = *compat::SCX_OPS_ENQ_LAST
            | *compat::SCX_OPS_ENQ_MIGRATION_DISABLED
            | *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        info!(
            "scheduler flags: {:#x}",
            skel.struct_ops.tickless_ops_mut().flags
        );

        // Load the BPF program for verification.
        let mut skel = scx_ops_load!(skel, tickless_ops, uei)?;

        // Temporarily pin this thread to the first CPU of the primary domain, so
        // that the central BPF timer is armed on a CPU that processes scheduling
        // events.
        let timer_cpu = domain.iter().next();
        if timer_cpu.is_none() {
            bail!("primary cpumask is empty");
        }
        if let Err(e) = set_thread_affinity(&[timer_cpu.unwrap() as usize]) {
            bail!("cannot set central CPU affinity: {}", e);
        }

        // Initialize the primary scheduling domain.
        if let Err(err) = Self::init_primary_domain(&mut skel, &domain) {
            warn!("failed to initialize primary domain: error {}", err);
        }

        // Attach the scheduler and start the stats server.
        let struct_ops = Some(scx_ops_attach!(skel, tickless_ops)?);
        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        // Restore the default thread affinity.
        if let Err(e) = set_thread_affinity((0..*NR_CPU_IDS).collect::<Vec<usize>>()) {
            bail!("cannot reset CPU affinity: {}", e);
        }

        Ok(Self {
            skel,
            struct_ops,
            stats_server,
        })
    }

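    // enable_primary_cpu() drives the BPF side through a syscall-type program:
    // the cpu_arg struct is reinterpreted as a raw byte slice and handed to the
    // program via ProgramInput::context_in before invoking test_run(). As used
    // by init_primary_domain() below, a cpu_id of -1 resets the primary domain
    // before the selected CPUs are added one by one.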
    fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: i32) -> Result<(), u32> {
        let prog = &mut skel.progs.enable_primary_cpu;
        let mut args = cpu_arg {
            cpu_id: cpu as c_int,
        };
        let input = ProgramInput {
            context_in: Some(unsafe {
                std::slice::from_raw_parts_mut(
                    &mut args as *mut _ as *mut u8,
                    std::mem::size_of_val(&args),
                )
            }),
            ..Default::default()
        };
        let out = prog.test_run(input).unwrap();
        if out.return_value != 0 {
            return Err(out.return_value);
        }

        Ok(())
    }

    fn init_primary_domain(skel: &mut BpfSkel<'_>, domain: &Cpumask) -> Result<()> {
        // Reset the primary domain.
        if let Err(err) = Self::enable_primary_cpu(skel, -1) {
            warn!("failed to reset primary domain: error {}", err as i32);
        }
        // Add the selected CPUs to the primary domain.
        for cpu in 0..*NR_CPU_IDS {
            if domain.test_cpu(cpu) {
                if let Err(err) = Self::enable_primary_cpu(skel, cpu as i32) {
                    warn!("failed to add CPU {} to primary domain: error {}", cpu, err);
                }
            }
        }

        Ok(())
    }

    fn get_metrics(&self) -> Metrics {
        let bss_data = self.skel.maps.bss_data.as_ref().unwrap();
        Metrics {
            nr_ticks: bss_data.nr_ticks,
            nr_preemptions: bss_data.nr_preemptions,
            nr_direct_dispatches: bss_data.nr_direct_dispatches,
            nr_primary_dispatches: bss_data.nr_primary_dispatches,
            nr_timer_dispatches: bss_data.nr_timer_dispatches,
        }
    }

    pub fn exited(&mut self) -> bool {
        uei_exited!(&self.skel, uei)
    }

    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();
        while !shutdown.load(Ordering::Relaxed) && !self.exited() {
            match req_ch.recv_timeout(Duration::from_secs(1)) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }
}

impl Drop for Scheduler<'_> {
    fn drop(&mut self) {
        info!("Unregister {SCHEDULER_NAME} scheduler");
    }
}

fn main() -> Result<()> {
    let opts = Opts::parse();

    if opts.version {
        println!(
            "{} {}",
            SCHEDULER_NAME,
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    if opts.help_stats {
        stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
        return Ok(());
    }

    let loglevel = simplelog::LevelFilter::Info;

    let mut lcfg = simplelog::ConfigBuilder::new();
    lcfg.set_time_offset_to_local()
        .expect("Failed to set local time offset")
        .set_time_level(simplelog::LevelFilter::Error)
        .set_location_level(simplelog::LevelFilter::Off)
        .set_target_level(simplelog::LevelFilter::Off)
        .set_thread_level(simplelog::LevelFilter::Off);
    simplelog::TermLogger::init(
        loglevel,
        lcfg.build(),
        simplelog::TerminalMode::Stderr,
        simplelog::ColorChoice::Auto,
    )?;

    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!(
                        "stats monitor thread finished because of an error {}",
                        error_object
                    )
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

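    // Scheduler loop: keep re-initializing and restarting the scheduler as long
    // as the exit info reports that a restart was requested (e.g. after a CPU
    // hotplug event); bail out of the loop otherwise.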
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched = Scheduler::init(&opts, &mut open_object)?;
        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    Ok(())
}