Skip to main content

scx_p2dq/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5pub mod stats;
6use stats::Metrics;
7
8use std::mem::MaybeUninit;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering;
11use std::sync::Arc;
12use std::time::Duration;
13
14use anyhow::bail;
15use anyhow::Context;
16use anyhow::Result;
17use clap::Parser;
18use crossbeam::channel::RecvTimeoutError;
19use libbpf_rs::skel::Skel;
20use libbpf_rs::AsRawLibbpf;
21use libbpf_rs::MapCore as _;
22use libbpf_rs::OpenObject;
23use libbpf_rs::ProgramInput;
24use scx_arena::ArenaLib;
25use scx_stats::prelude::*;
26use scx_utils::build_id;
27use scx_utils::compat;
28use scx_utils::init_libbpf_logging;
29use scx_utils::libbpf_clap_opts::LibbpfOpts;
30use scx_utils::pm::{
31    cpu_idle_resume_latency_supported, epp_supported, for_each_uncore_domain, get_epp,
32    get_turbo_enabled, get_uncore_max_freq_khz, get_uncore_min_freq_khz, set_epp,
33    set_turbo_enabled, set_uncore_max_freq_khz, turbo_supported, uncore_freq_supported,
34    update_cpu_idle_resume_latency,
35};
36use scx_utils::scx_ops_attach;
37use scx_utils::scx_ops_load;
38use scx_utils::scx_ops_open;
39use scx_utils::uei_exited;
40use scx_utils::uei_report;
41use scx_utils::Topology;
42use scx_utils::UserExitInfo;
43use scx_utils::NR_CPU_IDS;
44use tracing::{debug, info, warn};
45use tracing_subscriber::filter::EnvFilter;
46
47use bpf_intf::stat_idx_P2DQ_NR_STATS;
48use bpf_intf::stat_idx_P2DQ_STAT_ATQ_ENQ;
49use bpf_intf::stat_idx_P2DQ_STAT_ATQ_REENQ;
50use bpf_intf::stat_idx_P2DQ_STAT_DIRECT;
51use bpf_intf::stat_idx_P2DQ_STAT_DISPATCH_PICK2;
52use bpf_intf::stat_idx_P2DQ_STAT_DSQ_CHANGE;
53use bpf_intf::stat_idx_P2DQ_STAT_DSQ_SAME;
54use bpf_intf::stat_idx_P2DQ_STAT_EAS_BIG_SELECT;
55use bpf_intf::stat_idx_P2DQ_STAT_EAS_FALLBACK;
56use bpf_intf::stat_idx_P2DQ_STAT_EAS_LITTLE_SELECT;
57use bpf_intf::stat_idx_P2DQ_STAT_ENQ_CPU;
58use bpf_intf::stat_idx_P2DQ_STAT_ENQ_INTR;
59use bpf_intf::stat_idx_P2DQ_STAT_ENQ_LLC;
60use bpf_intf::stat_idx_P2DQ_STAT_ENQ_MIG;
61use bpf_intf::stat_idx_P2DQ_STAT_EXEC_BALANCE;
62use bpf_intf::stat_idx_P2DQ_STAT_EXEC_SAME_LLC;
63use bpf_intf::stat_idx_P2DQ_STAT_FORK_BALANCE;
64use bpf_intf::stat_idx_P2DQ_STAT_FORK_SAME_LLC;
65use bpf_intf::stat_idx_P2DQ_STAT_IDLE;
66use bpf_intf::stat_idx_P2DQ_STAT_KEEP;
67use bpf_intf::stat_idx_P2DQ_STAT_LLC_MIGRATION;
68use bpf_intf::stat_idx_P2DQ_STAT_NODE_MIGRATION;
69use bpf_intf::stat_idx_P2DQ_STAT_SELECT_PICK2;
70use bpf_intf::stat_idx_P2DQ_STAT_THERMAL_AVOID;
71use bpf_intf::stat_idx_P2DQ_STAT_THERMAL_KICK;
72use bpf_intf::stat_idx_P2DQ_STAT_WAKE_LLC;
73use bpf_intf::stat_idx_P2DQ_STAT_WAKE_MIG;
74use bpf_intf::stat_idx_P2DQ_STAT_WAKE_PREV;
75use scx_p2dq::bpf_intf;
76use scx_p2dq::bpf_skel::*;
77use scx_p2dq::SchedulerOpts;
78use scx_p2dq::TOPO;
79
// Scheduler name reported on unregister; also the binary/crate name.
const SCHEDULER_NAME: &str = "scx_p2dq";
/// scx_p2dq: A pick 2 dumb queuing load balancing scheduler.
///
/// The BPF part does simple vtime or round robin scheduling in each domain
/// while tracking average load of each domain and duty cycle of each task.
///
// NOTE: the `///` doc comments below are rendered by clap as `--help` text,
// so they are user-facing strings — do not edit them casually.
#[derive(Debug, Parser)]
struct CliOpts {
    /// Deprecated, noop, use RUST_LOG or --log-level instead.
    #[clap(short = 'v', long, action = clap::ArgAction::Count)]
    verbose: u8,

    /// Specify the logging level. Accepts rust's envfilter syntax for modular
    /// logging: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html#example-syntax. Examples: ["info", "warn,tokio=info"]
    #[clap(long, default_value = "info")]
    pub log_level: String,

    /// Enable stats monitoring with the specified interval.
    #[clap(long)]
    pub stats: Option<f64>,

    /// Run in stats monitoring mode with the specified interval. Scheduler
    /// is not launched.
    #[clap(long)]
    pub monitor: Option<f64>,

    /// Print version and exit.
    #[clap(long)]
    pub version: bool,

    /// Optional run ID for tracking scheduler instances.
    #[clap(long)]
    pub run_id: Option<u64>,

    // Scheduler tuning options, flattened into the top-level CLI.
    #[clap(flatten)]
    pub sched: SchedulerOpts,

    // Low-level libbpf open options (e.g. btf paths), flattened under
    // their own help heading.
    #[clap(flatten, next_help_heading = "Libbpf Options")]
    pub libbpf: LibbpfOpts,
}
120
/// Userspace side of the scheduler: owns the loaded BPF skeleton, the
/// struct_ops attachment link, and the stats server backing `--stats`
/// and `--monitor`.
struct Scheduler<'a> {
    // Loaded BPF skeleton; borrows the caller-provided `OpenObject` storage.
    skel: BpfSkel<'a>,
    // Link for the attached sched_ext struct_ops. `None` until `start()`
    // succeeds; dropping the link detaches the scheduler.
    struct_ops: Option<libbpf_rs::Link>,
    // Derived from the textual log level: 0 normal, 1 "debug", 2 "trace"
    // (level > 0 also triggers topology printing at start).
    debug_level: u8,

    // Serves `Metrics` snapshots to monitoring clients.
    stats_server: StatsServer<(), Metrics>,
}
128
impl<'a> Scheduler<'a> {
    /// Open, configure and load the BPF scheduler object, and launch the
    /// stats server. Does not attach the struct_ops — callers do that via
    /// `start()`. `open_object` must outlive the returned `Scheduler`
    /// because the loaded skeleton borrows it.
    fn init(
        opts: &SchedulerOpts,
        libbpf_ops: &LibbpfOpts,
        open_object: &'a mut MaybeUninit<OpenObject>,
        log_level: &str,
    ) -> Result<Self> {
        // Open the BPF prog first for verification.
        // Map the textual log level onto BPF-side verbosity: "trace" -> 2
        // (also enables libbpf object debug output below), "debug" -> 1.
        let debug_level = if log_level.contains("trace") {
            2
        } else if log_level.contains("debug") {
            1
        } else {
            0
        };
        let mut skel_builder = BpfSkelBuilder::default();
        skel_builder.obj_builder.debug(debug_level > 1);
        init_libbpf_logging(None);
        info!(
            "Running scx_p2dq (build ID: {})",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        // Virtual-LLC mode builds the topology from user-supplied args;
        // otherwise discover the real machine topology.
        let topo = if opts.virt_llc_enabled {
            Topology::with_args(&opts.topo)?
        } else {
            Topology::new()?
        };
        let open_opts = libbpf_ops.clone().into_bpf_open_opts();
        let mut open_skel = scx_ops_open!(skel_builder, open_object, p2dq, open_opts).context(
            "Failed to open BPF object. This can be caused by a mismatch between the kernel \
            version and the BPF object, permission or other libbpf issues. Try running `dmesg \
            | grep bpf` to see if there are any error messages related to the BPF object. See \
            the LibbpfOptions section in the help for more information on configuration related \
            to this issue or file an issue on the scx repo if the problem persists. \
            https://github.com/sched-ext/scx/issues/new?labels=scx_p2dq&title=scx_p2dq:%20New%20Issue&assignees=hodgesds&body=Kernel%20version:%20(fill%20me%20out)%0ADistribution:%20(fill%20me%20out)%0AHardware:%20(fill%20me%20out)%0A%0AIssue:%20(fill%20me%20out)"
        )?;

        // Disable autoload for thermal pressure tracepoint by default
        // Will be conditionally enabled if kernel supports it
        // Note: This tracepoint only exists on ARM/ARM64 architectures
        #[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
        open_skel.progs.on_thermal_pressure.set_autoload(false);

        // Apply hardware-specific optimizations before macro
        let hw_profile = scx_p2dq::HardwareProfile::detect();
        let mut opts_optimized = opts.clone();
        if opts.hw_auto_optimize {
            hw_profile.optimize_scheduler_opts(&mut opts_optimized);
        }

        // Populate the open skeleton's rodata/config from topology and the
        // (possibly hw-optimized) options before verification/load.
        scx_p2dq::init_open_skel!(
            &mut open_skel,
            topo,
            &opts_optimized,
            debug_level,
            &hw_profile
        )?;

        // Thermal pressure tracking (ARM/ARM64 only)
        #[cfg(any(target_arch = "aarch64", target_arch = "arm"))]
        {
            // Probe both tracefs mount points for the hw_pressure_update
            // tracepoint; presence implies kernel support.
            let thermal_enabled = std::path::Path::new(
                "/sys/kernel/tracing/events/thermal_pressure/hw_pressure_update",
            )
            .exists()
                || std::path::Path::new(
                    "/sys/kernel/debug/tracing/events/thermal_pressure/hw_pressure_update",
                )
                .exists();

            if thermal_enabled {
                debug!(
                    "Kernel supports thermal pressure tracking, enabling hw_pressure_update tracepoint"
                );
                open_skel.progs.on_thermal_pressure.set_autoload(true);
                stats::set_thermal_tracking_enabled(true);

                // Flip the rodata flag so the BPF side knows thermal data
                // will be available.
                open_skel
                    .maps
                    .rodata_data
                    .as_mut()
                    .unwrap()
                    .p2dq_config
                    .thermal_enabled = std::mem::MaybeUninit::new(true);
            } else {
                debug!("Kernel does not support thermal pressure tracking (CONFIG_SCHED_HW_PRESSURE not enabled)");
            }
        }

        if opts_optimized.enable_eas {
            stats::set_eas_enabled(true);
        }

        // ATQ stats only make sense when the kernel exposes bpf_spin_unlock;
        // presumably a proxy for the ATQ implementation's requirements —
        // TODO confirm against the BPF side.
        if opts_optimized.atq_enabled && compat::ksym_exists("bpf_spin_unlock").unwrap_or(false) {
            stats::set_atq_enabled(true);
        }

        if opts.queued_wakeup {
            open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_ALLOW_QUEUED_WAKEUP;
        }
        open_skel.struct_ops.p2dq_mut().flags |= *compat::SCX_OPS_KEEP_BUILTIN_IDLE;

        // Disable autoattach for the struct_ops map since we attach it manually via
        // attach_struct_ops() in scx_ops_attach!(). This prevents libbpf from warning
        // about uninitialized skeleton link during attach().
        unsafe {
            libbpf_rs::libbpf_sys::bpf_map__set_autoattach(
                open_skel.maps.p2dq.as_libbpf_object().as_ptr(),
                false,
            );
        }

        let mut skel = scx_ops_load!(open_skel, p2dq, uei)?;
        scx_p2dq::init_skel!(&mut skel, topo);

        let stats_server = StatsServer::new(stats::server_data()).launch()?;

        Ok(Self {
            skel,
            struct_ops: None,
            debug_level,
            stats_server,
        })
    }

    /// Snapshot all per-CPU BPF stat counters, summing each stat across
    /// CPUs into a `Metrics` struct for the stats server.
    fn get_metrics(&self) -> Metrics {
        let mut stats = vec![0u64; stat_idx_P2DQ_NR_STATS as usize];
        let stats_map = &self.skel.maps.stats;
        for stat in 0..stat_idx_P2DQ_NR_STATS {
            // Per-CPU lookup returns one u64 value per possible CPU.
            // NOTE(review): the double unwrap panics if the map lookup fails
            // or the key is absent — treated here as an invariant violation.
            let cpu_stat_vec: Vec<Vec<u8>> = stats_map
                .lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
                .unwrap()
                .unwrap();
            let sum: u64 = cpu_stat_vec
                .iter()
                .map(|val| u64::from_ne_bytes(val.as_slice().try_into().unwrap()))
                .sum();
            stats[stat as usize] = sum;
        }
        Metrics {
            atq_enq: stats[stat_idx_P2DQ_STAT_ATQ_ENQ as usize],
            atq_reenq: stats[stat_idx_P2DQ_STAT_ATQ_REENQ as usize],
            direct: stats[stat_idx_P2DQ_STAT_DIRECT as usize],
            idle: stats[stat_idx_P2DQ_STAT_IDLE as usize],
            dsq_change: stats[stat_idx_P2DQ_STAT_DSQ_CHANGE as usize],
            same_dsq: stats[stat_idx_P2DQ_STAT_DSQ_SAME as usize],
            keep: stats[stat_idx_P2DQ_STAT_KEEP as usize],
            enq_cpu: stats[stat_idx_P2DQ_STAT_ENQ_CPU as usize],
            enq_intr: stats[stat_idx_P2DQ_STAT_ENQ_INTR as usize],
            enq_llc: stats[stat_idx_P2DQ_STAT_ENQ_LLC as usize],
            enq_mig: stats[stat_idx_P2DQ_STAT_ENQ_MIG as usize],
            select_pick2: stats[stat_idx_P2DQ_STAT_SELECT_PICK2 as usize],
            dispatch_pick2: stats[stat_idx_P2DQ_STAT_DISPATCH_PICK2 as usize],
            llc_migrations: stats[stat_idx_P2DQ_STAT_LLC_MIGRATION as usize],
            node_migrations: stats[stat_idx_P2DQ_STAT_NODE_MIGRATION as usize],
            wake_prev: stats[stat_idx_P2DQ_STAT_WAKE_PREV as usize],
            wake_llc: stats[stat_idx_P2DQ_STAT_WAKE_LLC as usize],
            wake_mig: stats[stat_idx_P2DQ_STAT_WAKE_MIG as usize],
            fork_balance: stats[stat_idx_P2DQ_STAT_FORK_BALANCE as usize],
            exec_balance: stats[stat_idx_P2DQ_STAT_EXEC_BALANCE as usize],
            fork_same_llc: stats[stat_idx_P2DQ_STAT_FORK_SAME_LLC as usize],
            exec_same_llc: stats[stat_idx_P2DQ_STAT_EXEC_SAME_LLC as usize],
            thermal_kick: stats[stat_idx_P2DQ_STAT_THERMAL_KICK as usize],
            thermal_avoid: stats[stat_idx_P2DQ_STAT_THERMAL_AVOID as usize],
            eas_little_select: stats[stat_idx_P2DQ_STAT_EAS_LITTLE_SELECT as usize],
            eas_big_select: stats[stat_idx_P2DQ_STAT_EAS_BIG_SELECT as usize],
            eas_fallback: stats[stat_idx_P2DQ_STAT_EAS_FALLBACK as usize],
        }
    }

    /// Main service loop: answer stats requests until shutdown is requested
    /// or the BPF side reports an exit, then detach and report the exit info
    /// (whose `should_restart()` drives the outer restart loop in `main`).
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
        let (res_ch, req_ch) = self.stats_server.channels();

        // The 1s recv timeout doubles as the poll interval for the shutdown
        // flag and the BPF exit status.
        while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
            match req_ch.recv_timeout(Duration::from_secs(1)) {
                Ok(()) => res_ch.send(self.get_metrics())?,
                Err(RecvTimeoutError::Timeout) => {}
                Err(e) => Err(e)?,
            }
        }

        // Drop the struct_ops link to detach before reading the exit info.
        let _ = self.struct_ops.take();
        uei_report!(&self.skel, uei)
    }

    /// Debug aid: run the arena_topology_print BPF prog via test_run so it
    /// dumps the arena topology; nonzero return is treated as a failure.
    fn print_topology(&mut self) -> Result<()> {
        let input = ProgramInput {
            ..Default::default()
        };

        let output = self.skel.progs.arena_topology_print.test_run(input)?;
        if output.return_value != 0 {
            bail!(
                "Could not initialize arenas, topo_print returned {}",
                output.return_value as i32
            );
        }

        Ok(())
    }

    /// Attach the struct_ops, registering the scheduler with sched_ext.
    /// At debug levels also prints the arena topology.
    fn start(&mut self) -> Result<()> {
        self.struct_ops = Some(scx_ops_attach!(self.skel, p2dq)?);

        if self.debug_level > 0 {
            self.print_topology()?;
        }

        info!("P2DQ scheduler started! Run `scx_p2dq --monitor` for metrics.");

        Ok(())
    }
}
342
343impl Drop for Scheduler<'_> {
344    fn drop(&mut self) {
345        info!("Unregister {SCHEDULER_NAME} scheduler");
346
347        if let Some(struct_ops) = self.struct_ops.take() {
348            drop(struct_ops);
349        }
350    }
351}
352
#[clap_main::clap_main]
/// Entry point: parses CLI options (via clap_main), sets up logging and
/// signal handling, optionally applies power-management tweaks (uncore
/// frequency, EPP, turbo), runs the scheduler until exit, then restores
/// any power-management state it changed.
fn main(opts: CliOpts) -> Result<()> {
    if opts.version {
        println!(
            "scx_p2dq: {}",
            build_id::full_version(env!("CARGO_PKG_VERSION"))
        );
        return Ok(());
    }

    // Log filter precedence: RUST_LOG env var, then --log-level, then "info"
    // as the final fallback if both are invalid.
    let env_filter = EnvFilter::try_from_default_env()
        .or_else(|_| match EnvFilter::try_new(&opts.log_level) {
            Ok(filter) => Ok(filter),
            Err(e) => {
                eprintln!(
                    "invalid log envvar: {}, using info, err is: {}",
                    opts.log_level, e
                );
                EnvFilter::try_new("info")
            }
        })
        .unwrap_or_else(|_| EnvFilter::new("info"));

    // Logger init failure (e.g. a logger already installed) is non-fatal.
    match tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_target(true)
        .with_thread_ids(true)
        .with_file(true)
        .with_line_number(true)
        .try_init()
    {
        Ok(()) => {}
        Err(e) => eprintln!("failed to init logger: {}", e),
    }

    if opts.verbose > 0 {
        warn!("Setting verbose via -v is deprecated and will be an error in future releases.");
    }

    if let Some(run_id) = opts.run_id {
        info!("scx_p2dq run_id: {}", run_id);
    }

    // Ctrl-C flips the shared shutdown flag polled by the run loop and the
    // stats monitor thread.
    let shutdown = Arc::new(AtomicBool::new(false));
    let shutdown_clone = shutdown.clone();
    ctrlc::set_handler(move || {
        shutdown_clone.store(true, Ordering::Relaxed);
    })
    .context("Error setting Ctrl-C handler")?;

    // --monitor runs stats-only and returns without launching the scheduler;
    // --stats runs the same thread alongside the scheduler.
    if let Some(intv) = opts.monitor.or(opts.stats) {
        let shutdown_copy = shutdown.clone();
        let jh = std::thread::spawn(move || {
            match stats::monitor(Duration::from_secs_f64(intv), shutdown_copy) {
                Ok(_) => {
                    debug!("stats monitor thread finished successfully")
                }
                Err(error_object) => {
                    warn!("stats monitor thread finished because of an error {error_object}")
                }
            }
        });
        if opts.monitor.is_some() {
            let _ = jh.join();
            return Ok(());
        }
    }

    // Optionally cap CPU idle-state resume latency (idle QoS) on every CPU.
    // NOTE(review): this setting is not restored on exit, unlike the
    // uncore/EPP/turbo tweaks below — presumably intentional; confirm.
    if let Some(idle_resume_us) = opts.sched.idle_resume_us {
        if !cpu_idle_resume_latency_supported() {
            warn!("idle resume latency not supported");
        } else if idle_resume_us > 0 {
            info!("Setting idle QoS to {idle_resume_us}us");
            for cpu in TOPO.all_cpus.values() {
                update_cpu_idle_resume_latency(cpu.id, idle_resume_us.try_into().unwrap())?;
            }
        }
    }

    let is_efficiency = opts.sched.sched_mode == scx_p2dq::SchedMode::Efficiency;
    let is_performance = opts.sched.sched_mode == scx_p2dq::SchedMode::Performance;

    // Uncore frequency: explicit --uncore-max-freq-mhz wins; otherwise
    // efficiency mode clamps to the domain's min and performance mode to
    // its max. Originals are saved for restoration on exit.
    let mut orig_uncore_freqs: Vec<(u32, u32, u32)> = Vec::new();
    if opts.sched.uncore_max_freq_mhz.is_some() || is_efficiency || is_performance {
        if !uncore_freq_supported() {
            // Only warn when the user explicitly asked for it; mode-implied
            // tweaks stay silent on unsupported hardware.
            if opts.sched.uncore_max_freq_mhz.is_some() {
                warn!("uncore frequency control not supported");
            }
        } else {
            let _ = for_each_uncore_domain(|pkg, die| {
                let freq_khz = if let Some(mhz) = opts.sched.uncore_max_freq_mhz {
                    mhz * 1000
                } else if is_efficiency {
                    get_uncore_min_freq_khz(pkg, die)?
                } else {
                    get_uncore_max_freq_khz(pkg, die)?
                };
                if let Ok(orig) = get_uncore_max_freq_khz(pkg, die) {
                    if orig != freq_khz {
                        info!(
                            "Setting max uncore frequency for package {} die {} to {} MHz",
                            pkg,
                            die,
                            freq_khz / 1000
                        );
                        orig_uncore_freqs.push((pkg, die, orig));
                        set_uncore_max_freq_khz(pkg, die, freq_khz)?;
                    }
                }
                Ok(())
            });
        }
    }

    // Energy Performance Preference: efficiency -> "power",
    // performance -> "performance". Per-CPU originals saved for restore.
    let mut orig_epps: Vec<(usize, String)> = Vec::new();
    if (is_efficiency || is_performance) && epp_supported() {
        let target_epp = if is_efficiency {
            "power"
        } else {
            "performance"
        };
        for cpu in TOPO.all_cpus.values() {
            if let Ok(orig) = get_epp(cpu.id) {
                if orig != target_epp {
                    // Log once, on the first CPU that actually changes.
                    if orig_epps.is_empty() {
                        info!("Setting EPP to {} for all CPUs", target_epp);
                    }
                    orig_epps.push((cpu.id, orig));
                    let _ = set_epp(cpu.id, target_epp);
                }
            }
        }
    }

    // Turbo: explicit --turbo wins; otherwise efficiency disables and
    // performance enables it. `orig_turbo` is Some(previous state) only when
    // the setting was actually changed, so restore is conditional.
    let orig_turbo = if turbo_supported() {
        let target_turbo = opts.sched.turbo.or(if is_efficiency {
            Some(false)
        } else if is_performance {
            Some(true)
        } else {
            None
        });
        if let Some(want_enabled) = target_turbo {
            if let Ok(current) = get_turbo_enabled() {
                if current != want_enabled {
                    let mode_suffix = if opts.sched.turbo.is_none() {
                        if is_efficiency {
                            " for efficiency mode"
                        } else {
                            " for performance mode"
                        }
                    } else {
                        ""
                    };
                    info!(
                        "{} turbo{}",
                        if want_enabled {
                            "Enabling"
                        } else {
                            "Disabling"
                        },
                        mode_suffix
                    );
                    let _ = set_turbo_enabled(want_enabled);
                    Some(current)
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        }
    } else {
        if opts.sched.turbo.is_some() {
            warn!("turbo control not supported");
        }
        None
    };

    // Scheduler lifecycle: init, set up the arena allocator, attach, and run
    // until exit; restart the whole cycle when the BPF side requests it.
    let mut open_object = MaybeUninit::uninit();
    loop {
        let mut sched =
            Scheduler::init(&opts.sched, &opts.libbpf, &mut open_object, &opts.log_level)?;
        let task_size = std::mem::size_of::<types::task_p2dq>();
        let arenalib = ArenaLib::init(sched.skel.object_mut(), task_size, *NR_CPU_IDS)?;
        arenalib.setup()?;

        sched.start()?;

        if !sched.run(shutdown.clone())?.should_restart() {
            break;
        }
    }

    // Restore power-management state in reverse of how it was applied;
    // restoration failures are deliberately best-effort (ignored).
    if let Some(was_enabled) = orig_turbo {
        info!(
            "Restoring turbo to {}",
            if was_enabled { "enabled" } else { "disabled" }
        );
        let _ = set_turbo_enabled(was_enabled);
    }

    if !orig_epps.is_empty() {
        info!("Restoring EPP settings");
        for (cpu, epp) in orig_epps {
            let _ = set_epp(cpu, &epp);
        }
    }

    for (pkg, die, orig_khz) in orig_uncore_freqs {
        info!(
            "Restoring uncore frequency for package {} die {} to {} MHz",
            pkg,
            die,
            orig_khz / 1000
        );
        let _ = set_uncore_max_freq_khz(pkg, die, orig_khz);
    }

    Ok(())
}