// scx_p2dq/main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5mod bpf_skel;
6pub use bpf_skel::*;
7
8pub mod bpf_intf;
9pub mod stats;
10use stats::Metrics;
11
12use scx_p2dq::SchedulerOpts;
13use scx_p2dq::TOPO;
14
15use std::mem::MaybeUninit;
16use std::sync::Arc;
17use std::sync::atomic::AtomicBool;
18use std::sync::atomic::Ordering;
19use std::time::Duration;
20
21use anyhow::Context;
22use anyhow::Result;
23use clap::Parser;
24use crossbeam::channel::RecvTimeoutError;
25use libbpf_rs::MapCore as _;
26use libbpf_rs::OpenObject;
27use log::{debug, info, warn};
28use scx_stats::prelude::*;
29use scx_utils::UserExitInfo;
30use scx_utils::build_id;
31use scx_utils::compat;
32use scx_utils::init_libbpf_logging;
33use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
34use scx_utils::scx_ops_attach;
35use scx_utils::scx_ops_load;
36use scx_utils::scx_ops_open;
37use scx_utils::uei_exited;
38use scx_utils::uei_report;
39
40use crate::bpf_intf::stat_idx_P2DQ_NR_STATS;
41use crate::bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
42use crate::bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
43use crate::bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
44use crate::bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
45use crate::bpf_intf::stat_idx_P2DQ_STAT_IDLE;
46use crate::bpf_intf::stat_idx_P2DQ_STAT_KEEP;
47use crate::bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
48use crate::bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
49use crate::bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
50use crate::bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
51use crate::bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
52use crate::bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
53
/// scx_p2dq: A pick 2 dumb queuing load balancing scheduler.
///
/// The BPF part does simple vtime or round robin scheduling in each domain
/// while tracking average load of each domain and duty cycle of each task.
///
// NOTE: the `///` doc comments below double as the clap-generated `--help`
// text; they are user-visible strings, not just documentation.
#[derive(Debug, Parser)]
struct CliOpts {
    /// Enable verbose output, including libbpf details. Specify multiple
    /// times to increase verbosity.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    pub verbose: u8,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    pub stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    pub monitor: Option<f64>,

    /// Print version and exit.
    #[clap(long)]
    pub version: bool,

    // Scheduler tunables shared with the scx_p2dq library crate; flattened
    // so they appear as top-level CLI flags.
    #[clap(flatten)]
    pub sched: SchedulerOpts,
}
82
// Userspace half of the scheduler: owns the loaded BPF skeleton, the
// struct_ops attach link, and the stats server that answers metric requests.
struct Scheduler<'a> {
    skel: BpfSkel<'a>,
    // Attach handle for the BPF struct_ops; dropping it detaches the scheduler.
    struct_ops: Option<libbpf_rs::Link>,

    stats_server: StatsServer<(), Metrics>,
}
89
90impl<'a> Scheduler<'a> {
91    fn init(
92        opts: &SchedulerOpts,
93        open_object: &'a mut MaybeUninit<OpenObject>,
94        verbose: u8,
95    ) -> Result<Self> {
96        // Open the BPF prog first for verification.
97        let mut skel_builder = BpfSkelBuilder::default();
98        skel_builder.obj_builder.debug(verbose > 1);
99        init_libbpf_logging(None);
100        info!(
101            "Running scx_p2dq (build ID: {})",
102            build_id::full_version(env!("CARGO_PKG_VERSION"))
103        );
104        let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq).unwrap();
105        scx_p2dq::init_open_skel!(&mut open_skel, opts, verbose)?;
106
107        match *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP {
108            0 => info!("Kernel does not support queued wakeup optimization."),
109            v => open_skel.struct_ops.p2dq_mut().flags |= v,
110        };
111
112        let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
113        scx_p2dq::init_skel!(&mut skel);
114
115        let struct_ops = Some(scx_ops_attach!(skel, p2dq)?);
116
117        let stats_server = StatsServer::new(stats::server_data()).launch()?;
118
119        info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");
120
121        Ok(Self {
122            skel,
123            struct_ops,
124            stats_server,
125        })
126    }
127
128    fn get_metrics(&self) -> Metrics {
129        let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
130        let stats_map = &self.skel.maps.stats;
131        for stat in 0..stat_idx_P2DQ_NR_STATS {
132            let cpu_stat_vec: Vec<Vec<u8>> = stats_map
133                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
134                .unwrap()
135                .unwrap();
136            let sum: u64 = cpu_stat_vec
137                .iter()
138                .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
139                .sum();
140            stats[stat as usize] = sum;
141        }
142        Metrics {
143            direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
144            idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
145            sched_mode: self.skel.maps.bss_data.sched_mode,
146            dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
147            same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
148            keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
149            select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
150            dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
151            llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
152            node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
153            wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
154            wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
155            wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
156        }
157    }
158
159    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
160        let (res_ch, req_ch) = self.stats_server.channels();
161
162        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
163            match req_ch.recv_timeout(Duration::from_secs(1)) {
164                Ok(()) => res_ch.send(self.get_metrics())?,
165                Err(RecvTimeoutError::Timeout) => {}
166                Err(e) => Err(e)?,
167            }
168        }
169
170        self.struct_ops.take();
171        uei_report!(&self.skel, uei)
172    }
173}
174
175impl Drop for Scheduler<'_> {
176    fn drop(&mut self) {
177        if let Some(struct_ops) = self.struct_ops.take() {
178            drop(struct_ops);
179        }
180    }
181}
182
183fn main() -> Result<()> {
184    let opts = CliOpts::parse();
185
186    if opts.version {
187        println!(
188            "scx_p2dq: {}",
189            build_id::full_version(env!("CARGO_PKG_VERSION"))
190        );
191        return Ok(());
192    }
193
194    let llv = match opts.verbose {
195        0 => simplelog::LevelFilter::Info,
196        1 => simplelog::LevelFilter::Debug,
197        _ => simplelog::LevelFilter::Trace,
198    };
199    let mut lcfg = simplelog::ConfigBuilder::new();
200    lcfg.set_time_level(simplelog::LevelFilter::Error)
201        .set_location_level(simplelog::LevelFilter::Off)
202        .set_target_level(simplelog::LevelFilter::Off)
203        .set_thread_level(simplelog::LevelFilter::Off);
204    simplelog::TermLogger::init(
205        llv,
206        lcfg.build(),
207        simplelog::TerminalMode::Stderr,
208        simplelog::ColorChoice::Auto,
209    )?;
210
211    let shutdown = Arc::new(AtomicBool::new(false));
212    let shutdown_clone = shutdown.clone();
213    ctrlc::set_handler(move || {
214        shutdown_clone.store(true, Ordering::Relaxed);
215    })
216    .context("Error setting Ctrl-C handler")?;
217
218    if let Some(intv) = opts.monitor.or(opts.stats) {
219        let shutdown_copy = shutdown.clone();
220        let jh = std::thread::spawn(move || {
221            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
222                Ok(_) => {
223                    debug!("stats monitor thread finished successfully")
224                }
225                Err(error_object) => {
226                    warn!(
227                        "stats monitor thread finished because of an error {}",
228                        error_object
229                    )
230                }
231            }
232        });
233        if opts.monitor.is_some() {
234            let _ = jh.join();
235            return Ok(());
236        }
237    }
238
239    if let Some(idle_resume_us) = opts.sched.idle_resume_us {
240        if !cpu_idle_resume_latency_supported() {
241            warn!("idle resume latency not supported");
242        } else {
243            if idle_resume_us > 0 {
244                info!("Setting idle QoS to {}us", idle_resume_us);
245                for cpu in TOPO.all_cpus.values() {
246                    update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
247                }
248            }
249        }
250    }
251
252    let mut open_object = MaybeUninit::uninit();
253    loop {
254        let mut sched = Scheduler::init(&opts.sched, &mut open_object, opts.verbose)?;
255        if !sched.run(shutdown.clone())?.should_restart() {
256            break;
257        }
258    }
259    Ok(())
260}