1use std::io::Write;
2use std::sync::atomic::AtomicBool;
3use std::sync::atomic::Ordering;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::Result;
8use scx_stats::prelude::*;
9use scx_stats_derive::stat_doc;
10use scx_stats_derive::Stats;
11use serde::Deserialize;
12use serde::Serialize;
13
14#[stat_doc]
15#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
16#[stat(top)]
17pub struct Metrics {
18 #[stat(desc = "local dispatches")]
19 pub nr_local_dispatch: u64,
20 #[stat(desc = "remote dispatches")]
21 pub nr_remote_dispatch: u64,
22 #[stat(desc = "keep running events")]
23 pub nr_keep_running: u64,
24}
25
26impl Metrics {
27 fn format<W: Write>(&self, w: &mut W) -> Result<()> {
28 writeln!(
29 w,
30 "[{}] dispatch: local={} remote={} running={}",
31 crate::SCHEDULER_NAME,
32 self.nr_local_dispatch,
33 self.nr_remote_dispatch,
34 self.nr_keep_running,
35 )?;
36 Ok(())
37 }
38
39 fn delta(&self, rhs: &Self) -> Self {
40 Self {
41 nr_local_dispatch: self.nr_local_dispatch - rhs.nr_local_dispatch,
42 nr_remote_dispatch: self.nr_remote_dispatch - rhs.nr_remote_dispatch,
43 nr_keep_running: self.nr_keep_running - rhs.nr_keep_running,
44 ..self.clone()
45 }
46 }
47}
48
49pub fn server_data() -> StatsServerData<(), Metrics> {
50 let open: Box<dyn StatsOpener<(), Metrics>> = Box::new(move |(req_ch, res_ch)| {
51 req_ch.send(())?;
52 let mut prev = res_ch.recv()?;
53
54 let read: Box<dyn StatsReader<(), Metrics>> = Box::new(move |_args, (req_ch, res_ch)| {
55 req_ch.send(())?;
56 let cur = res_ch.recv()?;
57 let delta = cur.delta(&prev);
58 prev = cur;
59 delta.to_json()
60 });
61
62 Ok(read)
63 });
64
65 StatsServerData::new()
66 .add_meta(Metrics::meta())
67 .add_ops("top", StatsOps { open, close: None })
68}
69
70pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
71 scx_utils::monitor_stats::<Metrics>(
72 &[],
73 intv,
74 || shutdown.load(Ordering::Relaxed),
75 |metrics| metrics.format(&mut std::io::stdout()),
76 )
77}