scx_rusty/stats.rs

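//! Statistics support for scx_rusty: the per-domain, per-node, and
//! cluster-wide stat structs, the scx_stats server plumbing, and a monitor
//! loop that pretty-prints reports.
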
use std::collections::BTreeMap;
use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::UNIX_EPOCH;

use anyhow::Result;
use chrono::DateTime;
use chrono::Local;
use scx_stats::prelude::*;
use scx_stats_derive::Stats;
use scx_stats_derive::stat_doc;
use scx_utils::Cpumask;
use scx_utils::normalize_load_metric;
use serde::Deserialize;
use serde::Serialize;

use crate::StatsCtx;

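/// Render a load delta with an explicit sign: non-negative values are
/// formatted with a leading '+' while negative values already carry '-',
/// both padded to width 7 with two decimals. E.g. signed(1.5) yields
/// "  +1.50" and signed(-1.5) yields "  -1.50".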
fn signed(x: f64) -> String {
    if x >= 0.0f64 {
        format!("{:+7.2}", x)
    } else {
        format!("{:7.2}", x)
    }
}

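/// Per-scheduling-domain load statistics. The constructor normalizes each
/// raw value with normalize_load_metric before it is reported.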
#[stat_doc]
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(_om_prefix = "d_", _om_label = "domain")]
pub struct DomainStats {
    #[stat(desc = "sum of weight * duty_cycle for all tasks")]
    pub load: f64,
    #[stat(desc = "load imbalance from average")]
    pub imbal: f64,
    #[stat(desc = "load migrated for load balancing")]
    pub delta: f64,
}

impl DomainStats {
    pub fn new(load: f64, imbal: f64, delta: f64) -> Self {
        Self {
            load: normalize_load_metric(load),
            imbal: normalize_load_metric(imbal),
            delta: normalize_load_metric(delta),
        }
    }

    pub fn format<W: Write>(&self, w: &mut W, id: usize) -> Result<()> {
        writeln!(
            w,
            "   DOM[{:02}] load={:6.2} imbal={} delta={}",
            id,
            self.load,
            signed(self.imbal),
            signed(self.delta)
        )?;
        Ok(())
    }
}

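/// Per-NUMA-node statistics, carrying the stats of each domain on the node.
/// As with DomainStats, the constructor normalizes the raw load values.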
#[stat_doc]
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(_om_prefix = "n_", _om_label = "node")]
pub struct NodeStats {
    #[stat(desc = "sum of weight * duty_cycle for all tasks")]
    pub load: f64,
    #[stat(desc = "load imbalance from average")]
    pub imbal: f64,
    #[stat(desc = "load migrated for load balancing")]
    pub delta: f64,
    #[stat(desc = "per-domain statistics")]
    pub doms: BTreeMap<usize, DomainStats>,
}

impl NodeStats {
    pub fn new(load: f64, imbal: f64, delta: f64, doms: BTreeMap<usize, DomainStats>) -> Self {
        Self {
            load: normalize_load_metric(load),
            imbal: normalize_load_metric(imbal),
            delta: normalize_load_metric(delta),
            doms,
        }
    }

    pub fn format<W: Write>(&self, w: &mut W, id: usize) -> Result<()> {
        writeln!(
            w,
            "  NODE[{:02}] load={:6.2} imbal={} delta={}",
            id,
            self.load,
            signed(self.imbal),
            signed(self.delta)
        )?;
        Ok(())
    }
}

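/// Top-level statistics for the whole machine, aggregated over a reporting
/// period and broken down into per-node and per-domain stats.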
#[stat_doc]
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(top)]
pub struct ClusterStats {
    #[stat(desc = "timestamp")]
    pub at_us: u64,
    #[stat(desc = "timestamp of the last load balancing")]
    pub lb_at_us: u64,
    #[stat(desc = "# sched events during the period")]
    pub total: u64,
    #[stat(desc = "scheduling slice in usecs")]
    pub slice_us: u64,

    #[stat(desc = "CPU busy % (100% means all CPUs fully utilized)")]
    pub cpu_busy: f64,
    #[stat(desc = "sum of weight * duty_cycle for all tasks")]
    pub load: f64,
    #[stat(desc = "# of migrations from load balancing")]
    pub nr_migrations: u64,

    #[stat(desc = "# of BPF task get errors")]
    pub task_get_err: u64,
    #[stat(desc = "time spent running the userspace scheduler")]
    pub time_used: f64,

    #[stat(desc = "% WAKE_SYNC directly dispatched to idle previous CPU")]
    pub sync_prev_idle: f64,
    #[stat(desc = "% WAKE_SYNC directly dispatched to waker CPU")]
    pub wake_sync: f64,
    #[stat(desc = "% directly dispatched to idle previous CPU")]
    pub prev_idle: f64,
    #[stat(desc = "% directly dispatched to idle previous CPU in a different domain")]
    pub greedy_idle: f64,
    #[stat(desc = "% directly dispatched because the task is pinned to one CPU")]
    pub pinned: f64,
    #[stat(desc = "% directly dispatched to CPU (--kthreads-local or local domain)")]
    pub direct: f64,
    #[stat(desc = "% directly dispatched to CPU (foreign domain, local node)")]
    pub greedy: f64,
    #[stat(desc = "% directly dispatched to CPU (foreign node)")]
    pub greedy_far: f64,
    #[stat(desc = "% scheduled from local domain")]
    pub dsq_dispatch: f64,
    #[stat(desc = "% scheduled from foreign domain")]
    pub greedy_local: f64,
    #[stat(desc = "% scheduled from foreign node")]
    pub greedy_xnuma: f64,
    #[stat(desc = "% foreign domain CPU kicked on enqueue")]
    pub kick_greedy: f64,
    #[stat(desc = "% repatriated to local domain on enqueue")]
    pub repatriate: f64,
    #[stat(desc = "% accumulated vtime budget clamped")]
    pub dl_clamp: f64,
    #[stat(desc = "% accumulated vtime budget used as-is")]
    pub dl_preset: f64,

    #[stat(_om_skip)]
    pub direct_greedy_cpus: Vec<u64>,
    #[stat(_om_skip)]
    pub kick_greedy_cpus: Vec<u64>,

    #[stat(desc = "per-node statistics")]
    pub nodes: BTreeMap<usize, NodeStats>,
}

impl ClusterStats {
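    /// Write a multi-line human-readable summary: the cluster-wide counters
    /// first, then one line per node and per domain.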
    pub fn format<W: Write>(&self, w: &mut W) -> Result<()> {
        writeln!(
            w,
            "cpu={:7.2} load={:8.2} mig={} task_err={} time_used={:4.1}ms",
            self.cpu_busy,
            self.load,
            self.nr_migrations,
            self.task_get_err,
            self.time_used * 1000.0,
        )?;
        writeln!(
            w,
            "tot={:7} sync_prev_idle={:5.2} wsync={:5.2}",
            self.total, self.sync_prev_idle, self.wake_sync,
        )?;
        writeln!(
            w,
            "prev_idle={:5.2} greedy_idle={:5.2} pin={:5.2}",
            self.prev_idle, self.greedy_idle, self.pinned
        )?;

        writeln!(
            w,
            "dir={:5.2} dir_greedy={:5.2} dir_greedy_far={:5.2}",
            self.direct, self.greedy, self.greedy_far,
        )?;

        writeln!(
            w,
            "dsq={:5.2} greedy_local={:5.2} greedy_xnuma={:5.2}",
            self.dsq_dispatch, self.greedy_local, self.greedy_xnuma,
        )?;

        writeln!(
            w,
            "kick_greedy={:5.2} rep={:5.2}",
            self.kick_greedy, self.repatriate
        )?;
        writeln!(
            w,
            "dl_clamp={:5.2} dl_preset={:5.2}",
            self.dl_clamp, self.dl_preset,
        )?;

        writeln!(w, "slice={}us", self.slice_us)?;
        writeln!(
            w,
            "direct_greedy_cpus={:x}",
            Cpumask::from_vec(self.direct_greedy_cpus.clone())
        )?;
        writeln!(
            w,
            "  kick_greedy_cpus={:x}",
            Cpumask::from_vec(self.kick_greedy_cpus.clone())
        )?;

        for (nid, node) in self.nodes.iter() {
            node.format(w, *nid)?;
            for (did, dom) in node.doms.iter() {
                dom.format(w, *did)?;
            }
        }

        Ok(())
    }
}

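/// Describe the stats server endpoint. Stats are computed as deltas between
/// two StatsCtx snapshots: each read sends the previous snapshot to the
/// scheduler over the request channel, receives the current snapshot plus
/// the ClusterStats for the interval, and stashes the new snapshot for the
/// next read.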
pub fn server_data() -> StatsServerData<StatsCtx, (StatsCtx, ClusterStats)> {
    let open: Box<dyn StatsOpener<StatsCtx, (StatsCtx, ClusterStats)>> =
        Box::new(move |(req_ch, res_ch)| {
            // Send one bogus request on open to establish prev_sc.
            let mut prev_sc = StatsCtx::blank();
            req_ch.send(prev_sc.clone())?;
            let (cur_sc, _) = res_ch.recv()?;
            prev_sc = cur_sc;

            let read: Box<dyn StatsReader<StatsCtx, (StatsCtx, ClusterStats)>> =
                Box::new(move |_args, (req_ch, res_ch)| {
                    req_ch.send(prev_sc.clone())?;
                    let (cur_sc, cluster_stats) = res_ch.recv()?;
                    prev_sc = cur_sc;
                    cluster_stats.to_json()
                });
            Ok(read)
        });

    StatsServerData::new()
        .add_meta(DomainStats::meta())
        .add_meta(NodeStats::meta())
        .add_meta(ClusterStats::meta())
        .add_ops("top", StatsOps { open, close: None })
}

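/// Attach to the stats server and print a report every `intv` until
/// `shutdown` is set. The banner line shows the sample time and the offset
/// of the last load-balancing pass relative to the sample, in milliseconds.
/// Typical use: monitor(Duration::from_secs(1), shutdown.clone()).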
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
    scx_utils::monitor_stats::<ClusterStats>(
        &vec![],
        intv,
        || shutdown.load(Ordering::Relaxed),
        |cst| {
            let dt = DateTime::<Local>::from(UNIX_EPOCH + Duration::from_micros(cst.at_us));
            println!(
                "###### {}, load balance @ {:7.1}ms ######",
                dt.to_rfc2822(),
                (cst.lb_at_us as i64 - cst.at_us as i64) as f64 / 1000.0
            );
            cst.format(&mut std::io::stdout())
        },
    )
}