Skip to main content

scx_pandemonium/
adaptive.rs

// PANDEMONIUM ADAPTIVE CONTROL LOOP
// SINGLE-THREAD CLOSED-LOOP TUNING SYSTEM
//
// ONE THREAD: MONITOR LOOP (1-SECOND CONTROL LOOP)
//   READS BPF PER-CPU HISTOGRAMS FOR P99 COMPUTATION.
//   DETECTS WORKLOAD REGIME VIA SCHMITT TRIGGER.
//   MWU ORCHESTRATOR TUNES ALL 11 KNOBS WITHIN REGIME.
//
// BPF PRODUCES HISTOGRAMS, RUST READS AND REACTS. RUST WRITES KNOBS,
// BPF READS THEM ON THE VERY NEXT SCHEDULING DECISION.

12use std::sync::atomic::{AtomicBool, Ordering};
13use std::time::Duration;
14
15use anyhow::Result;
16
17use crate::procdb::ProcessDb;
18use crate::scheduler::{PandemoniumStats, Scheduler};
19use crate::tuning::{
20    self, detect_regime, scaled_regime_knobs, MwuController, MwuSignals, Regime, HIST_BUCKETS,
21};

// REGIME THRESHOLDS, PROFILES, AND KNOB COMPUTATION LIVE IN tuning.rs
// (ZERO BPF DEPENDENCIES, TESTABLE OFFLINE)

/// Number of buckets in the sleep histogram returned by `read_sleep_hist()`.
///
/// Used to classify IO-wait vs idle workloads: the monitor loop counts
/// buckets 0 and 1 as IO-wait sleeps when it computes `io_pct`.
const SLEEP_BUCKETS: usize = 4;

// MONITOR LOOP

31// 1-SECOND CONTROL LOOP. READS BPF HISTOGRAMS, COMPUTES P99,
32// DETECTS WORKLOAD REGIME, TIGHTENS/RELAXES KNOBS.
33// RUNS ON THE MAIN THREAD.
34pub fn monitor_loop(
35    sched: &mut Scheduler,
36    shutdown: &'static AtomicBool,
37    verbose: bool,
38    nr_cpus: u64,
39) -> Result<bool> {
40    let mut prev = PandemoniumStats::default();
41    let mut prev_hist = [[0u64; HIST_BUCKETS]; 3];
42    let mut prev_sleep = [0u64; SLEEP_BUCKETS];
43    let mut regime = Regime::Mixed;
44    // READ CURRENT tau SNAPSHOT FROM THE BPF-SIDE KNOB MAP. main.rs WROTE IT
45    // ONCE AT TOPOLOGY DETECT; THE ADAPTIVE LOOP RE-READS SO TAU-SCALED REGIME
46    // KNOBS AGREE WITH TAU-SCALED BPF INIT AT FIRST TICK AND EVERY REGIME CHANGE.
47    let mut tau_ns = sched.read_tuning_knobs().topology_tau_ns;
48    let mut mwu = MwuController::new(scaled_regime_knobs(regime, nr_cpus, tau_ns));
49    let mut pending_regime = regime;
50    let mut regime_hold: u32 = 0;
51    let mut light_ticks: u64 = 0;
52    let mut mixed_ticks: u64 = 0;
53    let mut heavy_ticks: u64 = 0;
54    let mut stability_score: u32 = 0;
55    let mut tick_counter: u64 = 0;
56
57    let mut procdb = match ProcessDb::new() {
58        Ok(db) => Some(db),
59        Err(e) => {
60            log_warn!("PROCDB INIT FAILED: {}", e);
61            None
62        }
63    };
64
65    // APPLY INITIAL REGIME. scaled_regime_knobs RETURNS topology_tau_ns/codel_eq_ns=0;
66    // OVERLAY THE LIVE BPF VALUES SO THE FIRST WRITE DOESN'T CLOBBER WHAT
67    // write_topology_fields() PUT IN THE MAP. Mirrors the regime-change path at line 230.
68    let live = sched.read_tuning_knobs();
69    let mut rk = scaled_regime_knobs(regime, nr_cpus, tau_ns);
70    rk.topology_tau_ns = tau_ns;
71    rk.codel_eq_ns = live.codel_eq_ns;
72    sched.write_tuning_knobs(&rk)?;
73
74    while !shutdown.load(Ordering::Relaxed) && !sched.exited() {
75        crate::watchdog::LOOP_HEARTBEAT.fetch_add(1, Ordering::Relaxed);
76        std::thread::sleep(Duration::from_secs(1));
77
78        let stats = sched.read_stats();
79        let cur_hist = sched.read_wake_lat_hist();
80        let cur_sleep = sched.read_sleep_hist();
81
82        // WRAP GUARD: BPF RELOAD, UEI RECOVERY, OR HOTPLUG CAN RESET KERNEL-SIDE
83        // CUMULATIVE COUNTERS WHILE RUST'S PREV STILL HOLDS OLD VALUES. WITHOUT
84        // THIS CHECK, WRAPPING_SUB PRODUCES A GARBAGE POSITIVE DELTA THAT POISONS
85        // P99 AND FEEDS NONSENSE TO MWU. RESET BASELINE AND SKIP THE TICK.
86        let mut wrapped = stats.nr_dispatches < prev.nr_dispatches;
87        if !wrapped {
88            'wrap: for tier in 0..3 {
89                for b in 0..HIST_BUCKETS {
90                    if cur_hist[tier][b] < prev_hist[tier][b] {
91                        wrapped = true;
92                        break 'wrap;
93                    }
94                }
95            }
96        }
97        if !wrapped {
98            for i in 0..SLEEP_BUCKETS {
99                if cur_sleep[i] < prev_sleep[i] {
100                    wrapped = true;
101                    break;
102                }
103            }
104        }
105        if wrapped {
106            log_warn!("WRAP DETECTED: BASELINE RESET, SKIPPING ADAPTIVE UPDATE");
107            prev = stats;
108            prev_hist = cur_hist;
109            prev_sleep = cur_sleep;
110            continue;
111        }
112
113        // COMPUTE DELTAS
114        let delta_d = stats.nr_dispatches.wrapping_sub(prev.nr_dispatches);
115        let delta_idle = stats.nr_idle_hits.wrapping_sub(prev.nr_idle_hits);
116        let delta_shared = stats.nr_shared.wrapping_sub(prev.nr_shared);
117        let delta_preempt = stats.nr_preempt.wrapping_sub(prev.nr_preempt);
118        let delta_keep = stats.nr_keep_running.wrapping_sub(prev.nr_keep_running);
119        let delta_wake_sum = stats.wake_lat_sum.wrapping_sub(prev.wake_lat_sum);
120        let delta_wake_samples = stats.wake_lat_samples.wrapping_sub(prev.wake_lat_samples);
121        let delta_hard = stats.nr_hard_kicks.wrapping_sub(prev.nr_hard_kicks);
122        let delta_soft = stats.nr_soft_kicks.wrapping_sub(prev.nr_soft_kicks);
123        let delta_enq_wake = stats.nr_enq_wakeup.wrapping_sub(prev.nr_enq_wakeup);
124        let delta_enq_requeue = stats.nr_enq_requeue.wrapping_sub(prev.nr_enq_requeue);
125        let delta_rescue = stats
126            .nr_overflow_rescue
127            .wrapping_sub(prev.nr_overflow_rescue);
128        let wake_avg_us = if delta_wake_samples > 0 {
129            delta_wake_sum / delta_wake_samples / 1000
130        } else {
131            0
132        };
133
134        // PER-PATH LATENCY
135        let d_idle_sum = stats.wake_lat_idle_sum.wrapping_sub(prev.wake_lat_idle_sum);
136        let d_idle_cnt = stats.wake_lat_idle_cnt.wrapping_sub(prev.wake_lat_idle_cnt);
137        let d_kick_sum = stats.wake_lat_kick_sum.wrapping_sub(prev.wake_lat_kick_sum);
138        let d_kick_cnt = stats.wake_lat_kick_cnt.wrapping_sub(prev.wake_lat_kick_cnt);
139        let lat_idle_us = if d_idle_cnt > 0 {
140            d_idle_sum / d_idle_cnt / 1000
141        } else {
142            0
143        };
144        let lat_kick_us = if d_kick_cnt > 0 {
145            d_kick_sum / d_kick_cnt / 1000
146        } else {
147            0
148        };
149        let delta_reenq = stats.nr_reenqueue.wrapping_sub(prev.nr_reenqueue);
150
151        // L2 CACHE AFFINITY DELTAS
152        let dl2_hb = stats.nr_l2_hit_batch.wrapping_sub(prev.nr_l2_hit_batch);
153        let dl2_mb = stats.nr_l2_miss_batch.wrapping_sub(prev.nr_l2_miss_batch);
154        let dl2_hi = stats
155            .nr_l2_hit_interactive
156            .wrapping_sub(prev.nr_l2_hit_interactive);
157        let dl2_mi = stats
158            .nr_l2_miss_interactive
159            .wrapping_sub(prev.nr_l2_miss_interactive);
160        let dl2_hl = stats
161            .nr_l2_hit_lat_crit
162            .wrapping_sub(prev.nr_l2_hit_lat_crit);
163        let dl2_ml = stats
164            .nr_l2_miss_lat_crit
165            .wrapping_sub(prev.nr_l2_miss_lat_crit);
166        let l2_pct_b = if dl2_hb + dl2_mb > 0 {
167            dl2_hb * 100 / (dl2_hb + dl2_mb)
168        } else {
169            0
170        };
171        let l2_pct_i = if dl2_hi + dl2_mi > 0 {
172            dl2_hi * 100 / (dl2_hi + dl2_mi)
173        } else {
174            0
175        };
176        let l2_pct_l = if dl2_hl + dl2_ml > 0 {
177            dl2_hl * 100 / (dl2_hl + dl2_ml)
178        } else {
179            0
180        };
181
182        let idle_pct = if delta_d > 0 {
183            delta_idle * 100 / delta_d
184        } else {
185            0
186        };
187
188        // COMPUTE HISTOGRAM DELTAS (cur_hist READ AT TOP FOR WRAP GUARD)
189        let mut delta_hist = [[0u64; HIST_BUCKETS]; 3];
190        for tier in 0..3 {
191            for b in 0..HIST_BUCKETS {
192                delta_hist[tier][b] = cur_hist[tier][b] - prev_hist[tier][b];
193            }
194        }
195
196        // COMPUTE P99 PER TIER
197        let tp99_b_ns = tuning::compute_p99_from_histogram(&delta_hist[0]);
198        let tp99_i_ns = tuning::compute_p99_from_histogram(&delta_hist[1]);
199        let tp99_l_ns = tuning::compute_p99_from_histogram(&delta_hist[2]);
200
201        // AGGREGATE P99
202        let mut agg = [0u64; HIST_BUCKETS];
203        for t in 0..3 {
204            for b in 0..HIST_BUCKETS {
205                agg[b] += delta_hist[t][b];
206            }
207        }
208        let p99_ns = tuning::compute_p99_from_histogram(&agg);
209
210        // SLEEP HISTOGRAM DELTAS (cur_sleep READ AT TOP FOR WRAP GUARD)
211        let mut delta_sleep = [0u64; SLEEP_BUCKETS];
212        for i in 0..SLEEP_BUCKETS {
213            delta_sleep[i] = cur_sleep[i] - prev_sleep[i];
214        }
215        let sleep_total: u64 = delta_sleep.iter().sum();
216        let io_pct = if sleep_total > 0 {
217            (delta_sleep[0] + delta_sleep[1]) * 100 / sleep_total
218        } else {
219            0
220        };
221
222        // DETECT REGIME (SCHMITT TRIGGER + 2-TICK HOLD)
223        let detected = detect_regime(regime, idle_pct);
224
225        let mut regime_changed_this_tick = false;
226        if detected != regime {
227            if detected == pending_regime {
228                regime_hold += 1;
229            } else {
230                pending_regime = detected;
231                regime_hold = 1;
232            }
233            if regime_hold >= 2 {
234                regime = detected;
235                // REFRESH tau IN CASE HOTPLUG/TOPOLOGY CHANGED.
236                // scaled_regime_knobs RETURNS topology_tau_ns/codel_eq_ns=0;
237                // OVERLAY THE LIVE BPF VALUES (BOTH OWNED BY TOPOLOGY LAYER).
238                let live = sched.read_tuning_knobs();
239                tau_ns = live.topology_tau_ns;
240                let mut rk = scaled_regime_knobs(regime, nr_cpus, tau_ns);
241                rk.topology_tau_ns = tau_ns;
242                rk.codel_eq_ns = live.codel_eq_ns;
243                sched.write_tuning_knobs(&rk)?;
244                regime_changed_this_tick = true;
245                mwu.set_baseline(rk);
246                mwu.reset();
247            }
248        } else {
249            pending_regime = regime;
250            regime_hold = 0;
251        }
252
253        // MWU ORCHESTRATOR: UNIFIED KNOB CONTROL
254        // REPLACES: TIGHTEN/RELAX, SLEEP-INFORMED BATCH, SOJOURN EWMA, LONGRUN OVERRIDE
255        if !regime_changed_this_tick {
256            let signals = MwuSignals {
257                p99_ns,
258                interactive_p99_ns: tp99_i_ns,
259                io_pct,
260                rescue_count: delta_rescue,
261                // RAW total wakes/sec; the MWU fork-storm gate compares against
262                // a tau-derived total threshold (scale_tau_u64 * K_FORK_STORM_RATE).
263                // Per-CPU normalization here re-introduced an nr_cpus^2 effective
264                // threshold and latched on quiet 2-4C systems.
265                wakeup_rate: delta_enq_wake,
266            };
267            // OSCILLATOR-AWARE GATING: READ THE BPF DAMPED-HARMONIC
268            // OSCILLATOR'S CURRENT STATE BEFORE MWU DECIDES. PATHWAYS
269            // 2 AND 4 (RESCUE-DRIVEN) DEFER WHEN THE OSCILLATOR HAS
270            // ALREADY MOVED. WITHOUT THIS, MWU AND THE OSCILLATOR
271            // INDEPENDENTLY ADAPT ON global_rescue_count AND THE TWO
272            // CONTROLLERS DOUBLE-CORRECT.
273            let osc_state = sched.read_oscillator_state();
274            let mut knobs = mwu.update(&signals, regime.p99_ceiling(), nr_cpus, tau_ns, &osc_state);
275            // PRESERVE TOPOLOGY-OWNED FIELDS (tau_ns, codel_eq_ns) -- MWU
276            // DOESN'T TOUCH THEM. WITHOUT THIS, THE ADAPTIVE LOOP'S 1HZ
277            // WRITES WOULD CLOBBER VALUES main.rs SET AT TOPOLOGY DETECT.
278            let live = sched.read_tuning_knobs();
279            knobs.topology_tau_ns = live.topology_tau_ns;
280            knobs.codel_eq_ns = live.codel_eq_ns;
281            sched.write_tuning_knobs(&knobs)?;
282        }
283
284        // STABILITY TRACKING
285        let tighten_delta = if mwu.had_losses() { 1u64 } else { 0u64 };
286        stability_score = tuning::compute_stability_score(
287            stability_score,
288            regime_changed_this_tick,
289            tighten_delta,
290            p99_ns,
291            regime.p99_ceiling(),
292        );
293
294        // PROCESS CLASSIFICATION DATABASE: INGEST, PREDICT, EVICT
295        let (db_total, db_confident) = if let Some(ref mut db) = procdb {
296            db.ingest();
297            db.flush_predictions();
298            db.tick();
299            db.summary()
300        } else {
301            (0, 0)
302        };
303
304        let p99_us = p99_ns / 1000;
305        let tp99_b = tp99_b_ns / 1000;
306        let tp99_i = tp99_i_ns / 1000;
307        let tp99_l = tp99_l_ns / 1000;
308        let knobs = sched.read_tuning_knobs();
309
310        let sojourn_ms = stats.batch_sojourn_ns / 1_000_000;
311        let sojourn_thresh_ms = knobs.sojourn_thresh_ns / 1_000_000;
312        let longrun_label = if stats.longrun_mode_active > 0 {
313            " LONGRUN"
314        } else {
315            ""
316        };
317
318        if verbose && tuning::should_print_telemetry(tick_counter, stability_score) {
319            println!(
320                "d/s: {:<8} idle: {}% shared: {:<6} preempt: {:<4} keep: {:<4} kick: H={:<4} S={:<4} enq: W={:<4} R={:<4} wake: {}us p99: {}us [B:{} I:{} L:{}] lat_idle: {}us lat_kick: {}us procdb: {}/{} sleep: io={}% slice: {}us batch: {}us reenq: {} sjrn: {}ms/{}ms rescue: {} l2: B={}% I={}% L={}% [{}{}]",
321                delta_d, idle_pct, delta_shared, delta_preempt, delta_keep,
322                delta_hard, delta_soft, delta_enq_wake, delta_enq_requeue,
323                wake_avg_us, p99_us, tp99_b, tp99_i, tp99_l,
324                lat_idle_us, lat_kick_us,
325                db_total, db_confident,
326                io_pct, knobs.slice_ns / 1000, knobs.batch_slice_ns / 1000,
327                delta_reenq, sojourn_ms, sojourn_thresh_ms,
328                delta_rescue,
329                l2_pct_b, l2_pct_i, l2_pct_l, regime.label(), longrun_label,
330            );
331        }
332
333        sched.log.snapshot(
334            delta_d,
335            delta_idle,
336            delta_shared,
337            delta_preempt,
338            delta_keep,
339            wake_avg_us,
340            delta_hard,
341            delta_soft,
342            lat_idle_us,
343            lat_kick_us,
344        );
345
346        match regime {
347            Regime::Light => light_ticks += 1,
348            Regime::Mixed => mixed_ticks += 1,
349            Regime::Heavy => heavy_ticks += 1,
350        }
351
352        tick_counter += 1;
353        prev_hist = cur_hist;
354        prev_sleep = cur_sleep;
355        prev = stats;
356    }
357
358    // PROCDB: SAVE LEARNED CLASSIFICATIONS TO DISK
359    if let Some(ref db) = procdb {
360        let path = ProcessDb::default_path();
361        match db.save(&path) {
362            Ok(()) => {
363                let (total, confident) = db.summary();
364                log_info!(
365                    "PROCDB: SAVED {}/{} PROFILES TO {}",
366                    confident,
367                    total,
368                    path.display()
369                );
370            }
371            Err(e) => log_warn!("PROCDB SAVE FAILED: {}", e),
372        }
373    }
374
375    // KNOBS SUMMARY: CAPTURED BY TEST HARNESS FOR ARCHIVE
376    let final_knobs = sched.read_tuning_knobs();
377    let final_stats = sched.read_stats();
378    let l2_total_b = final_stats.nr_l2_hit_batch + final_stats.nr_l2_miss_batch;
379    let l2_total_i = final_stats.nr_l2_hit_interactive + final_stats.nr_l2_miss_interactive;
380    let l2_total_l = final_stats.nr_l2_hit_lat_crit + final_stats.nr_l2_miss_lat_crit;
381    let l2_cum_b = if l2_total_b > 0 {
382        final_stats.nr_l2_hit_batch * 100 / l2_total_b
383    } else {
384        0
385    };
386    let l2_cum_i = if l2_total_i > 0 {
387        final_stats.nr_l2_hit_interactive * 100 / l2_total_i
388    } else {
389        0
390    };
391    let l2_cum_l = if l2_total_l > 0 {
392        final_stats.nr_l2_hit_lat_crit * 100 / l2_total_l
393    } else {
394        0
395    };
396    println!(
397        "[KNOBS] regime={} slice_ns={} batch_ns={} preempt_ns={} lag={} mwu={:.3} ticks=L:{}/M:{}/H:{} l2_hit=B:{}%/I:{}%/L:{}%",
398        regime.label(), final_knobs.slice_ns, final_knobs.batch_slice_ns,
399        final_knobs.preempt_thresh_ns,
400        final_knobs.lag_scale, mwu.scale(),
401        light_ticks, mixed_ticks, heavy_ticks,
402        l2_cum_b, l2_cum_i, l2_cum_l,
403    );
404
405    // READ UEI EXIT REASON
406    let should_restart = sched.read_exit_info();
407    Ok(should_restart)
408}