scx_p2dq/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6pub use bpf_skel::*;
7
8pub mod bpf_intf;
9pub mod stats;
10use stats::Metrics;
11
12use std::mem::MaybeUninit;
13use std::sync::atomic::AtomicBool;
14use std::sync::atomic::Ordering;
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::bail;
19use anyhow::Context;
20use anyhow::Result;
21use clap::Parser;
22use crossbeam::channel::RecvTimeoutError;
23use libbpf_rs::MapCore as _;
24use libbpf_rs::OpenObject;
25use libbpf_rs::ProgramInput;
26use log::{debug, info, warn};
27use scx_stats::prelude::*;
28use scx_utils::build_id;
29use scx_utils::compat;
30use scx_utils::init_libbpf_logging;
31use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
32use scx_utils::scx_ops_attach;
33use scx_utils::scx_ops_load;
34use scx_utils::scx_ops_open;
35use scx_utils::uei_exited;
36use scx_utils::uei_report;
37use scx_utils::Topology;
38use scx_utils::UserExitInfo;
39use scx_utils::NR_CPU_IDS;
40use scx_utils::{Core, Llc};
41
42use std::ffi::c_ulong;
43
44use bpf_intf::stat_idx_P2DQ_NR_STATS;
45use bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
46use bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
47use bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
48use bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
49use bpf_intf::stat_idx_P2DQ_STAT_IDLE;
50use bpf_intf::stat_idx_P2DQ_STAT_KEEP;
51use bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
52use bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
53use bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
54use bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
55use bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
56use bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
57use scx_p2dq::SchedulerOpts;
58use scx_p2dq::TOPO;
59
60/// scx_p2dq: A pick 2 dumb queuing load balancing scheduler.
61///
62/// The BPF part does simple vtime or round robin scheduling in each domain
63/// while tracking average load of each domain and duty cycle of each task.
64///
65#[derive(Debug, Parser)]
66struct CliOpts {
67    /// Enable verbose output, including libbpf details. Specify multiple
68    /// times to increase verbosity.
69    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
70    pub verbose: u8,
71
72    /// Enable stats monitoring with the specified interval.
73    #[clap(long)]
74    pub stats: Option<f64>,
75
76    /// Run in stats monitoring mode with the specified interval. Scheduler
77    /// is not launched.
78    #[clap(long)]
79    pub monitor: Option<f64>,
80
81    /// Print version and exit.
82    #[clap(long)]
83    pub version: bool,
84
85    #[clap(flatten)]
86    pub sched: SchedulerOpts,
87}
88
/// Userspace handle for one loaded instance of the p2dq BPF scheduler.
///
/// Fields are dropped in declaration order after `Drop for Scheduler` runs, so `skel`
/// is dropped first among the fields.
struct Scheduler<'a> {
    /// Loaded BPF skeleton; borrows the `OpenObject` storage handed to `Scheduler::init`.
    skel: BpfSkel<'a>,
    /// struct_ops link: `None` after `init`, set by `start` when the scheduler is attached.
    struct_ops: Option<libbpf_rs::Link>,
    /// Verbosity level (number of `-v` flags); values above 1 enable extra debug output.
    verbose: u8,

    /// Stats server whose requests `run` answers with `Metrics` snapshots.
    stats_server: StatsServer<(), Metrics>,
}
96
97impl<'a> Scheduler<'a> {
98    fn init(
99        opts: &SchedulerOpts,
100        open_object: &'a mut MaybeUninit<OpenObject>,
101        verbose: u8,
102    ) -> Result<Self> {
103        // Open the BPF prog first for verification.
104        let mut skel_builder = BpfSkelBuilder::default();
105        skel_builder.obj_builder.debug(verbose > 1);
106        init_libbpf_logging(None);
107        info!(
108            "Running scx_p2dq (build ID: {})",
109            build_id::full_version(env!("CARGO_PKG_VERSION"))
110        );
111        let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq).unwrap();
112        scx_p2dq::init_open_skel!(&mut open_skel, opts, verbose)?;
113
114        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
115            0 => info!("Kernel does not support queued wakeup optimization."),
116            v => open_skel.struct_ops.p2dq_mut().flags |= v,
117        };
118
119        let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
120
121        scx_p2dq::init_skel!(&mut skel);
122
123        let stats_server = StatsServer::new(stats::server_data()).launch()?;
124
125        Ok(Self {
126            skel,
127            struct_ops: None,
128            verbose,
129            stats_server,
130        })
131    }
132
133    fn get_metrics(&self) -> Metrics {
134        let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
135        let stats_map = &self.skel.maps.stats;
136        for stat in 0..stat_idx_P2DQ_NR_STATS {
137            let cpu_stat_vec: Vec<Vec<u8>> = stats_map
138                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
139                .unwrap()
140                .unwrap();
141            let sum: u64 = cpu_stat_vec
142                .iter()
143                .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
144                .sum();
145            stats[stat as usize] = sum;
146        }
147        Metrics {
148            direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
149            idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
150            sched_mode: self.skel.maps.bss_data.sched_mode,
151            dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
152            same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
153            keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
154            select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
155            dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
156            llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
157            node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
158            wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
159            wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
160            wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
161        }
162    }
163
164    fn setup_arenas(&mut self) -> Result<()> {
165        // Allocate the arena memory from the BPF side so userspace initializes it before starting
166        // the scheduler. Despite the function call's name this is neither a test nor a test run,
167        // it's the recommended way of executing SEC("syscall") probes.
168        let mut args = types::arena_init_args {
169            static_pages: bpf_intf::consts_STATIC_ALLOC_PAGES_GRANULARITY as c_ulong,
170            task_ctx_size: std::mem::size_of::<types::task_p2dq>() as c_ulong,
171        };
172
173        let input = ProgramInput {
174            context_in: Some(unsafe {
175                std::slice::from_raw_parts_mut(
176                    &mut args as *mut _ as *mut u8,
177                    std::mem::size_of_val(&args),
178                )
179            }),
180            ..Default::default()
181        };
182
183        let output = self.skel.progs.arena_init.test_run(input)?;
184        if output.return_value != 0 {
185            bail!(
186                "Could not initialize arenas, p2dq_setup returned {}",
187                output.return_value as i32
188            );
189        }
190
191        Ok(())
192    }
193
194    fn setup_topology_node(&mut self, mask: &[u64]) -> Result<()> {
195        let mut args = types::arena_alloc_mask_args {
196            bitmap: 0 as c_ulong,
197        };
198
199        let input = ProgramInput {
200            context_in: Some(unsafe {
201                std::slice::from_raw_parts_mut(
202                    &mut args as *mut _ as *mut u8,
203                    std::mem::size_of_val(&args),
204                )
205            }),
206            ..Default::default()
207        };
208
209        let output = self.skel.progs.arena_alloc_mask.test_run(input)?;
210        if output.return_value != 0 {
211            bail!(
212                "Could not initialize arenas, setup_topology_node returned {}",
213                output.return_value as i32
214            );
215        }
216
217        let ptr = unsafe { std::mem::transmute::<u64, &mut [u64; 10]>(args.bitmap) };
218
219        let (valid_mask, _) = ptr.split_at_mut(mask.len());
220        valid_mask.clone_from_slice(mask);
221
222        let mut args = types::arena_topology_node_init_args {
223            bitmap: args.bitmap as c_ulong,
224            data_size: 0 as c_ulong,
225            id: 0 as c_ulong,
226        };
227
228        let input = ProgramInput {
229            context_in: Some(unsafe {
230                std::slice::from_raw_parts_mut(
231                    &mut args as *mut _ as *mut u8,
232                    std::mem::size_of_val(&args),
233                )
234            }),
235            ..Default::default()
236        };
237
238        let output = self.skel.progs.arena_topology_node_init.test_run(input)?;
239        if output.return_value != 0 {
240            bail!(
241                "p2dq_topology_node_init returned {}",
242                output.return_value as i32
243            );
244        }
245
246        Ok(())
247    }
248
249    fn setup_topology(&mut self) -> Result<()> {
250        let topo = Topology::new().expect("Failed to build host topology");
251
252        self.setup_topology_node(topo.span.as_raw_slice())?;
253
254        for (_, node) in topo.nodes {
255            self.setup_topology_node(node.span.as_raw_slice())?;
256        }
257
258        for (_, llc) in topo.all_llcs {
259            self.setup_topology_node(
260                Arc::<Llc>::into_inner(llc)
261                    .expect("missing llc")
262                    .span
263                    .as_raw_slice(),
264            )?;
265        }
266
267        for (_, core) in topo.all_cores {
268            self.setup_topology_node(
269                Arc::<Core>::into_inner(core)
270                    .expect("missing core")
271                    .span
272                    .as_raw_slice(),
273            )?;
274        }
275        for (_, cpu) in topo.all_cpus {
276            let mut mask = [0; 9];
277            mask[cpu.id.checked_shr(64).unwrap_or(0)] |= 1 << (cpu.id % 64);
278            self.setup_topology_node(&mask)?;
279        }
280
281        Ok(())
282    }
283
284    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
285        let (res_ch, req_ch) = self.stats_server.channels();
286
287        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
288            match req_ch.recv_timeout(Duration::from_secs(1)) {
289                Ok(()) => res_ch.send(self.get_metrics())?,
290                Err(RecvTimeoutError::Timeout) => {}
291                Err(e) => Err(e)?,
292            }
293        }
294
295        let _ = self.struct_ops.take();
296        uei_report!(&self.skel, uei)
297    }
298
299    fn print_topology(&mut self) -> Result<()> {
300        let input = ProgramInput {
301            ..Default::default()
302        };
303
304        let output = self.skel.progs.arena_topology_print.test_run(input)?;
305        if output.return_value != 0 {
306            bail!(
307                "Could not initialize arenas, topo_print returned {}",
308                output.return_value as i32
309            );
310        }
311
312        Ok(())
313    }
314
315    fn start(&mut self) -> Result<()> {
316        self.setup_arenas()?;
317        self.setup_topology()?;
318
319        self.struct_ops = Some(scx_ops_attach!(self.skel, p2dq)?);
320
321        if self.verbose > 1 {
322            self.print_topology()?;
323        }
324
325        info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");
326
327        Ok(())
328    }
329}
330
331impl Drop for Scheduler<'_> {
332    fn drop(&mut self) {
333        if let Some(struct_ops) = self.struct_ops.take() {
334            drop(struct_ops);
335        }
336    }
337}
338
339fn main() -> Result<()> {
340    let opts = CliOpts::parse();
341
342    if opts.version {
343        println!(
344            "scx_p2dq: {}",
345            build_id::full_version(env!("CARGO_PKG_VERSION"))
346        );
347        return Ok(());
348    }
349
350    let llv = match opts.verbose {
351        0 => simplelog::LevelFilter::Info,
352        1 => simplelog::LevelFilter::Debug,
353        _ => simplelog::LevelFilter::Trace,
354    };
355    let mut lcfg = simplelog::ConfigBuilder::new();
356    lcfg.set_time_offset_to_local()
357        .expect("Failed to set local time offset")
358        .set_time_level(simplelog::LevelFilter::Error)
359        .set_location_level(simplelog::LevelFilter::Off)
360        .set_target_level(simplelog::LevelFilter::Off)
361        .set_thread_level(simplelog::LevelFilter::Off);
362    simplelog::TermLogger::init(
363        llv,
364        lcfg.build(),
365        simplelog::TerminalMode::Stderr,
366        simplelog::ColorChoice::Auto,
367    )?;
368
369    let shutdown = Arc::new(AtomicBool::new(false));
370    let shutdown_clone = shutdown.clone();
371    ctrlc::set_handler(move || {
372        shutdown_clone.store(true, Ordering::Relaxed);
373    })
374    .context("Error setting Ctrl-C handler")?;
375
376    if let Some(intv) = opts.monitor.or(opts.stats) {
377        let shutdown_copy = shutdown.clone();
378        let jh = std::thread::spawn(move || {
379            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
380                Ok(_) => {
381                    debug!("stats monitor thread finished successfully")
382                }
383                Err(error_object) => {
384                    warn!(
385                        "stats monitor thread finished because of an error {}",
386                        error_object
387                    )
388                }
389            }
390        });
391        if opts.monitor.is_some() {
392            let _ = jh.join();
393            return Ok(());
394        }
395    }
396
397    if let Some(idle_resume_us) = opts.sched.idle_resume_us {
398        if !cpu_idle_resume_latency_supported() {
399            warn!("idle resume latency not supported");
400        } else {
401            if idle_resume_us > 0 {
402                info!("Setting idle QoS to {}us", idle_resume_us);
403                for cpu in TOPO.all_cpus.values() {
404                    update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
405                }
406            }
407        }
408    }
409
410    let mut open_object = MaybeUninit::uninit();
411    loop {
412        let mut sched = Scheduler::init(&opts.sched, &mut open_object, opts.verbose)?;
413        sched.start()?;
414
415        if !sched.run(shutdown.clone())?.should_restart() {
416            break;
417        }
418    }
419    Ok(())
420}