1use std::collections::BTreeMap;
2use std::io::Write;
3use std::sync::Arc;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::Ordering;
6use std::time::Duration;
7use std::time::UNIX_EPOCH;
8
9use anyhow::Result;
10use chrono::DateTime;
11use chrono::Local;
12use scx_stats::prelude::*;
13use scx_stats_derive::Stats;
14use scx_stats_derive::stat_doc;
15use scx_utils::Cpumask;
16use scx_utils::normalize_load_metric;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::StatsCtx;
21
/// Renders `x` right-aligned in a 7-character field with two decimal places
/// and an explicit sign, e.g. `  +1.50` / `  -1.50`.
fn signed(x: f64) -> String {
    // The `+` flag only changes the output for non-negative values: negative
    // numbers already print their `-`, and NaN is never given a sign.
    format!("{:+7.2}", x)
}
29
// Load-balancing statistics for a single scheduling domain.
// Instances are stored per domain id in `NodeStats::doms`; `DomainStats::new`
// passes every value through `normalize_load_metric` before storing it.
// NOTE(review): `_om_prefix` / `_om_label` presumably control the metric name
// prefix and label used for OpenMetrics export — confirm against scx_stats.
#[stat_doc]
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(_om_prefix = "d_", _om_label = "domain")]
pub struct DomainStats {
    #[stat(desc = "sum of weight * duty_cycle for all tasks")]
    pub load: f64,
    // Printed with an explicit sign by `format` (positive and negative values occur).
    #[stat(desc = "load imbalance from average")]
    pub imbal: f64,
    // Printed with an explicit sign by `format`.
    #[stat(desc = "load migrated for load balancing")]
    pub delta: f64,
}
41
42impl DomainStats {
43 pub fn new(load: f64, imbal: f64, delta: f64) -> Self {
44 Self {
45 load: normalize_load_metric(load),
46 imbal: normalize_load_metric(imbal),
47 delta: normalize_load_metric(delta),
48 }
49 }
50
51 pub fn format<W: Write>(&self, w: &mut W, id: usize) -> Result<()> {
52 writeln!(
53 w,
54 " DOM[{:02}] load={:6.2} imbal={} delta={}",
55 id,
56 self.load,
57 signed(self.imbal),
58 signed(self.delta)
59 )?;
60 Ok(())
61 }
62}
63
// Load-balancing statistics for a single NUMA node, plus the per-domain stats
// of the domains it contains, keyed by domain id.
// `NodeStats::new` normalizes the scalar values via `normalize_load_metric`;
// `doms` is stored as given.
// NOTE(review): `_om_prefix` / `_om_label` presumably control the metric name
// prefix and label used for OpenMetrics export — confirm against scx_stats.
#[stat_doc]
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(_om_prefix = "n_", _om_label = "node")]
pub struct NodeStats {
    #[stat(desc = "sum of weight * duty_cycle for all tasks")]
    pub load: f64,
    // Printed with an explicit sign by `format`.
    #[stat(desc = "load imbalance from average")]
    pub imbal: f64,
    // Printed with an explicit sign by `format`.
    #[stat(desc = "load migrated for load balancing")]
    pub delta: f64,
    // Domain id -> stats; a BTreeMap so iteration (and printing) is in id order.
    #[stat(desc = "per-domain statistics")]
    pub doms: BTreeMap<usize, DomainStats>,
}
77
78impl NodeStats {
79 pub fn new(load: f64, imbal: f64, delta: f64, doms: BTreeMap<usize, DomainStats>) -> Self {
80 Self {
81 load: normalize_load_metric(load),
82 imbal: normalize_load_metric(imbal),
83 delta: normalize_load_metric(delta),
84 doms,
85 }
86 }
87
88 pub fn format<W: Write>(&self, w: &mut W, id: usize) -> Result<()> {
89 writeln!(
90 w,
91 " NODE[{:02}] load={:6.2} imbal={} delta={}",
92 id,
93 self.load,
94 signed(self.imbal),
95 signed(self.delta)
96 )?;
97 Ok(())
98 }
99}
100
101#[stat_doc]
102#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
103#[stat(top)]
104pub struct ClusterStats {
105 #[stat(desc = "timestamp")]
106 pub at_us: u64,
107 #[stat(desc = "timestamp of the last load balancing")]
108 pub lb_at_us: u64,
109 #[stat(desc = "# sched events duringg the period")]
110 pub total: u64,
111 #[stat(desc = "scheduling slice in usecs")]
112 pub slice_us: u64,
113
114 #[stat(desc = "CPU busy % (100% means all CPU)")]
115 pub cpu_busy: f64,
116 #[stat(desc = "sum of weight * duty_cycle for all tasks")]
117 pub load: f64,
118 #[stat(desc = "# of migrations from load balancing")]
119 pub nr_migrations: u64,
120
121 #[stat(desc = "# of BPF task get errors")]
122 pub task_get_err: u64,
123 #[stat(desc = "time spent running scheduler userspace")]
124 pub time_used: f64,
125
126 #[stat(desc = "% WAKE_SYNC directly dispatched to idle previous CPU")]
127 pub sync_prev_idle: f64,
128 #[stat(desc = "% WAKE_SYNC directly dispatched to waker CPU")]
129 pub wake_sync: f64,
130 #[stat(desc = "% directly dispatched to idle previous CPU")]
131 pub prev_idle: f64,
132 #[stat(desc = "% directly dispatched to idle previous CPU in a different domain")]
133 pub greedy_idle: f64,
134 #[stat(desc = "% directly dispatched to CPU due to restricted to one CPU")]
135 pub pinned: f64,
136 #[stat(desc = "% directly dispatched to CPU (--kthreads-local or local domain)")]
137 pub direct: f64,
138 #[stat(desc = "% directly dispatched to CPU (foreign domain, local node)")]
139 pub greedy: f64,
140 #[stat(desc = "% directly dispatched to CPU (foreign node)")]
141 pub greedy_far: f64,
142 #[stat(desc = "% scheduled from local domain")]
143 pub dsq_dispatch: f64,
144 #[stat(desc = "% scheduled from foreign domain")]
145 pub greedy_local: f64,
146 #[stat(desc = "% scheduled from foreign node")]
147 pub greedy_xnuma: f64,
148 #[stat(desc = "% foreign domain CPU kicked on enqueue")]
149 pub kick_greedy: f64,
150 #[stat(desc = "% repatriated to local domain on enqueue")]
151 pub repatriate: f64,
152 #[stat(desc = "% accumulated vtime budget clamped")]
153 pub dl_clamp: f64,
154 #[stat(desc = "% accumulated vtime budget used as-is")]
155 pub dl_preset: f64,
156
157 #[stat(_om_skip)]
158 pub direct_greedy_cpus: Vec<u64>,
159 #[stat(_om_skip)]
160 pub kick_greedy_cpus: Vec<u64>,
161
162 #[stat(desc = "per-node statistics")]
163 pub nodes: BTreeMap<usize, NodeStats>,
164}
165
166impl ClusterStats {
167 pub fn format<W: Write>(&self, w: &mut W) -> Result<()> {
168 writeln!(
169 w,
170 "cpu={:7.2} load={:8.2} mig={} task_err={} time_used={:4.1}ms",
171 self.cpu_busy,
172 self.load,
173 self.nr_migrations,
174 self.task_get_err,
175 self.time_used * 1000.0,
176 )?;
177 writeln!(
178 w,
179 "tot={:7} sync_prev_idle={:5.2} wsync={:5.2}",
180 self.total, self.sync_prev_idle, self.wake_sync,
181 )?;
182 writeln!(
183 w,
184 "prev_idle={:5.2} greedy_idle={:5.2} pin={:5.2}",
185 self.prev_idle, self.greedy_idle, self.pinned
186 )?;
187
188 writeln!(
189 w,
190 "dir={:5.2} dir_greedy={:5.2} dir_greedy_far={:5.2}",
191 self.direct, self.greedy, self.greedy_far,
192 )?;
193
194 writeln!(
195 w,
196 "dsq={:5.2} greedy_local={:5.2} greedy_xnuma={:5.2}",
197 self.dsq_dispatch, self.greedy_local, self.greedy_xnuma,
198 )?;
199
200 writeln!(
201 w,
202 "kick_greedy={:5.2} rep={:5.2}",
203 self.kick_greedy, self.repatriate
204 )?;
205 writeln!(
206 w,
207 "dl_clamp={:5.2} dl_preset={:5.2}",
208 self.dl_clamp, self.dl_preset,
209 )?;
210
211 writeln!(w, "slice={}us", self.slice_us)?;
212 writeln!(
213 w,
214 "direct_greedy_cpus={:x}",
215 Cpumask::from_vec(self.direct_greedy_cpus.clone())
216 )?;
217 writeln!(
218 w,
219 " kick_greedy_cpus={:x}",
220 Cpumask::from_vec(self.kick_greedy_cpus.clone())
221 )?;
222
223 for (nid, node) in self.nodes.iter() {
224 node.format(w, *nid)?;
225 for (did, dom) in node.doms.iter() {
226 dom.format(w, *did)?;
227 }
228 }
229
230 Ok(())
231 }
232}
233
/// Builds the stats-server description for this scheduler.
///
/// The server registers metadata for `DomainStats`, `NodeStats` and
/// `ClusterStats`, and exposes a single "top" op with no close hook.
///
/// Opening the op performs one request/response round-trip on the channels
/// to obtain an initial `StatsCtx`. Each subsequent read sends the previous
/// context, receives `(new context, ClusterStats)`, keeps the new context for
/// the next read, and returns the `ClusterStats` as JSON.
/// NOTE(review): presumably the responder computes `ClusterStats` as the
/// difference between the sent context and the current one — confirm in the
/// code that services `req_ch`.
pub fn server_data() -> StatsServerData<StatsCtx, (StatsCtx, ClusterStats)> {
    let open: Box<dyn StatsOpener<StatsCtx, (StatsCtx, ClusterStats)>> =
        Box::new(move |(req_ch, res_ch)| {
            // Prime the baseline: send a blank context and keep only the
            // context from the reply; the stats from this first reply are
            // discarded.
            let mut prev_sc = StatsCtx::blank();
            req_ch.send(prev_sc.clone())?;
            let (cur_sc, _) = res_ch.recv()?;
            prev_sc = cur_sc;

            // The reader owns `prev_sc` (moved in) and advances it on every
            // call, so each read is relative to the previous one.
            let read: Box<dyn StatsReader<StatsCtx, (StatsCtx, ClusterStats)>> =
                Box::new(move |_args, (req_ch, res_ch)| {
                    req_ch.send(prev_sc.clone())?;
                    let (cur_sc, cluster_stats) = res_ch.recv()?;
                    prev_sc = cur_sc;
                    cluster_stats.to_json()
                });
            Ok(read)
        });

    StatsServerData::new()
        .add_meta(DomainStats::meta())
        .add_meta(NodeStats::meta())
        .add_meta(ClusterStats::meta())
        .add_ops("top", StatsOps { open, close: None })
}
259
260pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
261 scx_utils::monitor_stats::<ClusterStats>(
262 &vec![],
263 intv,
264 || shutdown.load(Ordering::Relaxed),
265 |cst| {
266 let dt = DateTime::<Local>::from(UNIX_EPOCH + Duration::from_micros(cst.at_us));
267 println!(
268 "###### {}, load balance @ {:7.1}ms ######",
269 dt.to_rfc2822(),
270 (cst.lb_at_us as i64 - cst.at_us as i64) as f64 / 1000.0
271 );
272 cst.format(&mut std::io::stdout())
273 },
274 )
275}