1use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use anyhow::Result;
16
17use crate::procdb::ProcessDb;
18use crate::scheduler::{PandemoniumStats, Scheduler};
19use crate::tuning::{
20 self, detect_regime, scaled_regime_knobs, MwuController, MwuSignals, Regime, HIST_BUCKETS,
21};
22
/// Number of buckets in the sleep histogram that `monitor_loop` reads from the
/// scheduler each tick; `prev_sleep`, `cur_sleep` and `delta_sleep` all have
/// this length.
// NOTE(review): presumably has to match the bucket count on the scheduler side — confirm.
const SLEEP_BUCKETS: usize = 4;
28
29pub fn monitor_loop(
35 sched: &mut Scheduler,
36 shutdown: &'static AtomicBool,
37 verbose: bool,
38 nr_cpus: u64,
39) -> Result<bool> {
40 let mut prev = PandemoniumStats::default();
41 let mut prev_hist = [[0u64; HIST_BUCKETS]; 3];
42 let mut prev_sleep = [0u64; SLEEP_BUCKETS];
43 let mut regime = Regime::Mixed;
44 let mut mwu = MwuController::new(scaled_regime_knobs(regime, nr_cpus));
45 let mut pending_regime = regime;
46 let mut regime_hold: u32 = 0;
47 let mut light_ticks: u64 = 0;
48 let mut mixed_ticks: u64 = 0;
49 let mut heavy_ticks: u64 = 0;
50 let mut stability_score: u32 = 0;
51 let mut tick_counter: u64 = 0;
52
53 let mut procdb = match ProcessDb::new() {
54 Ok(db) => Some(db),
55 Err(e) => {
56 log_warn!("PROCDB INIT FAILED: {}", e);
57 None
58 }
59 };
60
61 sched.write_tuning_knobs(&scaled_regime_knobs(regime, nr_cpus))?;
63
64 while !shutdown.load(Ordering::Relaxed) && !sched.exited() {
65 std::thread::sleep(Duration::from_secs(1));
66
67 let stats = sched.read_stats();
68
69 let delta_d = stats.nr_dispatches.wrapping_sub(prev.nr_dispatches);
71 let delta_idle = stats.nr_idle_hits.wrapping_sub(prev.nr_idle_hits);
72 let delta_shared = stats.nr_shared.wrapping_sub(prev.nr_shared);
73 let delta_preempt = stats.nr_preempt.wrapping_sub(prev.nr_preempt);
74 let delta_keep = stats.nr_keep_running.wrapping_sub(prev.nr_keep_running);
75 let delta_wake_sum = stats.wake_lat_sum.wrapping_sub(prev.wake_lat_sum);
76 let delta_wake_samples = stats.wake_lat_samples.wrapping_sub(prev.wake_lat_samples);
77 let delta_hard = stats.nr_hard_kicks.wrapping_sub(prev.nr_hard_kicks);
78 let delta_soft = stats.nr_soft_kicks.wrapping_sub(prev.nr_soft_kicks);
79 let delta_enq_wake = stats.nr_enq_wakeup.wrapping_sub(prev.nr_enq_wakeup);
80 let delta_enq_requeue = stats.nr_enq_requeue.wrapping_sub(prev.nr_enq_requeue);
81 let delta_rescue = stats
82 .nr_overflow_rescue
83 .wrapping_sub(prev.nr_overflow_rescue);
84 let wake_avg_us = if delta_wake_samples > 0 {
85 delta_wake_sum / delta_wake_samples / 1000
86 } else {
87 0
88 };
89
90 let d_idle_sum = stats.wake_lat_idle_sum.wrapping_sub(prev.wake_lat_idle_sum);
92 let d_idle_cnt = stats.wake_lat_idle_cnt.wrapping_sub(prev.wake_lat_idle_cnt);
93 let d_kick_sum = stats.wake_lat_kick_sum.wrapping_sub(prev.wake_lat_kick_sum);
94 let d_kick_cnt = stats.wake_lat_kick_cnt.wrapping_sub(prev.wake_lat_kick_cnt);
95 let lat_idle_us = if d_idle_cnt > 0 {
96 d_idle_sum / d_idle_cnt / 1000
97 } else {
98 0
99 };
100 let lat_kick_us = if d_kick_cnt > 0 {
101 d_kick_sum / d_kick_cnt / 1000
102 } else {
103 0
104 };
105 let delta_reenq = stats.nr_reenqueue.wrapping_sub(prev.nr_reenqueue);
106
107 let dl2_hb = stats.nr_l2_hit_batch.wrapping_sub(prev.nr_l2_hit_batch);
109 let dl2_mb = stats.nr_l2_miss_batch.wrapping_sub(prev.nr_l2_miss_batch);
110 let dl2_hi = stats
111 .nr_l2_hit_interactive
112 .wrapping_sub(prev.nr_l2_hit_interactive);
113 let dl2_mi = stats
114 .nr_l2_miss_interactive
115 .wrapping_sub(prev.nr_l2_miss_interactive);
116 let dl2_hl = stats
117 .nr_l2_hit_lat_crit
118 .wrapping_sub(prev.nr_l2_hit_lat_crit);
119 let dl2_ml = stats
120 .nr_l2_miss_lat_crit
121 .wrapping_sub(prev.nr_l2_miss_lat_crit);
122 let l2_pct_b = if dl2_hb + dl2_mb > 0 {
123 dl2_hb * 100 / (dl2_hb + dl2_mb)
124 } else {
125 0
126 };
127 let l2_pct_i = if dl2_hi + dl2_mi > 0 {
128 dl2_hi * 100 / (dl2_hi + dl2_mi)
129 } else {
130 0
131 };
132 let l2_pct_l = if dl2_hl + dl2_ml > 0 {
133 dl2_hl * 100 / (dl2_hl + dl2_ml)
134 } else {
135 0
136 };
137
138 let idle_pct = if delta_d > 0 {
139 delta_idle * 100 / delta_d
140 } else {
141 0
142 };
143
144 let cur_hist = sched.read_wake_lat_hist();
146 let mut delta_hist = [[0u64; HIST_BUCKETS]; 3];
147 for tier in 0..3 {
148 for b in 0..HIST_BUCKETS {
149 delta_hist[tier][b] = cur_hist[tier][b].wrapping_sub(prev_hist[tier][b]);
150 }
151 }
152
153 let tp99_b_ns = tuning::compute_p99_from_histogram(&delta_hist[0]);
155 let tp99_i_ns = tuning::compute_p99_from_histogram(&delta_hist[1]);
156 let tp99_l_ns = tuning::compute_p99_from_histogram(&delta_hist[2]);
157
158 let mut agg = [0u64; HIST_BUCKETS];
160 for t in 0..3 {
161 for b in 0..HIST_BUCKETS {
162 agg[b] += delta_hist[t][b];
163 }
164 }
165 let p99_ns = tuning::compute_p99_from_histogram(&agg);
166
167 let cur_sleep = sched.read_sleep_hist();
169 let mut delta_sleep = [0u64; SLEEP_BUCKETS];
170 for i in 0..SLEEP_BUCKETS {
171 delta_sleep[i] = cur_sleep[i].wrapping_sub(prev_sleep[i]);
172 }
173 let sleep_total: u64 = delta_sleep.iter().sum();
174 let io_pct = if sleep_total > 0 {
175 (delta_sleep[0] + delta_sleep[1]) * 100 / sleep_total
176 } else {
177 0
178 };
179
180 let detected = detect_regime(regime, idle_pct);
182
183 let mut regime_changed_this_tick = false;
184 if detected != regime {
185 if detected == pending_regime {
186 regime_hold += 1;
187 } else {
188 pending_regime = detected;
189 regime_hold = 1;
190 }
191 if regime_hold >= 2 {
192 regime = detected;
193 let rk = scaled_regime_knobs(regime, nr_cpus);
194 sched.write_tuning_knobs(&rk)?;
195 regime_changed_this_tick = true;
196 mwu.set_baseline(rk);
197 mwu.reset();
198 }
199 } else {
200 pending_regime = regime;
201 regime_hold = 0;
202 }
203
204 if !regime_changed_this_tick {
207 let signals = MwuSignals {
208 p99_ns,
209 interactive_p99_ns: tp99_i_ns,
210 io_pct,
211 rescue_count: delta_rescue,
212 wakeup_rate: delta_enq_wake / nr_cpus.max(1),
213 };
214 let knobs = mwu.update(&signals, regime.p99_ceiling(), nr_cpus);
215 sched.write_tuning_knobs(&knobs)?;
216 }
217
218 let tighten_delta = if mwu.had_losses() { 1u64 } else { 0u64 };
220 stability_score = tuning::compute_stability_score(
221 stability_score,
222 regime_changed_this_tick,
223 tighten_delta,
224 p99_ns,
225 regime.p99_ceiling(),
226 );
227
228 let (db_total, db_confident) = if let Some(ref mut db) = procdb {
230 db.ingest();
231 db.flush_predictions();
232 db.tick();
233 db.summary()
234 } else {
235 (0, 0)
236 };
237
238 let p99_us = p99_ns / 1000;
239 let tp99_b = tp99_b_ns / 1000;
240 let tp99_i = tp99_i_ns / 1000;
241 let tp99_l = tp99_l_ns / 1000;
242 let knobs = sched.read_tuning_knobs();
243
244 let sojourn_ms = stats.batch_sojourn_ns / 1_000_000;
245 let sojourn_thresh_ms = knobs.sojourn_thresh_ns / 1_000_000;
246 let delta_burst = stats.burst_mode_active.wrapping_sub(prev.burst_mode_active);
247 let burst_label = if delta_burst > 0 { " BURST" } else { "" };
248 let longrun_label = if stats.longrun_mode_active > 0 {
249 " LONGRUN"
250 } else {
251 ""
252 };
253
254 if verbose && tuning::should_print_telemetry(tick_counter, stability_score) {
255 println!(
256 "d/s: {:<8} idle: {}% shared: {:<6} preempt: {:<4} keep: {:<4} kick: H={:<4} S={:<4} enq: W={:<4} R={:<4} wake: {}us p99: {}us [B:{} I:{} L:{}] lat_idle: {}us lat_kick: {}us procdb: {}/{} sleep: io={}% slice: {}us batch: {}us reenq: {} sjrn: {}ms/{}ms rescue: {} l2: B={}% I={}% L={}% [{}{}{}]",
257 delta_d, idle_pct, delta_shared, delta_preempt, delta_keep,
258 delta_hard, delta_soft, delta_enq_wake, delta_enq_requeue,
259 wake_avg_us, p99_us, tp99_b, tp99_i, tp99_l,
260 lat_idle_us, lat_kick_us,
261 db_total, db_confident,
262 io_pct, knobs.slice_ns / 1000, knobs.batch_slice_ns / 1000,
263 delta_reenq, sojourn_ms, sojourn_thresh_ms,
264 delta_rescue,
265 l2_pct_b, l2_pct_i, l2_pct_l, regime.label(), burst_label, longrun_label,
266 );
267 }
268
269 sched.log.snapshot(
270 delta_d,
271 delta_idle,
272 delta_shared,
273 delta_preempt,
274 delta_keep,
275 wake_avg_us,
276 delta_hard,
277 delta_soft,
278 lat_idle_us,
279 lat_kick_us,
280 );
281
282 match regime {
283 Regime::Light => light_ticks += 1,
284 Regime::Mixed => mixed_ticks += 1,
285 Regime::Heavy => heavy_ticks += 1,
286 }
287
288 tick_counter += 1;
289 prev_hist = cur_hist;
290 prev_sleep = cur_sleep;
291 prev = stats;
292 }
293
294 if let Some(ref db) = procdb {
296 let path = ProcessDb::default_path();
297 match db.save(&path) {
298 Ok(()) => {
299 let (total, confident) = db.summary();
300 log_info!(
301 "PROCDB: SAVED {}/{} PROFILES TO {}",
302 confident,
303 total,
304 path.display()
305 );
306 }
307 Err(e) => log_warn!("PROCDB SAVE FAILED: {}", e),
308 }
309 }
310
311 let final_knobs = sched.read_tuning_knobs();
313 let final_stats = sched.read_stats();
314 let l2_total_b = final_stats.nr_l2_hit_batch + final_stats.nr_l2_miss_batch;
315 let l2_total_i = final_stats.nr_l2_hit_interactive + final_stats.nr_l2_miss_interactive;
316 let l2_total_l = final_stats.nr_l2_hit_lat_crit + final_stats.nr_l2_miss_lat_crit;
317 let l2_cum_b = if l2_total_b > 0 {
318 final_stats.nr_l2_hit_batch * 100 / l2_total_b
319 } else {
320 0
321 };
322 let l2_cum_i = if l2_total_i > 0 {
323 final_stats.nr_l2_hit_interactive * 100 / l2_total_i
324 } else {
325 0
326 };
327 let l2_cum_l = if l2_total_l > 0 {
328 final_stats.nr_l2_hit_lat_crit * 100 / l2_total_l
329 } else {
330 0
331 };
332 println!(
333 "[KNOBS] regime={} slice_ns={} batch_ns={} preempt_ns={} demotion_ns={} lag={} mwu={:.3} ticks=L:{}/M:{}/H:{} l2_hit=B:{}%/I:{}%/L:{}%",
334 regime.label(), final_knobs.slice_ns, final_knobs.batch_slice_ns,
335 final_knobs.preempt_thresh_ns, final_knobs.cpu_bound_thresh_ns,
336 final_knobs.lag_scale, mwu.scale(),
337 light_ticks, mixed_ticks, heavy_ticks,
338 l2_cum_b, l2_cum_i, l2_cum_l,
339 );
340
341 let should_restart = sched.read_exit_info();
343 Ok(should_restart)
344}