1use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use anyhow::Result;
16
17use crate::procdb::ProcessDb;
18use crate::scheduler::{PandemoniumStats, Scheduler};
19use crate::tuning::{
20 self, detect_regime, scaled_regime_knobs, MwuController, MwuSignals, Regime, HIST_BUCKETS,
21};
22
/// Number of buckets in the sleep histogram that `monitor_loop` diffs between ticks.
///
/// `monitor_loop` stores the value returned by `sched.read_sleep_hist()` in a
/// `[u64; SLEEP_BUCKETS]` array, so this must equal that array's length.
/// The first two buckets are the ones counted towards the `io=` percentage.
const SLEEP_BUCKETS: usize = 4;
28
29pub fn monitor_loop(
35 sched: &mut Scheduler,
36 shutdown: &'static AtomicBool,
37 verbose: bool,
38 nr_cpus: u64,
39) -> Result<bool> {
40 let mut prev = PandemoniumStats::default();
41 let mut prev_hist = [[0u64; HIST_BUCKETS]; 3];
42 let mut prev_sleep = [0u64; SLEEP_BUCKETS];
43 let mut regime = Regime::Mixed;
44 let mut tau_ns = sched.read_tuning_knobs().topology_tau_ns;
48 let mut mwu = MwuController::new(scaled_regime_knobs(regime, nr_cpus, tau_ns));
49 let mut pending_regime = regime;
50 let mut regime_hold: u32 = 0;
51 let mut light_ticks: u64 = 0;
52 let mut mixed_ticks: u64 = 0;
53 let mut heavy_ticks: u64 = 0;
54 let mut stability_score: u32 = 0;
55 let mut tick_counter: u64 = 0;
56
57 let mut procdb = match ProcessDb::new() {
58 Ok(db) => Some(db),
59 Err(e) => {
60 log_warn!("PROCDB INIT FAILED: {}", e);
61 None
62 }
63 };
64
65 let live = sched.read_tuning_knobs();
69 let mut rk = scaled_regime_knobs(regime, nr_cpus, tau_ns);
70 rk.topology_tau_ns = tau_ns;
71 rk.codel_eq_ns = live.codel_eq_ns;
72 sched.write_tuning_knobs(&rk)?;
73
74 while !shutdown.load(Ordering::Relaxed) && !sched.exited() {
75 crate::watchdog::LOOP_HEARTBEAT.fetch_add(1, Ordering::Relaxed);
76 std::thread::sleep(Duration::from_secs(1));
77
78 let stats = sched.read_stats();
79 let cur_hist = sched.read_wake_lat_hist();
80 let cur_sleep = sched.read_sleep_hist();
81
82 let mut wrapped = stats.nr_dispatches < prev.nr_dispatches;
87 if !wrapped {
88 'wrap: for tier in 0..3 {
89 for b in 0..HIST_BUCKETS {
90 if cur_hist[tier][b] < prev_hist[tier][b] {
91 wrapped = true;
92 break 'wrap;
93 }
94 }
95 }
96 }
97 if !wrapped {
98 for i in 0..SLEEP_BUCKETS {
99 if cur_sleep[i] < prev_sleep[i] {
100 wrapped = true;
101 break;
102 }
103 }
104 }
105 if wrapped {
106 log_warn!("WRAP DETECTED: BASELINE RESET, SKIPPING ADAPTIVE UPDATE");
107 prev = stats;
108 prev_hist = cur_hist;
109 prev_sleep = cur_sleep;
110 continue;
111 }
112
113 let delta_d = stats.nr_dispatches.wrapping_sub(prev.nr_dispatches);
115 let delta_idle = stats.nr_idle_hits.wrapping_sub(prev.nr_idle_hits);
116 let delta_shared = stats.nr_shared.wrapping_sub(prev.nr_shared);
117 let delta_preempt = stats.nr_preempt.wrapping_sub(prev.nr_preempt);
118 let delta_keep = stats.nr_keep_running.wrapping_sub(prev.nr_keep_running);
119 let delta_wake_sum = stats.wake_lat_sum.wrapping_sub(prev.wake_lat_sum);
120 let delta_wake_samples = stats.wake_lat_samples.wrapping_sub(prev.wake_lat_samples);
121 let delta_hard = stats.nr_hard_kicks.wrapping_sub(prev.nr_hard_kicks);
122 let delta_soft = stats.nr_soft_kicks.wrapping_sub(prev.nr_soft_kicks);
123 let delta_enq_wake = stats.nr_enq_wakeup.wrapping_sub(prev.nr_enq_wakeup);
124 let delta_enq_requeue = stats.nr_enq_requeue.wrapping_sub(prev.nr_enq_requeue);
125 let delta_rescue = stats
126 .nr_overflow_rescue
127 .wrapping_sub(prev.nr_overflow_rescue);
128 let wake_avg_us = if delta_wake_samples > 0 {
129 delta_wake_sum / delta_wake_samples / 1000
130 } else {
131 0
132 };
133
134 let d_idle_sum = stats.wake_lat_idle_sum.wrapping_sub(prev.wake_lat_idle_sum);
136 let d_idle_cnt = stats.wake_lat_idle_cnt.wrapping_sub(prev.wake_lat_idle_cnt);
137 let d_kick_sum = stats.wake_lat_kick_sum.wrapping_sub(prev.wake_lat_kick_sum);
138 let d_kick_cnt = stats.wake_lat_kick_cnt.wrapping_sub(prev.wake_lat_kick_cnt);
139 let lat_idle_us = if d_idle_cnt > 0 {
140 d_idle_sum / d_idle_cnt / 1000
141 } else {
142 0
143 };
144 let lat_kick_us = if d_kick_cnt > 0 {
145 d_kick_sum / d_kick_cnt / 1000
146 } else {
147 0
148 };
149 let delta_reenq = stats.nr_reenqueue.wrapping_sub(prev.nr_reenqueue);
150
151 let dl2_hb = stats.nr_l2_hit_batch.wrapping_sub(prev.nr_l2_hit_batch);
153 let dl2_mb = stats.nr_l2_miss_batch.wrapping_sub(prev.nr_l2_miss_batch);
154 let dl2_hi = stats
155 .nr_l2_hit_interactive
156 .wrapping_sub(prev.nr_l2_hit_interactive);
157 let dl2_mi = stats
158 .nr_l2_miss_interactive
159 .wrapping_sub(prev.nr_l2_miss_interactive);
160 let dl2_hl = stats
161 .nr_l2_hit_lat_crit
162 .wrapping_sub(prev.nr_l2_hit_lat_crit);
163 let dl2_ml = stats
164 .nr_l2_miss_lat_crit
165 .wrapping_sub(prev.nr_l2_miss_lat_crit);
166 let l2_pct_b = if dl2_hb + dl2_mb > 0 {
167 dl2_hb * 100 / (dl2_hb + dl2_mb)
168 } else {
169 0
170 };
171 let l2_pct_i = if dl2_hi + dl2_mi > 0 {
172 dl2_hi * 100 / (dl2_hi + dl2_mi)
173 } else {
174 0
175 };
176 let l2_pct_l = if dl2_hl + dl2_ml > 0 {
177 dl2_hl * 100 / (dl2_hl + dl2_ml)
178 } else {
179 0
180 };
181
182 let idle_pct = if delta_d > 0 {
183 delta_idle * 100 / delta_d
184 } else {
185 0
186 };
187
188 let mut delta_hist = [[0u64; HIST_BUCKETS]; 3];
190 for tier in 0..3 {
191 for b in 0..HIST_BUCKETS {
192 delta_hist[tier][b] = cur_hist[tier][b] - prev_hist[tier][b];
193 }
194 }
195
196 let tp99_b_ns = tuning::compute_p99_from_histogram(&delta_hist[0]);
198 let tp99_i_ns = tuning::compute_p99_from_histogram(&delta_hist[1]);
199 let tp99_l_ns = tuning::compute_p99_from_histogram(&delta_hist[2]);
200
201 let mut agg = [0u64; HIST_BUCKETS];
203 for t in 0..3 {
204 for b in 0..HIST_BUCKETS {
205 agg[b] += delta_hist[t][b];
206 }
207 }
208 let p99_ns = tuning::compute_p99_from_histogram(&agg);
209
210 let mut delta_sleep = [0u64; SLEEP_BUCKETS];
212 for i in 0..SLEEP_BUCKETS {
213 delta_sleep[i] = cur_sleep[i] - prev_sleep[i];
214 }
215 let sleep_total: u64 = delta_sleep.iter().sum();
216 let io_pct = if sleep_total > 0 {
217 (delta_sleep[0] + delta_sleep[1]) * 100 / sleep_total
218 } else {
219 0
220 };
221
222 let detected = detect_regime(regime, idle_pct);
224
225 let mut regime_changed_this_tick = false;
226 if detected != regime {
227 if detected == pending_regime {
228 regime_hold += 1;
229 } else {
230 pending_regime = detected;
231 regime_hold = 1;
232 }
233 if regime_hold >= 2 {
234 regime = detected;
235 let live = sched.read_tuning_knobs();
239 tau_ns = live.topology_tau_ns;
240 let mut rk = scaled_regime_knobs(regime, nr_cpus, tau_ns);
241 rk.topology_tau_ns = tau_ns;
242 rk.codel_eq_ns = live.codel_eq_ns;
243 sched.write_tuning_knobs(&rk)?;
244 regime_changed_this_tick = true;
245 mwu.set_baseline(rk);
246 mwu.reset();
247 }
248 } else {
249 pending_regime = regime;
250 regime_hold = 0;
251 }
252
253 if !regime_changed_this_tick {
256 let signals = MwuSignals {
257 p99_ns,
258 interactive_p99_ns: tp99_i_ns,
259 io_pct,
260 rescue_count: delta_rescue,
261 wakeup_rate: delta_enq_wake,
266 };
267 let osc_state = sched.read_oscillator_state();
274 let mut knobs = mwu.update(&signals, regime.p99_ceiling(), nr_cpus, tau_ns, &osc_state);
275 let live = sched.read_tuning_knobs();
279 knobs.topology_tau_ns = live.topology_tau_ns;
280 knobs.codel_eq_ns = live.codel_eq_ns;
281 sched.write_tuning_knobs(&knobs)?;
282 }
283
284 let tighten_delta = if mwu.had_losses() { 1u64 } else { 0u64 };
286 stability_score = tuning::compute_stability_score(
287 stability_score,
288 regime_changed_this_tick,
289 tighten_delta,
290 p99_ns,
291 regime.p99_ceiling(),
292 );
293
294 let (db_total, db_confident) = if let Some(ref mut db) = procdb {
296 db.ingest();
297 db.flush_predictions();
298 db.tick();
299 db.summary()
300 } else {
301 (0, 0)
302 };
303
304 let p99_us = p99_ns / 1000;
305 let tp99_b = tp99_b_ns / 1000;
306 let tp99_i = tp99_i_ns / 1000;
307 let tp99_l = tp99_l_ns / 1000;
308 let knobs = sched.read_tuning_knobs();
309
310 let sojourn_ms = stats.batch_sojourn_ns / 1_000_000;
311 let sojourn_thresh_ms = knobs.sojourn_thresh_ns / 1_000_000;
312 let longrun_label = if stats.longrun_mode_active > 0 {
313 " LONGRUN"
314 } else {
315 ""
316 };
317
318 if verbose && tuning::should_print_telemetry(tick_counter, stability_score) {
319 println!(
320 "d/s: {:<8} idle: {}% shared: {:<6} preempt: {:<4} keep: {:<4} kick: H={:<4} S={:<4} enq: W={:<4} R={:<4} wake: {}us p99: {}us [B:{} I:{} L:{}] lat_idle: {}us lat_kick: {}us procdb: {}/{} sleep: io={}% slice: {}us batch: {}us reenq: {} sjrn: {}ms/{}ms rescue: {} l2: B={}% I={}% L={}% [{}{}]",
321 delta_d, idle_pct, delta_shared, delta_preempt, delta_keep,
322 delta_hard, delta_soft, delta_enq_wake, delta_enq_requeue,
323 wake_avg_us, p99_us, tp99_b, tp99_i, tp99_l,
324 lat_idle_us, lat_kick_us,
325 db_total, db_confident,
326 io_pct, knobs.slice_ns / 1000, knobs.batch_slice_ns / 1000,
327 delta_reenq, sojourn_ms, sojourn_thresh_ms,
328 delta_rescue,
329 l2_pct_b, l2_pct_i, l2_pct_l, regime.label(), longrun_label,
330 );
331 }
332
333 sched.log.snapshot(
334 delta_d,
335 delta_idle,
336 delta_shared,
337 delta_preempt,
338 delta_keep,
339 wake_avg_us,
340 delta_hard,
341 delta_soft,
342 lat_idle_us,
343 lat_kick_us,
344 );
345
346 match regime {
347 Regime::Light => light_ticks += 1,
348 Regime::Mixed => mixed_ticks += 1,
349 Regime::Heavy => heavy_ticks += 1,
350 }
351
352 tick_counter += 1;
353 prev_hist = cur_hist;
354 prev_sleep = cur_sleep;
355 prev = stats;
356 }
357
358 if let Some(ref db) = procdb {
360 let path = ProcessDb::default_path();
361 match db.save(&path) {
362 Ok(()) => {
363 let (total, confident) = db.summary();
364 log_info!(
365 "PROCDB: SAVED {}/{} PROFILES TO {}",
366 confident,
367 total,
368 path.display()
369 );
370 }
371 Err(e) => log_warn!("PROCDB SAVE FAILED: {}", e),
372 }
373 }
374
375 let final_knobs = sched.read_tuning_knobs();
377 let final_stats = sched.read_stats();
378 let l2_total_b = final_stats.nr_l2_hit_batch + final_stats.nr_l2_miss_batch;
379 let l2_total_i = final_stats.nr_l2_hit_interactive + final_stats.nr_l2_miss_interactive;
380 let l2_total_l = final_stats.nr_l2_hit_lat_crit + final_stats.nr_l2_miss_lat_crit;
381 let l2_cum_b = if l2_total_b > 0 {
382 final_stats.nr_l2_hit_batch * 100 / l2_total_b
383 } else {
384 0
385 };
386 let l2_cum_i = if l2_total_i > 0 {
387 final_stats.nr_l2_hit_interactive * 100 / l2_total_i
388 } else {
389 0
390 };
391 let l2_cum_l = if l2_total_l > 0 {
392 final_stats.nr_l2_hit_lat_crit * 100 / l2_total_l
393 } else {
394 0
395 };
396 println!(
397 "[KNOBS] regime={} slice_ns={} batch_ns={} preempt_ns={} lag={} mwu={:.3} ticks=L:{}/M:{}/H:{} l2_hit=B:{}%/I:{}%/L:{}%",
398 regime.label(), final_knobs.slice_ns, final_knobs.batch_slice_ns,
399 final_knobs.preempt_thresh_ns,
400 final_knobs.lag_scale, mwu.scale(),
401 light_ticks, mixed_ticks, heavy_ticks,
402 l2_cum_b, l2_cum_i, l2_cum_l,
403 );
404
405 let should_restart = sched.read_exit_info();
407 Ok(should_restart)
408}