scx_p2dq/
stats.rs

1use std::io::Write;
2use std::sync::atomic::AtomicBool;
3use std::sync::atomic::Ordering;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::Result;
8use scx_stats::prelude::*;
9use scx_stats_derive::stat_doc;
10use scx_stats_derive::Stats;
11use serde::Deserialize;
12use serde::Serialize;
13
14#[stat_doc]
15#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
16#[stat(top)]
17pub struct Metrics {
18    #[stat(desc = "Number of times a task was enqueued to a ATQ")]
19    pub atq_enq: u64,
20    #[stat(desc = "Number of times a task was re-enqueued to a ATQ")]
21    pub atq_reenq: u64,
22    #[stat(desc = "Number of times tasks have switched DSQs")]
23    pub dsq_change: u64,
24    #[stat(desc = "Number of times tasks have stayed on the same DSQ")]
25    pub same_dsq: u64,
26    #[stat(desc = "Number of times a task kept running")]
27    pub keep: u64,
28    #[stat(desc = "Number of times a task was enqueued to CPUC DSQ")]
29    pub enq_cpu: u64,
30    #[stat(desc = "Number of times a task was enqueued to LLC DSQ")]
31    pub enq_llc: u64,
32    #[stat(desc = "Number of times a task was enqueued to interactive DSQ")]
33    pub enq_intr: u64,
34    #[stat(desc = "Number of times a task was enqueued to migration DSQ")]
35    pub enq_mig: u64,
36    #[stat(desc = "Number of times a select_cpu pick 2 load balancing occured")]
37    pub select_pick2: u64,
38    #[stat(desc = "Number of times a dispatch pick 2 load balancing occured")]
39    pub dispatch_pick2: u64,
40    #[stat(desc = "Number of times a task migrated LLCs")]
41    pub llc_migrations: u64,
42    #[stat(desc = "Number of times a task migrated NUMA nodes")]
43    pub node_migrations: u64,
44    #[stat(desc = "Number of times tasks have directly been dispatched to local per CPU DSQs")]
45    pub direct: u64,
46    #[stat(desc = "Number of times tasks have dispatched to an idle local per CPU DSQs")]
47    pub idle: u64,
48    #[stat(desc = "Number of times tasks have been woken to the previous CPU")]
49    pub wake_prev: u64,
50    #[stat(desc = "Number of times tasks have been woken to the previous llc")]
51    pub wake_llc: u64,
52    #[stat(desc = "Number of times tasks have been woken and migrated llc")]
53    pub wake_mig: u64,
54}
55
56impl Metrics {
57    fn format<W: Write>(&self, w: &mut W) -> Result<()> {
58        writeln!(
59            w,
60            "direct/idle/keep {}/{}/{}\n\tdsq same/migrate {}/{}\n\tatq enq/reenq {}/{}\n\tenq cpu/llc/intr/mig {}/{}/{}/{}",
61            self.direct,
62            self.idle,
63            self.keep,
64            self.same_dsq,
65            self.dsq_change,
66            self.atq_enq,
67            self.atq_reenq,
68            self.enq_cpu,
69            self.enq_llc,
70            self.enq_intr,
71            self.enq_mig,
72        )?;
73        writeln!(
74            w,
75            "\twake prev/llc/mig {}/{}/{}\n\tpick2 select/dispatch {}/{}\n\tmigrations llc/node: {}/{}",
76            self.wake_prev,
77            self.wake_llc,
78            self.wake_mig,
79            self.select_pick2,
80            self.dispatch_pick2,
81            self.llc_migrations,
82            self.node_migrations,
83        )?;
84        Ok(())
85    }
86
87    fn delta(&self, rhs: &Self) -> Self {
88        Self {
89            atq_enq: self.atq_enq - rhs.atq_enq,
90            atq_reenq: self.atq_reenq - rhs.atq_reenq,
91            direct: self.direct - rhs.direct,
92            idle: self.idle - rhs.idle,
93            dsq_change: self.dsq_change - rhs.dsq_change,
94            same_dsq: self.same_dsq - rhs.same_dsq,
95            keep: self.keep - rhs.keep,
96            enq_cpu: self.enq_cpu - rhs.enq_cpu,
97            enq_llc: self.enq_llc - rhs.enq_llc,
98            enq_intr: self.enq_intr - rhs.enq_intr,
99            enq_mig: self.enq_mig - rhs.enq_mig,
100            select_pick2: self.select_pick2 - rhs.select_pick2,
101            dispatch_pick2: self.dispatch_pick2 - rhs.dispatch_pick2,
102            llc_migrations: self.llc_migrations - rhs.llc_migrations,
103            node_migrations: self.node_migrations - rhs.node_migrations,
104            wake_prev: self.wake_prev - rhs.wake_prev,
105            wake_llc: self.wake_llc - rhs.wake_llc,
106            wake_mig: self.wake_mig - rhs.wake_mig,
107        }
108    }
109}
110pub fn server_data() -> StatsServerData<(), Metrics> {
111    let open: Box<dyn StatsOpener<(), Metrics>> = Box::new(move |(req_ch, res_ch)| {
112        req_ch.send(())?;
113        let mut prev = res_ch.recv()?;
114
115        let read: Box<dyn StatsReader<(), Metrics>> = Box::new(move |_args, (req_ch, res_ch)| {
116            req_ch.send(())?;
117            let cur = res_ch.recv()?;
118            let delta = cur.delta(&prev);
119            prev = cur;
120            delta.to_json()
121        });
122
123        Ok(read)
124    });
125
126    StatsServerData::new()
127        .add_meta(Metrics::meta())
128        .add_ops("top", StatsOps { open, close: None })
129}
130
131pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
132    scx_utils::monitor_stats::<Metrics>(
133        &[],
134        intv,
135        || shutdown.load(Ordering::Relaxed),
136        |metrics| metrics.format(&mut std::io::stdout()),
137    )
138}