// scx_pandemonium/procdb.rs
// PANDEMONIUM PROCESS CLASSIFICATION DATABASE
// BPF OBSERVES MATURE TASK BEHAVIOR, RUST LEARNS PATTERNS, BPF APPLIES
//
// PROBLEM: EVERY NEW TASK ENTERS AS TIER_INTERACTIVE IN BPF enable().
// SHORT-LIVED PROCESSES (cc1, as, ld DURING COMPILATION) NEVER SURVIVE
// LONG ENOUGH TO GET RECLASSIFIED. HUNDREDS OF MISCLASSIFIED TASKS PER
// SECOND DURING make -j12, EACH FIRING PREEMPT KICKS AND GETTING SHORT
// INTERACTIVE SLICES.
//
// SOLUTION: BPF WRITES OBSERVATIONS TO AN LRU MAP WHEN A TASK'S EWMA
// MATURES (ewma_age == 8). RUST DRAINS OBSERVATIONS EVERY SECOND,
// MERGES INTO A HASHMAP WITH EWMA DECAY, AND WRITES CONFIDENT
// PREDICTIONS BACK TO A BPF HASH MAP. NEW TASKS WITH MATCHING comm
// START WITH THE CORRECT TIER AND avg_runtime FROM enable().
15
16use std::collections::HashMap;
17use std::io::Write;
18use std::path::{Path, PathBuf};
19
20use anyhow::Result;
21use libbpf_rs::MapCore;
22
23fn _timestamp() -> String {
24    unsafe {
25        let mut t: libc::time_t = 0;
26        libc::time(&mut t);
27        let mut tm: libc::tm = std::mem::zeroed();
28        libc::localtime_r(&t, &mut tm);
29        format!("[{:02}:{:02}:{:02}]", tm.tm_hour, tm.tm_min, tm.tm_sec)
30    }
31}
32
// TIMESTAMPED, LEVEL-TAGGED LOG LINES ON STDOUT.
// format_args! FORWARDS THE CALLER'S FORMAT DIRECTLY INTO println!,
// AVOIDING THE INTERMEDIATE String THAT format! WOULD ALLOCATE ON
// EVERY LOG CALL. OUTPUT TEXT IS UNCHANGED.
macro_rules! procdb_info {
    ($($arg:tt)*) => { println!("{} [INFO]   {}", _timestamp(), format_args!($($arg)*)) };
}
macro_rules! procdb_warn {
    ($($arg:tt)*) => { println!("{} [WARN]   {}", _timestamp(), format_args!($($arg)*)) };
}
39
// PINNED BPF MAP PATHS, CREATED BY THE SCHEDULER SIDE AT LOAD TIME.
// observe: BPF -> RUST (LRU, DRAINED BY ingest()).
// init:    RUST -> BPF (HASH, WRITTEN BY flush_predictions()).
const OBSERVE_PIN: &str = "/sys/fs/bpf/pandemonium/task_class_observe";
const INIT_PIN: &str = "/sys/fs/bpf/pandemonium/task_class_init";

// MINIMUM SAMPLES BEFORE A PROFILE MAY BE TRUSTED AT ALL.
pub const MIN_OBSERVATIONS: u32 = 3;
// behavioral_confidence() THRESHOLD FOR PUSHING / PERSISTING A PREDICTION.
pub const MIN_CONFIDENCE: f64 = 0.6;
// HARD CAP ON IN-MEMORY PROFILES (tick() EVICTS PAST THIS).
pub const MAX_PROFILES: usize = 512;
// TICKS (SECONDS) WITHOUT AN OBSERVATION BEFORE A PROFILE IS DROPPED.
pub const STALE_TICKS: u64 = 60;

// ON-DISK FORMAT: 4-BYTE MAGIC + u32 VERSION + u32 COUNT, THEN FIXED-SIZE
// ENTRIES. V2 ENTRIES ARE 64 BYTES; LEGACY V1 ENTRIES WERE 40 BYTES.
const PROCDB_MAGIC: &[u8; 4] = b"PDDB";
const PROCDB_VERSION: u32 = 2;
const PROCDB_PATH: &str = ".cache/pandemonium/procdb.bin";
const ENTRY_SIZE: usize = 64;
const V1_ENTRY_SIZE: usize = 40;
53
// MATCHES struct task_class_entry IN intf.h — SHARED ABI WITH THE BPF SIDE.
// FIELD ORDER, WIDTHS, AND PADDING MUST NOT CHANGE INDEPENDENTLY.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct TaskClassEntry {
    pub tier: u8,           // TIER INDEX (0 BATCH, 1 INTERACTIVE, 2 LAT_CRITICAL)
    pub _pad: [u8; 7],      // EXPLICIT PADDING TO 8-BYTE ALIGN THE u64 FIELDS
    pub avg_runtime: u64,   // EWMA RUNTIME (ns, PER BPF-SIDE ACCOUNTING)
    pub runtime_dev: u64,   // EWMA RUNTIME DEVIATION (ns)
    pub wakeup_freq: u64,   // WAKEUP FREQUENCY METRIC (UNITS DEFINED IN intf.h)
    pub csw_rate: u64,      // CONTEXT-SWITCH RATE METRIC (UNITS DEFINED IN intf.h)
}

// COMPILE-TIME ABI SAFETY: MUST MATCH struct task_class_entry IN intf.h
const _: () = assert!(std::mem::size_of::<TaskClassEntry>() == 40);
68
// AGGREGATED BEHAVIOR FOR ONE comm (PROCESS NAME), BUILT FROM REPEATED
// BPF OBSERVATIONS. METRICS ARE EWMA-MERGED IN ingest().
#[derive(Default)]
pub struct TaskProfile {
    pub tier_votes: [u32; 3], // COUNT PER TIER: [BATCH, INTERACTIVE, LAT_CRITICAL]
    pub avg_runtime_ns: u64,  // EWMA OF OBSERVED avg_runtime
    pub runtime_dev_ns: u64,  // EWMA OF OBSERVED runtime_dev
    pub wakeup_freq: u64,     // EWMA OF OBSERVED wakeup_freq
    pub csw_rate: u64,        // EWMA OF OBSERVED csw_rate
    pub observations: u32,    // TOTAL SAMPLES MERGED INTO THIS PROFILE
    pub last_seen_tick: u64,  // ProcessDb::tick VALUE AT LAST OBSERVATION (EVICTION CLOCK)
}
79
80impl TaskProfile {
81    pub fn confidence(&self) -> f64 {
82        let total: u32 = self.tier_votes.iter().sum();
83        if total == 0 {
84            return 0.0;
85        }
86        let max_count = *self.tier_votes.iter().max().unwrap_or(&0);
87        max_count as f64 / total as f64
88    }
89
90    pub fn dominant_tier(&self) -> u8 {
91        self.tier_votes
92            .iter()
93            .enumerate()
94            .max_by_key(|(_, c)| *c)
95            .map(|(i, _)| i as u8)
96            .unwrap_or(1) // INTERACTIVE DEFAULT
97    }
98
99    // MULTI-DIMENSIONAL CONFIDENCE: TIER AGREEMENT * BEHAVIORAL STABILITY
100    // HIGH RUNTIME VARIANCE REDUCES CONFIDENCE EVEN WITH STRONG TIER AGREEMENT
101    pub fn behavioral_confidence(&self) -> f64 {
102        if self.observations < MIN_OBSERVATIONS {
103            return 0.0;
104        }
105        let tier_conf = self.confidence();
106        let dev_ratio = if self.avg_runtime_ns > 0 {
107            self.runtime_dev_ns as f64 / self.avg_runtime_ns as f64
108        } else {
109            1.0
110        };
111        let stability = (1.0 - dev_ratio.min(1.0)).max(0.0);
112        tier_conf * (0.5 + 0.5 * stability)
113    }
114}
115
// USERSPACE SIDE OF THE CLASSIFICATION LOOP: DRAINS BPF OBSERVATIONS,
// MAINTAINS PER-comm PROFILES, PUSHES CONFIDENT PREDICTIONS BACK TO BPF.
pub struct ProcessDb {
    pub observe: Option<libbpf_rs::MapHandle>, // PINNED LRU MAP (BPF -> RUST); None DISABLES ingest()
    pub init: Option<libbpf_rs::MapHandle>,    // PINNED HASH MAP (RUST -> BPF); None DISABLES flushes
    pub profiles: HashMap<[u8; 16], TaskProfile>, // KEYED BY FIXED-WIDTH comm BYTES
    pub tick: u64,                             // MONOTONIC 1 Hz COUNTER, DRIVES STALENESS EVICTION
}
122
123impl ProcessDb {
124    pub fn default_path() -> PathBuf {
125        let home = std::env::var("HOME").unwrap_or_else(|_| "/root".into());
126        PathBuf::from(home).join(PROCDB_PATH)
127    }
128
129    pub fn new() -> Result<Self> {
130        let observe = libbpf_rs::MapHandle::from_pinned_path(OBSERVE_PIN)?;
131        let init = libbpf_rs::MapHandle::from_pinned_path(INIT_PIN)?;
132
133        let db_path = Self::default_path();
134        let profiles = match Self::load_from_disk(&db_path) {
135            Ok(p) => {
136                if !p.is_empty() {
137                    procdb_info!(
138                        "PROCDB: LOADED {} PROFILES FROM {}",
139                        p.len(),
140                        db_path.display()
141                    );
142                }
143                p
144            }
145            Err(e) => {
146                procdb_warn!("PROCDB LOAD: {}", e);
147                HashMap::new()
148            }
149        };
150
151        let db = Self {
152            observe: Some(observe),
153            init: Some(init),
154            profiles,
155            tick: 0,
156        };
157
158        db.flush_predictions();
159        Ok(db)
160    }
161
162    // DRAIN OBSERVATIONS FROM BPF LRU MAP, MERGE INTO PROFILES
163    pub fn ingest(&mut self) {
164        let observe = match &self.observe {
165            Some(m) => m,
166            None => return,
167        };
168        let keys: Vec<Vec<u8>> = observe.keys().collect();
169        for key in &keys {
170            if let Ok(Some(val)) = observe.lookup(key, libbpf_rs::MapFlags::ANY) {
171                if val.len() >= std::mem::size_of::<TaskClassEntry>() {
172                    let entry: TaskClassEntry =
173                        unsafe { std::ptr::read_unaligned(val.as_ptr() as *const TaskClassEntry) };
174
175                    let mut comm = [0u8; 16];
176                    let copy_len = key.len().min(16);
177                    comm[..copy_len].copy_from_slice(&key[..copy_len]);
178
179                    let profile = self.profiles.entry(comm).or_insert(TaskProfile {
180                        ..Default::default()
181                    });
182
183                    let tier_idx = (entry.tier as usize).min(2);
184                    profile.tier_votes[tier_idx] += 1;
185                    if profile.observations == 0 {
186                        profile.avg_runtime_ns = entry.avg_runtime;
187                        profile.runtime_dev_ns = entry.runtime_dev;
188                        profile.wakeup_freq = entry.wakeup_freq;
189                        profile.csw_rate = entry.csw_rate;
190                    } else {
191                        // EWMA: 7/8 OLD + 1/8 NEW
192                        profile.avg_runtime_ns =
193                            (profile.avg_runtime_ns * 7 + entry.avg_runtime) / 8;
194                        profile.runtime_dev_ns =
195                            (profile.runtime_dev_ns * 7 + entry.runtime_dev) / 8;
196                        profile.wakeup_freq = (profile.wakeup_freq * 7 + entry.wakeup_freq) / 8;
197                        profile.csw_rate = (profile.csw_rate * 7 + entry.csw_rate) / 8;
198                    }
199                    profile.observations += 1;
200                    profile.last_seen_tick = self.tick;
201                }
202            }
203            let _ = observe.delete(key);
204        }
205    }
206
207    // WRITE CONFIDENT PREDICTIONS TO BPF INIT MAP
208    pub fn flush_predictions(&self) {
209        let init = match &self.init {
210            Some(m) => m,
211            None => return,
212        };
213        for (comm, profile) in &self.profiles {
214            if profile.behavioral_confidence() >= MIN_CONFIDENCE {
215                let entry = TaskClassEntry {
216                    tier: profile.dominant_tier(),
217                    _pad: [0; 7],
218                    avg_runtime: profile.avg_runtime_ns,
219                    runtime_dev: profile.runtime_dev_ns,
220                    wakeup_freq: profile.wakeup_freq,
221                    csw_rate: profile.csw_rate,
222                };
223
224                let val = unsafe {
225                    std::slice::from_raw_parts(
226                        &entry as *const TaskClassEntry as *const u8,
227                        std::mem::size_of::<TaskClassEntry>(),
228                    )
229                };
230                let _ = init.update(comm.as_slice(), val, libbpf_rs::MapFlags::ANY);
231            }
232        }
233    }
234
235    // EVICT STALE PROFILES, CAP TOTAL ENTRIES
236    pub fn tick(&mut self) {
237        self.tick += 1;
238
239        // REMOVE PROFILES NOT SEEN IN 60 SECONDS
240        let tick = self.tick;
241        let stale: Vec<[u8; 16]> = self
242            .profiles
243            .iter()
244            .filter(|(_, p)| tick - p.last_seen_tick > STALE_TICKS)
245            .map(|(k, _)| *k)
246            .collect();
247        for comm in &stale {
248            self.profiles.remove(comm);
249            if let Some(ref init) = self.init {
250                let _ = init.delete(comm.as_slice());
251            }
252        }
253
254        // CAP ENTRIES: EVICT OLDEST FIRST, TIE-BREAK BY OBSERVATIONS THEN COMM
255        if self.profiles.len() > MAX_PROFILES {
256            let mut entries: Vec<([u8; 16], u64, u32)> = self
257                .profiles
258                .iter()
259                .map(|(k, v)| (*k, v.last_seen_tick, v.observations))
260                .collect();
261            entries.sort_by(|a, b| (a.1, a.2, a.0).cmp(&(b.1, b.2, b.0)));
262            let to_remove = self.profiles.len() - MAX_PROFILES;
263            for (k, _, _) in entries.into_iter().take(to_remove) {
264                self.profiles.remove(&k);
265                if let Some(ref init) = self.init {
266                    let _ = init.delete(k.as_slice());
267                }
268            }
269        }
270    }
271
272    // (TOTAL PROFILES, CONFIDENT PROFILES)
273    pub fn summary(&self) -> (usize, usize) {
274        let total = self.profiles.len();
275        let confident = self
276            .profiles
277            .values()
278            .filter(|p| p.behavioral_confidence() >= MIN_CONFIDENCE)
279            .count();
280        (total, confident)
281    }
282
283    // SERIALIZE CONFIDENT PROFILES TO DISK (ATOMIC WRITE)
284    pub fn save(&self, path: &Path) -> Result<()> {
285        let entries: Vec<_> = self
286            .profiles
287            .iter()
288            .filter(|(_, p)| p.behavioral_confidence() >= MIN_CONFIDENCE)
289            .collect();
290
291        if let Some(parent) = path.parent() {
292            std::fs::create_dir_all(parent)?;
293        }
294
295        let tmp_path = path.with_extension("bin.tmp");
296        let mut f = std::fs::File::create(&tmp_path)?;
297
298        // HEADER: MAGIC + VERSION + COUNT
299        f.write_all(PROCDB_MAGIC)?;
300        f.write_all(&PROCDB_VERSION.to_le_bytes())?;
301        f.write_all(&(entries.len() as u32).to_le_bytes())?;
302
303        // ENTRIES: 64 BYTES EACH (V2)
304        for (comm, profile) in &entries {
305            let tier = profile.dominant_tier();
306            let total_votes: u32 = profile.tier_votes.iter().sum();
307
308            f.write_all(comm.as_slice())?; // 16 bytes
309            f.write_all(&[tier])?; // 1 byte
310            f.write_all(&[0u8; 7])?; // 7 bytes pad
311            f.write_all(&profile.avg_runtime_ns.to_le_bytes())?; // 8 bytes
312            f.write_all(&profile.runtime_dev_ns.to_le_bytes())?; // 8 bytes
313            f.write_all(&profile.wakeup_freq.to_le_bytes())?; // 8 bytes
314            f.write_all(&profile.csw_rate.to_le_bytes())?; // 8 bytes
315            f.write_all(&profile.observations.to_le_bytes())?; // 4 bytes
316            f.write_all(&total_votes.to_le_bytes())?; // 4 bytes
317        }
318
319        drop(f);
320        std::fs::rename(&tmp_path, path)?;
321        Ok(())
322    }
323
324    // DESERIALIZE PROFILES FROM DISK (RETURNS EMPTY ON CORRUPTION)
325    pub fn load_from_disk(path: &Path) -> Result<HashMap<[u8; 16], TaskProfile>> {
326        let data = match std::fs::read(path) {
327            Ok(d) => d,
328            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
329                return Ok(HashMap::new());
330            }
331            Err(e) => return Err(e.into()),
332        };
333
334        if data.len() < 12 {
335            procdb_warn!("PROCDB: FILE TOO SHORT ({} BYTES)", data.len());
336            return Ok(HashMap::new());
337        }
338
339        // VALIDATE MAGIC
340        if &data[0..4] != PROCDB_MAGIC {
341            procdb_warn!("PROCDB: BAD MAGIC {:?}", &data[0..4]);
342            return Ok(HashMap::new());
343        }
344
345        // VALIDATE VERSION
346        let version = u32::from_le_bytes([data[4], data[5], data[6], data[7]]);
347        let entry_size = match version {
348            1 => V1_ENTRY_SIZE,
349            2 => ENTRY_SIZE,
350            _ => {
351                procdb_warn!("PROCDB: UNKNOWN VERSION {}", version);
352                return Ok(HashMap::new());
353            }
354        };
355
356        // VALIDATE COUNT VS FILE SIZE
357        let count = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
358        let expected_size = 12 + count * entry_size;
359        if data.len() < expected_size {
360            procdb_warn!(
361                "PROCDB: TRUNCATED (EXPECTED {} BYTES, GOT {})",
362                expected_size,
363                data.len()
364            );
365            return Ok(HashMap::new());
366        }
367
368        let mut profiles = HashMap::new();
369        let mut offset = 12;
370
371        for _ in 0..count {
372            let mut comm = [0u8; 16];
373            comm.copy_from_slice(&data[offset..offset + 16]);
374            offset += 16;
375
376            let tier = data[offset] as usize;
377            offset += 8; // tier + 7 pad
378
379            let avg_runtime = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
380            offset += 8;
381
382            // V2: READ EXTRA BEHAVIORAL FIELDS
383            let (runtime_dev, wakeup_freq, csw_rate) = if version >= 2 {
384                let rd = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
385                offset += 8;
386                let wf = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
387                offset += 8;
388                let cr = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap());
389                offset += 8;
390                (rd, wf, cr)
391            } else {
392                (0, 0, 0)
393            };
394
395            let observations = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
396            offset += 4;
397
398            let total_votes = u32::from_le_bytes(data[offset..offset + 4].try_into().unwrap());
399            offset += 4;
400
401            // RECONSTRUCT: ALL VOTES GO TO DOMINANT TIER (CONFIDENCE = 1.0)
402            let mut tier_votes = [0u32; 3];
403            tier_votes[tier.min(2)] = total_votes;
404
405            profiles.insert(
406                comm,
407                TaskProfile {
408                    tier_votes,
409                    avg_runtime_ns: avg_runtime,
410                    runtime_dev_ns: runtime_dev,
411                    wakeup_freq,
412                    csw_rate,
413                    observations,
414                    last_seen_tick: 0,
415                },
416            );
417        }
418
419        Ok(profiles)
420    }
421}