scx_p2dq/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5pub mod stats;
6use stats::Metrics;
7
8use std::mem::MaybeUninit;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use anyhow::Context;
16use anyhow::Result;
17use clap::Parser;
18use crossbeam::channel::RecvTimeoutError;
19use libbpf_rs::skel::Skel;
20use libbpf_rs::AsRawLibbpf;
21use libbpf_rs::MapCore as _;
22use libbpf_rs::OpenObject;
23use libbpf_rs::ProgramInput;
24use scx_arena::ArenaLib;
25use scx_stats::prelude::*;
26use scx_utils::build_id;
27use scx_utils::compat;
28use scx_utils::init_libbpf_logging;
29use scx_utils::libbpf_clap_opts::LibbpfOpts;
30use scx_utils::pm::{cpu_idle_resume_latency_supported, update_cpu_idle_resume_latency};
31use scx_utils::scx_ops_attach;
32use scx_utils::scx_ops_load;
33use scx_utils::scx_ops_open;
34use scx_utils::uei_exited;
35use scx_utils::uei_report;
36use scx_utils::Topology;
37use scx_utils::UserExitInfo;
38use scx_utils::NR_CPU_IDS;
39use tracing::{debug, info, warn};
40use tracing_subscriber::filter::EnvFilter;
41
42use bpf_intf::stat_idx_P2DQ_NR_STATS;
43use bpf_intf::stat_idx_P2DQ_STAT_ATQ_ENQ;
44use bpf_intf::stat_idx_P2DQ_STAT_ATQ_REENQ;
45use bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
46use bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
47use bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
48use bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
49use bpf_intf::stat_idx_P2DQ_STAT_ENQ_CPU;
50use bpf_intf::stat_idx_P2DQ_STAT_ENQ_INTR;
51use bpf_intf::stat_idx_P2DQ_STAT_ENQ_LLC;
52use bpf_intf::stat_idx_P2DQ_STAT_ENQ_MIG;
53use bpf_intf::stat_idx_P2DQ_STAT_EXEC_BALANCE;
54use bpf_intf::stat_idx_P2DQ_STAT_EXEC_SAME_LLC;
55use bpf_intf::stat_idx_P2DQ_STAT_FORK_BALANCE;
56use bpf_intf::stat_idx_P2DQ_STAT_FORK_SAME_LLC;
57use bpf_intf::stat_idx_P2DQ_STAT_IDLE;
58use bpf_intf::stat_idx_P2DQ_STAT_KEEP;
59use bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
60use bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
61use bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
62use bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
63use bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
64use bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
65use scx_p2dq::bpf_intf;
66use scx_p2dq::bpf_skel::*;
67use scx_p2dq::SchedulerOpts;
68use scx_p2dq::TOPO;
69
/// Name of this scheduler as it appears in log messages (see the `Drop` impl of `Scheduler`).
const SCHEDULER_NAME: &str = "scx_p2dq";
71/// scx_p2dq: A pick 2 dumb queuing load balancing scheduler.
72///
73/// The BPF part does simple vtime or round robin scheduling in each domain
74/// while tracking average load of each domain and duty cycle of each task.
75///
76#[derive(Debug, Parser)]
77struct CliOpts {
78    /// Depricated, noop, use RUST_LOG or --log-level instead.
79    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
80    verbose: u8,
81
82    /// Specify the logging level. Accepts rust's envfilter syntax for modular
83    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
84    #[clap(long, default_value = "info")]
85    pub log_level: String,
86
87    /// Enable stats monitoring with the specified interval.
88    #[clap(long)]
89    pub stats: Option<f64>,
90
91    /// Run in stats monitoring mode with the specified interval. Scheduler
92    /// is not launched.
93    #[clap(long)]
94    pub monitor: Option<f64>,
95
96    /// Print version and exit.
97    #[clap(long)]
98    pub version: bool,
99
100    /// Optional run ID for tracking scheduler instances.
101    #[clap(long)]
102    pub run_id: Option<u64>,
103
104    #[clap(flatten)]
105    pub sched: SchedulerOpts,
106
107    #[clap(flatten, next_help_heading = "Libbpf Options")]
108    pub libbpf: LibbpfOpts,
109}
110
/// Userspace side of the scx_p2dq scheduler: owns the loaded BPF skeleton,
/// the struct_ops attachment and the stats server.
struct Scheduler<'a> {
    /// Loaded BPF skeleton; borrows the `OpenObject` storage passed to `init()`.
    skel: BpfSkel<'a>,
    /// struct_ops link. `None` after `init()`, set by `start()`, and taken
    /// (dropped) at the end of `run()` or in `Drop`.
    struct_ops: Option<libbpf_rs::Link>,
    /// Debug verbosity derived from the log level string in `init()`
    /// (0, 1 for "debug", 2 for "trace").
    debug_level: u8,

    /// Stats server whose request/response channels are serviced in `run()`.
    stats_server: StatsServer<(), Metrics>,
}
118
119impl<'a> Scheduler<'a> {
120    fn init(
121        opts: &SchedulerOpts,
122        libbpf_ops: &LibbpfOpts,
123        open_object: &'a mut MaybeUninit<OpenObject>,
124        log_level: &str,
125    ) -> Result<Self> {
126        // Open the BPF prog first for verification.
127        let debug_level = if log_level.contains("trace") {
128            2
129        } else if log_level.contains("debug") {
130            1
131        } else {
132            0
133        };
134        let mut skel_builder = BpfSkelBuilder::default();
135        skel_builder.obj_builder.debug(debug_level > 1);
136        init_libbpf_logging(None);
137        info!(
138            "Running scx_p2dq (build ID: {})",
139            build_id::full_version(env!("CARGO_PKG_VERSION"))
140        );
141        let topo = if opts.virt_llc_enabled {
142            Topology::with_args(&opts.topo)?
143        } else {
144            Topology::new()?
145        };
146        let open_opts = libbpf_ops.clone().into_bpf_open_opts();
147        let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq, open_opts).context(
148            "Failed to open BPF object. This can be caused by a mismatch between the kernel \
149            version and the BPF object, permission or other libbpf issues. Try running `dmesg \
150            | grep bpf` to see if there are any error messages related to the BPF object. See \
151            the LibbpfOptions section in the help for more information on configuration related \
152            to this issue or file an issue on the scx repo if the problem persists. \
153            https://github.com/sched-ext/scx/issues/new?labels=scx_p2dq&title=scx_p2dq:%20New%20Issue&assignees=hodgesds&body=Kernel%20version:%20(fill%20me%20out)%0ADistribution:%20(fill%20me%20out)%0AHardware:%20(fill%20me%20out)%0A%0AIssue:%20(fill%20me%20out)"
154        )?;
155
156        // Apply hardware-specific optimizations before macro
157        let hw_profile = scx_p2dq::HardwareProfile::detect();
158        let mut opts_optimized = opts.clone();
159        if opts.hw_auto_optimize {
160            hw_profile.optimize_scheduler_opts(&mut opts_optimized);
161        }
162
163        scx_p2dq::init_open_skel!(
164            &mut open_skel,
165            topo,
166            &opts_optimized,
167            debug_level,
168            &hw_profile
169        )?;
170
171        if opts.queued_wakeup {
172            open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
173        }
174        open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_KEEP_BUILTIN_IDLE;
175
176        // Disable autoattach for the struct_ops map since we attach it manually via
177        // attach_struct_ops() in scx_ops_attach!(). This prevents libbpf from warning
178        // about uninitialized skeleton link during attach().
179        unsafe {
180            libbpf_rs::libbpf_sys::bpf_map__set_autoattach(
181                open_skel.maps.p2dq.as_libbpf_object().as_ptr(),
182                false,
183            );
184        }
185
186        let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
187        scx_p2dq::init_skel!(&mut skel, topo);
188
189        let stats_server = StatsServer::new(stats::server_data()).launch()?;
190
191        Ok(Self {
192            skel,
193            struct_ops: None,
194            debug_level,
195            stats_server,
196        })
197    }
198
199    fn get_metrics(&self) -> Metrics {
200        let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
201        let stats_map = &self.skel.maps.stats;
202        for stat in 0..stat_idx_P2DQ_NR_STATS {
203            let cpu_stat_vec: Vec<Vec<u8>> = stats_map
204                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
205                .unwrap()
206                .unwrap();
207            let sum: u64 = cpu_stat_vec
208                .iter()
209                .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
210                .sum();
211            stats[stat as usize] = sum;
212        }
213        Metrics {
214            atq_enq: stats[stat_idx_P2DQ_STAT_ATQ_ENQ as usize],
215            atq_reenq: stats[stat_idx_P2DQ_STAT_ATQ_REENQ as usize],
216            direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
217            idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
218            dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
219            same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
220            keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
221            enq_cpu: stats[stat_idx_P2DQ_STAT_ENQ_CPU as usize],
222            enq_intr: stats[stat_idx_P2DQ_STAT_ENQ_INTR as usize],
223            enq_llc: stats[stat_idx_P2DQ_STAT_ENQ_LLC as usize],
224            enq_mig: stats[stat_idx_P2DQ_STAT_ENQ_MIG as usize],
225            select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
226            dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
227            llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
228            node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
229            wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
230            wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
231            wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
232            fork_balance: stats[stat_idx_P2DQ_STAT_FORK_BALANCE as usize],
233            exec_balance: stats[stat_idx_P2DQ_STAT_EXEC_BALANCE as usize],
234            fork_same_llc: stats[stat_idx_P2DQ_STAT_FORK_SAME_LLC as usize],
235            exec_same_llc: stats[stat_idx_P2DQ_STAT_EXEC_SAME_LLC as usize],
236        }
237    }
238
239    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
240        let (res_ch, req_ch) = self.stats_server.channels();
241
242        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
243            match req_ch.recv_timeout(Duration::from_secs(1)) {
244                Ok(()) => res_ch.send(self.get_metrics())?,
245                Err(RecvTimeoutError::Timeout) => {}
246                Err(e) => Err(e)?,
247            }
248        }
249
250        let _ = self.struct_ops.take();
251        uei_report!(&self.skel, uei)
252    }
253
254    fn print_topology(&mut self) -> Result<()> {
255        let input = ProgramInput {
256            ..Default::default()
257        };
258
259        let output = self.skel.progs.arena_topology_print.test_run(input)?;
260        if output.return_value != 0 {
261            bail!(
262                "Could not initialize arenas, topo_print returned {}",
263                output.return_value as i32
264            );
265        }
266
267        Ok(())
268    }
269
270    fn start(&mut self) -> Result<()> {
271        self.struct_ops = Some(scx_ops_attach!(self.skel, p2dq)?);
272
273        if self.debug_level > 0 {
274            self.print_topology()?;
275        }
276
277        info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");
278
279        Ok(())
280    }
281}
282
283impl Drop for Scheduler<'_> {
284    fn drop(&mut self) {
285        info!("Unregister {SCHEDULER_NAME} scheduler");
286
287        if let Some(struct_ops) = self.struct_ops.take() {
288            drop(struct_ops);
289        }
290    }
291}
292
293#[clap_main::clap_main]
294fn main(opts: CliOpts) -> Result<()> {
295    if opts.version {
296        println!(
297            "scx_p2dq: {}",
298            build_id::full_version(env!("CARGO_PKG_VERSION"))
299        );
300        return Ok(());
301    }
302
303    let env_filter = EnvFilter::try_from_default_env()
304        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
305            Ok(filter) => Ok(filter),
306            Err(e) => {
307                eprintln!(
308                    "invalid log envvar: {}, using info, err is: {}",
309                    opts.log_level, e
310                );
311                EnvFilter::try_new("info")
312            }
313        })
314        .unwrap_or_else(|_| EnvFilter::new("info"));
315
316    match tracing_subscriber::fmt()
317        .with_env_filter(env_filter)
318        .with_target(true)
319        .with_thread_ids(true)
320        .with_file(true)
321        .with_line_number(true)
322        .try_init()
323    {
324        Ok(()) => {}
325        Err(e) => eprintln!("failed to init logger: {}", e),
326    }
327
328    if opts.verbose > 0 {
329        warn!("Setting verbose via -v is depricated and will be an error in future releases.");
330    }
331
332    if let Some(run_id) = opts.run_id {
333        info!("scx_p2dq run_id: {}", run_id);
334    }
335
336    let shutdown = Arc::new(AtomicBool::new(false));
337    let shutdown_clone = shutdown.clone();
338    ctrlc::set_handler(move || {
339        shutdown_clone.store(true, Ordering::Relaxed);
340    })
341    .context("Error setting Ctrl-C handler")?;
342
343    if let Some(intv) = opts.monitor.or(opts.stats) {
344        let shutdown_copy = shutdown.clone();
345        let jh = std::thread::spawn(move || {
346            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
347                Ok(_) => {
348                    debug!("stats monitor thread finished successfully")
349                }
350                Err(error_object) => {
351                    warn!("stats monitor thread finished because of an error {error_object}")
352                }
353            }
354        });
355        if opts.monitor.is_some() {
356            let _ = jh.join();
357            return Ok(());
358        }
359    }
360
361    if let Some(idle_resume_us) = opts.sched.idle_resume_us {
362        if !cpu_idle_resume_latency_supported() {
363            warn!("idle resume latency not supported");
364        } else if idle_resume_us > 0 {
365            info!("Setting idle QoS to {idle_resume_us}us");
366            for cpu in TOPO.all_cpus.values() {
367                update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
368            }
369        }
370    }
371
372    let mut open_object = MaybeUninit::uninit();
373    loop {
374        let mut sched =
375            Scheduler::init(&opts.sched, &opts.libbpf, &mut open_object, &opts.log_level)?;
376        let task_size = std::mem::size_of::<types::task_p2dq>();
377        let arenalib = ArenaLib::init(sched.skel.object_mut(), task_size, *NR_CPU_IDS)?;
378        arenalib.setup()?;
379
380        sched.start()?;
381
382        if !sched.run(shutdown.clone())?.should_restart() {
383            break;
384        }
385    }
386    Ok(())
387}