scx_p2dq/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5pub mod stats;
6use stats::Metrics;
7
8use std::mem::MaybeUninit;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use anyhow::Context;
16use anyhow::Result;
17use arenalib::ArenaLib;
18use clap::Parser;
19use crossbeam::channel::RecvTimeoutError;
20use libbpf_rs::skel::Skel;
21use libbpf_rs::MapCore as _;
22use libbpf_rs::OpenObject;
23use libbpf_rs::ProgramInput;
24use log::{debug, info, warn};
25use scx_stats::prelude::*;
26use scx_utils::build_id;
27use scx_utils::compat;
28use scx_utils::init_libbpf_logging;
29use scx_utils::libbpf_clap_opts::LibbpfOpts;
30use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
31use scx_utils::scx_ops_attach;
32use scx_utils::scx_ops_load;
33use scx_utils::scx_ops_open;
34use scx_utils::uei_exited;
35use scx_utils::uei_report;
36use scx_utils::Topology;
37use scx_utils::UserExitInfo;
38use scx_utils::NR_CPU_IDS;
39
40use bpf_intf::stat_idx_P2DQ_NR_STATS;
41use bpf_intf::stat_idx_P2DQ_STAT_ATQ_ENQ;
42use bpf_intf::stat_idx_P2DQ_STAT_ATQ_REENQ;
43use bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
44use bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
45use bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
46use bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
47use bpf_intf::stat_idx_P2DQ_STAT_ENQ_CPU;
48use bpf_intf::stat_idx_P2DQ_STAT_ENQ_INTR;
49use bpf_intf::stat_idx_P2DQ_STAT_ENQ_LLC;
50use bpf_intf::stat_idx_P2DQ_STAT_ENQ_MIG;
51use bpf_intf::stat_idx_P2DQ_STAT_IDLE;
52use bpf_intf::stat_idx_P2DQ_STAT_KEEP;
53use bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
54use bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
55use bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
56use bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
57use bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
58use bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
59use scx_p2dq::bpf_intf;
60use scx_p2dq::bpf_skel::*;
61use scx_p2dq::SchedulerOpts;
62use scx_p2dq::TOPO;
63
// Scheduler name interpolated into log output (used by the unregister
// message emitted when a `Scheduler` is dropped).
const SCHEDULER_NAME: &str = "scx_p2dq";
/// scx_p2dq: A pick 2 dumb queuing load balancing scheduler.
///
/// The BPF part does simple vtime or round robin scheduling in each domain
/// while tracking average load of each domain and duty cycle of each task.
///
// NOTE: the `///` comments on this struct and its fields are picked up by
// clap's derive macro as `--help` text, so editing them changes user-visible
// output. Maintainer-only notes below therefore use plain `//` comments.
#[derive(Debug, Parser)]
struct CliOpts {
    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    // `main` maps 0 => Info, 1 => Debug, 2+ => Trace. Values above 1 also
    // enable libbpf object debug output in `Scheduler::init` and the topology
    // print in `Scheduler::start`.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    pub verbose: u8,

    /// Enable stats monitoring with the specified interval.
    // Interpreted as seconds by `main`. Only consulted when `--monitor` is
    // absent (`opts.monitor.or(opts.stats)`); the scheduler still runs.
    #[clap(long)]
    pub stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    // Interpreted as seconds by `main`, takes precedence over `--stats`, and
    // makes `main` return as soon as the monitor thread finishes.
    #[clap(long)]
    pub monitor: Option<f64>,

    /// Print version and exit.
    #[clap(long)]
    pub version: bool,

    // Scheduler tunables; the type comes from the `scx_p2dq` library crate and
    // is passed to `Scheduler::init`.
    #[clap(flatten)]
    pub sched: SchedulerOpts,

    // Converted into libbpf open options when the BPF object is opened in
    // `Scheduler::init`.
    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
96
/// Userspace handle for one loaded instance of the scx_p2dq BPF scheduler.
///
/// The lifetime `'a` is tied to the `OpenObject` storage handed to
/// [`Scheduler::init`], which the skeleton borrows.
struct Scheduler<'a> {
    /// Loaded BPF skeleton (result of `scx_ops_load!` in `init`).
    skel: BpfSkel<'a>,
    /// Link returned by `scx_ops_attach!`. `None` until `start()` runs, and
    /// taken (set back to `None`) at the end of `run()` or on drop.
    struct_ops: Option<libbpf_rs::Link>,
    /// CLI verbosity count; values above 1 make `start()` run the topology
    /// print program.
    verbose: u8,

    /// Stats server launched in `init`; `run()` answers requests arriving on
    /// its channels with the output of `get_metrics()`.
    stats_server: StatsServer<(), Metrics>,
}
104
105impl<'a> Scheduler<'a> {
    /// Opens, configures and loads the p2dq BPF skeleton and launches the
    /// stats server. The scheduler is NOT attached here; `struct_ops` is left
    /// as `None` until `start()` is called.
    ///
    /// # Arguments
    /// * `opts` - scheduler options; read directly here for `virt_llc_enabled`,
    ///   `topo` and `queued_wakeup`, and also passed to `init_open_skel!`.
    /// * `libbpf_ops` - libbpf options, converted into BPF open options.
    /// * `open_object` - caller-owned storage that the opened BPF object
    ///   borrows for `'a`.
    /// * `verbose` - verbosity count; values above 1 enable debug output on
    ///   the skeleton's object builder.
    ///
    /// # Errors
    /// Propagates failures from topology construction, opening the BPF object
    /// (with a long troubleshooting message attached as context),
    /// `init_open_skel!`, loading the skeleton, and launching the stats server.
    fn init(
        opts: &SchedulerOpts,
        libbpf_ops: &LibbpfOpts,
        open_object: &'a mut MaybeUninit<OpenObject>,
        verbose: u8,
    ) -> Result<Self> {
        // Open the BPF prog first for verification.
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(verbose > 1);
        init_libbpf_logging(None);
        info!(
            "Running scx_p2dq (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        // With virtual LLCs enabled the topology is built from the
        // user-supplied topology arguments; otherwise `Topology::new()` is used.
        let topo = if opts.virt_llc_enabled {
            Topology::with_args(&opts.topo)?
        } else {
            Topology::new()?
        };
        let open_opts = libbpf_ops.clone().into_bpf_open_opts();
        let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq, open_opts).context(
            "Failed to open BPF object. This can be caused by a mismatch between the kernel \
            version and the BPF object, permission or other libbpf issues. Try running `dmesg \
            | grep bpf` to see if there are any error messages related to the BPF object. See \
            the LibbpfOptions section in the help for more information on configuration related \
            to this issue or file an issue on the scx repo if the problem persists. \
            https://github.com/sched-ext/scx/issues/new?labels=scx_p2dq&title=scx_p2dq:%20New%20Issue&assignees=hodgesds&body=Kernel%20version:%20(fill%20me%20out)%0ADistribution:%20(fill%20me%20out)%0AHardware:%20(fill%20me%20out)%0A%0AIssue:%20(fill%20me%20out)"
        )?;
        // NOTE(review): macro defined in the scx_p2dq library crate; presumably
        // it applies the topology and options to the open skeleton — confirm
        // against the macro definition.
        scx_p2dq::init_open_skel!(&mut open_skel, topo, opts, verbose)?;

        // struct_ops flags must be set before the skeleton is loaded below.
        // Queued wakeups are opt-in; keeping the built-in idle tracking is
        // always requested.
        if opts.queued_wakeup {
            open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        }
        open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_KEEP_BUILTIN_IDLE;

        let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
        // Post-load initialization that needs the loaded skeleton and topology.
        scx_p2dq::init_skel!(&mut skel, topo);

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            // Populated by `start()` once the scheduler is attached.
            struct_ops: None,
            verbose,
            stats_server,
        })
    }
153
154    fn get_metrics(&self) -> Metrics {
155        let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
156        let stats_map = &self.skel.maps.stats;
157        for stat in 0..stat_idx_P2DQ_NR_STATS {
158            let cpu_stat_vec: Vec<Vec<u8>> = stats_map
159                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
160                .unwrap()
161                .unwrap();
162            let sum: u64 = cpu_stat_vec
163                .iter()
164                .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
165                .sum();
166            stats[stat as usize] = sum;
167        }
168        Metrics {
169            atq_enq: stats[stat_idx_P2DQ_STAT_ATQ_ENQ as usize],
170            atq_reenq: stats[stat_idx_P2DQ_STAT_ATQ_REENQ as usize],
171            direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
172            idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
173            dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
174            same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
175            keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
176            enq_cpu: stats[stat_idx_P2DQ_STAT_ENQ_CPU as usize],
177            enq_intr: stats[stat_idx_P2DQ_STAT_ENQ_INTR as usize],
178            enq_llc: stats[stat_idx_P2DQ_STAT_ENQ_LLC as usize],
179            enq_mig: stats[stat_idx_P2DQ_STAT_ENQ_MIG as usize],
180            select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
181            dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
182            llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
183            node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
184            wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
185            wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
186            wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
187        }
188    }
189
190    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
191        let (res_ch, req_ch) = self.stats_server.channels();
192
193        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
194            match req_ch.recv_timeout(Duration::from_secs(1)) {
195                Ok(()) => res_ch.send(self.get_metrics())?,
196                Err(RecvTimeoutError::Timeout) => {}
197                Err(e) => Err(e)?,
198            }
199        }
200
201        let _ = self.struct_ops.take();
202        uei_report!(&self.skel, uei)
203    }
204
205    fn print_topology(&mut self) -> Result<()> {
206        let input = ProgramInput {
207            ..Default::default()
208        };
209
210        let output = self.skel.progs.arena_topology_print.test_run(input)?;
211        if output.return_value != 0 {
212            bail!(
213                "Could not initialize arenas, topo_print returned {}",
214                output.return_value as i32
215            );
216        }
217
218        Ok(())
219    }
220
221    fn start(&mut self) -> Result<()> {
222        self.struct_ops = Some(scx_ops_attach!(self.skel, p2dq)?);
223
224        if self.verbose > 1 {
225            self.print_topology()?;
226        }
227
228        info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");
229
230        Ok(())
231    }
232}
233
234impl Drop for Scheduler<'_> {
235    fn drop(&mut self) {
236        info!("Unregister {SCHEDULER_NAME} scheduler");
237
238        if let Some(struct_ops) = self.struct_ops.take() {
239            drop(struct_ops);
240        }
241    }
242}
243
244fn main() -> Result<()> {
245    let opts = CliOpts::parse();
246
247    if opts.version {
248        println!(
249            "scx_p2dq: {}",
250            build_id::full_version(env!("CARGO_PKG_VERSION"))
251        );
252        return Ok(());
253    }
254
255    let llv = match opts.verbose {
256        0 => simplelog::LevelFilter::Info,
257        1 => simplelog::LevelFilter::Debug,
258        _ => simplelog::LevelFilter::Trace,
259    };
260    let mut lcfg = simplelog::ConfigBuilder::new();
261    lcfg.set_time_offset_to_local()
262        .expect("Failed to set local time offset")
263        .set_time_level(simplelog::LevelFilter::Error)
264        .set_location_level(simplelog::LevelFilter::Off)
265        .set_target_level(simplelog::LevelFilter::Off)
266        .set_thread_level(simplelog::LevelFilter::Off);
267    simplelog::TermLogger::init(
268        llv,
269        lcfg.build(),
270        simplelog::TerminalMode::Stderr,
271        simplelog::ColorChoice::Auto,
272    )?;
273
274    let shutdown = Arc::new(AtomicBool::new(false));
275    let shutdown_clone = shutdown.clone();
276    ctrlc::set_handler(move || {
277        shutdown_clone.store(true, Ordering::Relaxed);
278    })
279    .context("Error setting Ctrl-C handler")?;
280
281    if let Some(intv) = opts.monitor.or(opts.stats) {
282        let shutdown_copy = shutdown.clone();
283        let jh = std::thread::spawn(move || {
284            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
285                Ok(_) => {
286                    debug!("stats monitor thread finished successfully")
287                }
288                Err(error_object) => {
289                    warn!("stats monitor thread finished because of an error {error_object}")
290                }
291            }
292        });
293        if opts.monitor.is_some() {
294            let _ = jh.join();
295            return Ok(());
296        }
297    }
298
299    if let Some(idle_resume_us) = opts.sched.idle_resume_us {
300        if !cpu_idle_resume_latency_supported() {
301            warn!("idle resume latency not supported");
302        } else if idle_resume_us > 0 {
303            info!("Setting idle QoS to {idle_resume_us}us");
304            for cpu in TOPO.all_cpus.values() {
305                update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
306            }
307        }
308    }
309
310    let mut open_object = MaybeUninit::uninit();
311    loop {
312        let mut sched = Scheduler::init(&opts.sched, &opts.libbpf, &mut open_object, opts.verbose)?;
313        let task_size = std::mem::size_of::<types::task_p2dq>();
314        let arenalib = ArenaLib::init(sched.skel.object_mut(), task_size, *NR_CPU_IDS)?;
315        arenalib.setup()?;
316
317        sched.start()?;
318
319        if !sched.run(shutdown.clone())?.should_restart() {
320            break;
321        }
322    }
323    Ok(())
324}