Skip to main content

scx_pandemonium/
adaptive.rs

// PANDEMONIUM ADAPTIVE CONTROL LOOP
// SINGLE-THREAD CLOSED-LOOP TUNING SYSTEM
//
// ONE THREAD: MONITOR LOOP (1-SECOND CONTROL LOOP)
//   READS BPF PER-CPU HISTOGRAMS FOR P99 COMPUTATION.
//   DETECTS WORKLOAD REGIME VIA SCHMITT TRIGGER (WITH A 2-TICK HOLD).
//   MWU ORCHESTRATOR TUNES ALL 11 KNOBS WITHIN REGIME.
//
// BPF PRODUCES HISTOGRAMS; RUST READS AND REACTS. RUST WRITES KNOBS,
// AND BPF READS THEM ON THE VERY NEXT SCHEDULING DECISION.
11
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use anyhow::Result;
16
17use crate::procdb::ProcessDb;
18use crate::scheduler::{PandemoniumStats, Scheduler};
19use crate::tuning::{
20    self, detect_regime, scaled_regime_knobs, MwuController, MwuSignals, Regime, HIST_BUCKETS,
21};
22
// REGIME THRESHOLDS, PROFILES, AND KNOB COMPUTATION LIVE IN tuning.rs,
// WHICH HAS NO BPF DEPENDENCIES AND CAN BE TESTED OFFLINE.
25
/// SLEEP PATTERN BUCKETS: CLASSIFY IO-WAIT VS IDLE WORKLOADS.
///
/// LENGTH OF THE HISTOGRAM RETURNED BY `read_sleep_hist`. THE MONITOR LOOP
/// COUNTS BUCKETS 0 AND 1 AS IO-WAIT WHEN COMPUTING `io_pct`.
const SLEEP_BUCKETS: usize = 4;
28
// MONITOR LOOP
30
31// 1-SECOND CONTROL LOOP. READS BPF HISTOGRAMS, COMPUTES P99,
32// DETECTS WORKLOAD REGIME, TIGHTENS/RELAXES KNOBS.
33// RUNS ON THE MAIN THREAD.
34pub fn monitor_loop(
35    sched: &mut Scheduler,
36    shutdown: &'static AtomicBool,
37    verbose: bool,
38    nr_cpus: u64,
39) -> Result<bool> {
40    let mut prev = PandemoniumStats::default();
41    let mut prev_hist = [[0u64; HIST_BUCKETS]; 3];
42    let mut prev_sleep = [0u64; SLEEP_BUCKETS];
43    let mut regime = Regime::Mixed;
44    let mut mwu = MwuController::new(scaled_regime_knobs(regime, nr_cpus));
45    let mut pending_regime = regime;
46    let mut regime_hold: u32 = 0;
47    let mut light_ticks: u64 = 0;
48    let mut mixed_ticks: u64 = 0;
49    let mut heavy_ticks: u64 = 0;
50    let mut stability_score: u32 = 0;
51    let mut tick_counter: u64 = 0;
52
53    let mut procdb = match ProcessDb::new() {
54        Ok(db) => Some(db),
55        Err(e) => {
56            log_warn!("PROCDB INIT FAILED: {}", e);
57            None
58        }
59    };
60
61    // APPLY INITIAL REGIME
62    sched.write_tuning_knobs(&scaled_regime_knobs(regime, nr_cpus))?;
63
64    while !shutdown.load(Ordering::Relaxed) && !sched.exited() {
65        std::thread::sleep(Duration::from_secs(1));
66
67        let stats = sched.read_stats();
68
69        // COMPUTE DELTAS
70        let delta_d = stats.nr_dispatches.wrapping_sub(prev.nr_dispatches);
71        let delta_idle = stats.nr_idle_hits.wrapping_sub(prev.nr_idle_hits);
72        let delta_shared = stats.nr_shared.wrapping_sub(prev.nr_shared);
73        let delta_preempt = stats.nr_preempt.wrapping_sub(prev.nr_preempt);
74        let delta_keep = stats.nr_keep_running.wrapping_sub(prev.nr_keep_running);
75        let delta_wake_sum = stats.wake_lat_sum.wrapping_sub(prev.wake_lat_sum);
76        let delta_wake_samples = stats.wake_lat_samples.wrapping_sub(prev.wake_lat_samples);
77        let delta_hard = stats.nr_hard_kicks.wrapping_sub(prev.nr_hard_kicks);
78        let delta_soft = stats.nr_soft_kicks.wrapping_sub(prev.nr_soft_kicks);
79        let delta_enq_wake = stats.nr_enq_wakeup.wrapping_sub(prev.nr_enq_wakeup);
80        let delta_enq_requeue = stats.nr_enq_requeue.wrapping_sub(prev.nr_enq_requeue);
81        let delta_rescue = stats
82            .nr_overflow_rescue
83            .wrapping_sub(prev.nr_overflow_rescue);
84        let wake_avg_us = if delta_wake_samples > 0 {
85            delta_wake_sum / delta_wake_samples / 1000
86        } else {
87            0
88        };
89
90        // PER-PATH LATENCY
91        let d_idle_sum = stats.wake_lat_idle_sum.wrapping_sub(prev.wake_lat_idle_sum);
92        let d_idle_cnt = stats.wake_lat_idle_cnt.wrapping_sub(prev.wake_lat_idle_cnt);
93        let d_kick_sum = stats.wake_lat_kick_sum.wrapping_sub(prev.wake_lat_kick_sum);
94        let d_kick_cnt = stats.wake_lat_kick_cnt.wrapping_sub(prev.wake_lat_kick_cnt);
95        let lat_idle_us = if d_idle_cnt > 0 {
96            d_idle_sum / d_idle_cnt / 1000
97        } else {
98            0
99        };
100        let lat_kick_us = if d_kick_cnt > 0 {
101            d_kick_sum / d_kick_cnt / 1000
102        } else {
103            0
104        };
105        let delta_reenq = stats.nr_reenqueue.wrapping_sub(prev.nr_reenqueue);
106
107        // L2 CACHE AFFINITY DELTAS
108        let dl2_hb = stats.nr_l2_hit_batch.wrapping_sub(prev.nr_l2_hit_batch);
109        let dl2_mb = stats.nr_l2_miss_batch.wrapping_sub(prev.nr_l2_miss_batch);
110        let dl2_hi = stats
111            .nr_l2_hit_interactive
112            .wrapping_sub(prev.nr_l2_hit_interactive);
113        let dl2_mi = stats
114            .nr_l2_miss_interactive
115            .wrapping_sub(prev.nr_l2_miss_interactive);
116        let dl2_hl = stats
117            .nr_l2_hit_lat_crit
118            .wrapping_sub(prev.nr_l2_hit_lat_crit);
119        let dl2_ml = stats
120            .nr_l2_miss_lat_crit
121            .wrapping_sub(prev.nr_l2_miss_lat_crit);
122        let l2_pct_b = if dl2_hb + dl2_mb > 0 {
123            dl2_hb * 100 / (dl2_hb + dl2_mb)
124        } else {
125            0
126        };
127        let l2_pct_i = if dl2_hi + dl2_mi > 0 {
128            dl2_hi * 100 / (dl2_hi + dl2_mi)
129        } else {
130            0
131        };
132        let l2_pct_l = if dl2_hl + dl2_ml > 0 {
133            dl2_hl * 100 / (dl2_hl + dl2_ml)
134        } else {
135            0
136        };
137
138        let idle_pct = if delta_d > 0 {
139            delta_idle * 100 / delta_d
140        } else {
141            0
142        };
143
144        // READ HISTOGRAMS (CUMULATIVE, COMPUTE DELTAS)
145        let cur_hist = sched.read_wake_lat_hist();
146        let mut delta_hist = [[0u64; HIST_BUCKETS]; 3];
147        for tier in 0..3 {
148            for b in 0..HIST_BUCKETS {
149                delta_hist[tier][b] = cur_hist[tier][b].wrapping_sub(prev_hist[tier][b]);
150            }
151        }
152
153        // COMPUTE P99 PER TIER
154        let tp99_b_ns = tuning::compute_p99_from_histogram(&delta_hist[0]);
155        let tp99_i_ns = tuning::compute_p99_from_histogram(&delta_hist[1]);
156        let tp99_l_ns = tuning::compute_p99_from_histogram(&delta_hist[2]);
157
158        // AGGREGATE P99
159        let mut agg = [0u64; HIST_BUCKETS];
160        for t in 0..3 {
161            for b in 0..HIST_BUCKETS {
162                agg[b] += delta_hist[t][b];
163            }
164        }
165        let p99_ns = tuning::compute_p99_from_histogram(&agg);
166
167        // SLEEP HISTOGRAM
168        let cur_sleep = sched.read_sleep_hist();
169        let mut delta_sleep = [0u64; SLEEP_BUCKETS];
170        for i in 0..SLEEP_BUCKETS {
171            delta_sleep[i] = cur_sleep[i].wrapping_sub(prev_sleep[i]);
172        }
173        let sleep_total: u64 = delta_sleep.iter().sum();
174        let io_pct = if sleep_total > 0 {
175            (delta_sleep[0] + delta_sleep[1]) * 100 / sleep_total
176        } else {
177            0
178        };
179
180        // DETECT REGIME (SCHMITT TRIGGER + 2-TICK HOLD)
181        let detected = detect_regime(regime, idle_pct);
182
183        let mut regime_changed_this_tick = false;
184        if detected != regime {
185            if detected == pending_regime {
186                regime_hold += 1;
187            } else {
188                pending_regime = detected;
189                regime_hold = 1;
190            }
191            if regime_hold >= 2 {
192                regime = detected;
193                let rk = scaled_regime_knobs(regime, nr_cpus);
194                sched.write_tuning_knobs(&rk)?;
195                regime_changed_this_tick = true;
196                mwu.set_baseline(rk);
197                mwu.reset();
198            }
199        } else {
200            pending_regime = regime;
201            regime_hold = 0;
202        }
203
204        // MWU ORCHESTRATOR: UNIFIED KNOB CONTROL
205        // REPLACES: TIGHTEN/RELAX, SLEEP-INFORMED BATCH, SOJOURN EWMA, LONGRUN OVERRIDE
206        if !regime_changed_this_tick {
207            let signals = MwuSignals {
208                p99_ns,
209                interactive_p99_ns: tp99_i_ns,
210                io_pct,
211                rescue_count: delta_rescue,
212                wakeup_rate: delta_enq_wake / nr_cpus.max(1),
213            };
214            let knobs = mwu.update(&signals, regime.p99_ceiling(), nr_cpus);
215            sched.write_tuning_knobs(&knobs)?;
216        }
217
218        // STABILITY TRACKING
219        let tighten_delta = if mwu.had_losses() { 1u64 } else { 0u64 };
220        stability_score = tuning::compute_stability_score(
221            stability_score,
222            regime_changed_this_tick,
223            tighten_delta,
224            p99_ns,
225            regime.p99_ceiling(),
226        );
227
228        // PROCESS CLASSIFICATION DATABASE: INGEST, PREDICT, EVICT
229        let (db_total, db_confident) = if let Some(ref mut db) = procdb {
230            db.ingest();
231            db.flush_predictions();
232            db.tick();
233            db.summary()
234        } else {
235            (0, 0)
236        };
237
238        let p99_us = p99_ns / 1000;
239        let tp99_b = tp99_b_ns / 1000;
240        let tp99_i = tp99_i_ns / 1000;
241        let tp99_l = tp99_l_ns / 1000;
242        let knobs = sched.read_tuning_knobs();
243
244        let sojourn_ms = stats.batch_sojourn_ns / 1_000_000;
245        let sojourn_thresh_ms = knobs.sojourn_thresh_ns / 1_000_000;
246        let delta_burst = stats.burst_mode_active.wrapping_sub(prev.burst_mode_active);
247        let burst_label = if delta_burst > 0 { " BURST" } else { "" };
248        let longrun_label = if stats.longrun_mode_active > 0 {
249            " LONGRUN"
250        } else {
251            ""
252        };
253
254        if verbose && tuning::should_print_telemetry(tick_counter, stability_score) {
255            println!(
256                "d/s: {:<8} idle: {}% shared: {:<6} preempt: {:<4} keep: {:<4} kick: H={:<4} S={:<4} enq: W={:<4} R={:<4} wake: {}us p99: {}us [B:{} I:{} L:{}] lat_idle: {}us lat_kick: {}us procdb: {}/{} sleep: io={}% slice: {}us batch: {}us reenq: {} sjrn: {}ms/{}ms rescue: {} l2: B={}% I={}% L={}% [{}{}{}]",
257                delta_d, idle_pct, delta_shared, delta_preempt, delta_keep,
258                delta_hard, delta_soft, delta_enq_wake, delta_enq_requeue,
259                wake_avg_us, p99_us, tp99_b, tp99_i, tp99_l,
260                lat_idle_us, lat_kick_us,
261                db_total, db_confident,
262                io_pct, knobs.slice_ns / 1000, knobs.batch_slice_ns / 1000,
263                delta_reenq, sojourn_ms, sojourn_thresh_ms,
264                delta_rescue,
265                l2_pct_b, l2_pct_i, l2_pct_l, regime.label(), burst_label, longrun_label,
266            );
267        }
268
269        sched.log.snapshot(
270            delta_d,
271            delta_idle,
272            delta_shared,
273            delta_preempt,
274            delta_keep,
275            wake_avg_us,
276            delta_hard,
277            delta_soft,
278            lat_idle_us,
279            lat_kick_us,
280        );
281
282        match regime {
283            Regime::Light => light_ticks += 1,
284            Regime::Mixed => mixed_ticks += 1,
285            Regime::Heavy => heavy_ticks += 1,
286        }
287
288        tick_counter += 1;
289        prev_hist = cur_hist;
290        prev_sleep = cur_sleep;
291        prev = stats;
292    }
293
294    // PROCDB: SAVE LEARNED CLASSIFICATIONS TO DISK
295    if let Some(ref db) = procdb {
296        let path = ProcessDb::default_path();
297        match db.save(&path) {
298            Ok(()) => {
299                let (total, confident) = db.summary();
300                log_info!(
301                    "PROCDB: SAVED {}/{} PROFILES TO {}",
302                    confident,
303                    total,
304                    path.display()
305                );
306            }
307            Err(e) => log_warn!("PROCDB SAVE FAILED: {}", e),
308        }
309    }
310
311    // KNOBS SUMMARY: CAPTURED BY TEST HARNESS FOR ARCHIVE
312    let final_knobs = sched.read_tuning_knobs();
313    let final_stats = sched.read_stats();
314    let l2_total_b = final_stats.nr_l2_hit_batch + final_stats.nr_l2_miss_batch;
315    let l2_total_i = final_stats.nr_l2_hit_interactive + final_stats.nr_l2_miss_interactive;
316    let l2_total_l = final_stats.nr_l2_hit_lat_crit + final_stats.nr_l2_miss_lat_crit;
317    let l2_cum_b = if l2_total_b > 0 {
318        final_stats.nr_l2_hit_batch * 100 / l2_total_b
319    } else {
320        0
321    };
322    let l2_cum_i = if l2_total_i > 0 {
323        final_stats.nr_l2_hit_interactive * 100 / l2_total_i
324    } else {
325        0
326    };
327    let l2_cum_l = if l2_total_l > 0 {
328        final_stats.nr_l2_hit_lat_crit * 100 / l2_total_l
329    } else {
330        0
331    };
332    println!(
333        "[KNOBS] regime={} slice_ns={} batch_ns={} preempt_ns={} demotion_ns={} lag={} mwu={:.3} ticks=L:{}/M:{}/H:{} l2_hit=B:{}%/I:{}%/L:{}%",
334        regime.label(), final_knobs.slice_ns, final_knobs.batch_slice_ns,
335        final_knobs.preempt_thresh_ns, final_knobs.cpu_bound_thresh_ns,
336        final_knobs.lag_scale, mwu.scale(),
337        light_ticks, mixed_ticks, heavy_ticks,
338        l2_cum_b, l2_cum_i, l2_cum_l,
339    );
340
341    // READ UEI EXIT REASON
342    let should_restart = sched.read_exit_info();
343    Ok(should_restart)
344}