// scx_pandemonium — topology.rs

1// PANDEMONIUM CPU CACHE TOPOLOGY
2// PARSES SYSFS AT STARTUP, POPULATES BPF MAP FOR CACHE-AWARE DISPATCH
3//
4// BPF dispatch() USES THE CACHE DOMAIN MAP TO PREFER TASKS THAT LAST
5// RAN ON THE SAME CPU OR AN L2 SIBLING. THIS PRESERVES CACHE WARMTH
6// AND REDUCES THE THROUGHPUT GAP CAUSED BY BLIND NODE-DSQ CONSUMPTION.
7
8use anyhow::Result;
9
10use crate::scheduler::Scheduler;
11
// CPU cache/socket topology snapshot, detected once at startup from sysfs.
// Invariant (from detect()): l2_domain.len() == socket_domain.len() == nr_cpus,
// and every socket_domain value is < nr_sockets.
#[allow(dead_code)]
pub struct CpuTopology {
    pub nr_cpus: usize,           // number of logical CPUs probed (cpu0..cpuN-1)
    pub l2_domain: Vec<u32>,      // l2_domain[cpu] = group_id
    pub l2_groups: Vec<Vec<u32>>, // l2_groups[group_id] = [cpu, ...]
    pub socket_domain: Vec<u32>,  // socket_domain[cpu] = socket_id (dense index)
    pub nr_sockets: u32,          // count of distinct physical packages seen
}
20
21impl CpuTopology {
22    pub fn detect(nr_cpus: usize) -> Result<Self> {
23        let mut l2_domain = vec![0u32; nr_cpus];
24        let mut seen_groups: Vec<Vec<u32>> = Vec::new();
25
26        for cpu in 0..nr_cpus {
27            let path = format!(
28                "/sys/devices/system/cpu/cpu{}/cache/index2/shared_cpu_list",
29                cpu
30            );
31            let content = match std::fs::read_to_string(&path) {
32                Ok(s) => s,
33                Err(_) => {
34                    // CPU MIGHT BE OFFLINE OR HAVE NO L2 INFO -- ASSIGN OWN GROUP
35                    l2_domain[cpu] = cpu as u32;
36                    continue;
37                }
38            };
39
40            let members = parse_cpu_list(content.trim());
41
42            // CHECK IF THIS GROUP ALREADY EXISTS
43            let group_id = match seen_groups.iter().position(|g| *g == members) {
44                Some(id) => id as u32,
45                None => {
46                    let id = seen_groups.len() as u32;
47                    seen_groups.push(members.clone());
48                    id
49                }
50            };
51
52            l2_domain[cpu] = group_id;
53        }
54
55        // DETECT SOCKET (PHYSICAL PACKAGE)
56        let mut socket_domain = vec![0u32; nr_cpus];
57        let mut seen_sockets: Vec<u32> = Vec::new();
58
59        for cpu in 0..nr_cpus {
60            let path = format!(
61                "/sys/devices/system/cpu/cpu{}/topology/physical_package_id",
62                cpu
63            );
64            let pkg_id = match std::fs::read_to_string(&path) {
65                Ok(s) => s.trim().parse::<u32>().unwrap_or(0),
66                Err(_) => 0,
67            };
68            if !seen_sockets.contains(&pkg_id) {
69                seen_sockets.push(pkg_id);
70            }
71            let socket_idx = seen_sockets.iter().position(|&s| s == pkg_id).unwrap() as u32;
72            socket_domain[cpu] = socket_idx;
73        }
74
75        let nr_sockets = seen_sockets.len() as u32;
76
77        Ok(Self {
78            nr_cpus,
79            l2_domain,
80            l2_groups: seen_groups,
81            socket_domain,
82            nr_sockets,
83        })
84    }
85
86    // WRITE L2 DOMAIN MAP TO BPF ARRAY VIA SCHEDULER
87    pub fn populate_bpf_map(&self, sched: &Scheduler) -> Result<()> {
88        for cpu in 0..self.nr_cpus {
89            sched.write_cache_domain(cpu as u32, self.l2_domain[cpu])?;
90        }
91        Ok(())
92    }
93
94    // WRITE L2 SIBLINGS FLAT ARRAY TO BPF MAP
95    // l2_siblings[group_id * 8 + slot] = cpu_id, SENTINEL u32::MAX MARKS END
96    pub fn populate_l2_siblings_map(&self, sched: &Scheduler) -> Result<()> {
97        const MAX_L2_SIBLINGS: usize = 8;
98        for (gid, members) in self.l2_groups.iter().enumerate() {
99            for (slot, &cpu) in members.iter().enumerate().take(MAX_L2_SIBLINGS) {
100                sched.write_l2_sibling(gid as u32, slot as u32, cpu)?;
101            }
102            if members.len() < MAX_L2_SIBLINGS {
103                sched.write_l2_sibling(gid as u32, members.len() as u32, u32::MAX)?;
104            }
105        }
106        Ok(())
107    }
108
109    // RESISTANCE AFFINITY (KYNG-DINIC ELECTRICAL FLOW MODEL)
110    //
111    // Effective resistance R_eff(u,v) between two CPUs captures the true
112    // migration cost through ALL topology paths. Computed from the Laplacian
113    // pseudoinverse of the CPU topology graph:
114    //   R_eff(i,j) = L+[i,i] + L+[j,j] - 2*L+[i,j]
115    //
116    // Edge conductances (inverse resistance):
117    //   L2 siblings:     10.0  (shared cache, near-zero migration cost)
118    //   Same socket:      1.0  (shared LLC, moderate cost)
119    //   Cross-socket:     0.3  (NUMA hop, high cost)
120    //
121    // The Laplacian L = D - W where D is degree matrix, W is weighted adjacency.
122    // L+ (Moore-Penrose pseudoinverse) computed via eigendecomposition:
123    //   L+ = sum_{i: lambda_i > 0} (1/lambda_i) * v_i * v_i^T
124    //
125    // For n CPUs this is O(n^3) -- trivial at scheduler startup (n <= 256).
126    //
127    // Reference: Christiano-Kelner-Madry-Spielman-Teng (STOC 2011),
128    //            Chen-Kyng-Liu-Peng-Gutenberg-Sachdeva (FOCS 2022)
129
    const CONDUCTANCE_L2: f64 = 10.0; // SHARED L2 CACHE: CHEAPEST MIGRATION EDGE
    const CONDUCTANCE_SOCKET: f64 = 1.0; // SAME SOCKET, DIFFERENT L2 (SHARED LLC)
    const CONDUCTANCE_CROSS: f64 = 0.3; // CROSS-SOCKET (NUMA HOP): MOST EXPENSIVE
133
134    // BUILD WEIGHTED GRAPH LAPLACIAN FROM CPU TOPOLOGY
135    fn build_laplacian(&self) -> Vec<f64> {
136        let n = self.nr_cpus;
137        let mut l = vec![0.0f64; n * n];
138        for i in 0..n {
139            for j in (i + 1)..n {
140                let w = if self.l2_domain[i] == self.l2_domain[j] {
141                    Self::CONDUCTANCE_L2
142                } else if self.socket_domain[i] == self.socket_domain[j] {
143                    Self::CONDUCTANCE_SOCKET
144                } else {
145                    Self::CONDUCTANCE_CROSS
146                };
147                l[i * n + j] = -w;
148                l[j * n + i] = -w;
149                l[i * n + i] += w;
150                l[j * n + j] += w;
151            }
152        }
153        l
154    }
155
156    // SYMMETRIC EIGENDECOMPOSITION VIA JACOBI ROTATIONS
157    // Returns (eigenvalues, eigenvectors_column_major)
158    // Suitable for n <= 256. No external dependencies.
159    fn symmetric_eigen(mat: &[f64], n: usize) -> (Vec<f64>, Vec<f64>) {
160        let mut a = mat.to_vec();
161        // EIGENVECTORS START AS IDENTITY
162        let mut v = vec![0.0f64; n * n];
163        for i in 0..n {
164            v[i * n + i] = 1.0;
165        }
166
167        let max_iter = 100 * n * n;
168        for _ in 0..max_iter {
169            // FIND LARGEST OFF-DIAGONAL ELEMENT
170            let mut max_val = 0.0f64;
171            let mut p = 0;
172            let mut q = 1;
173            for i in 0..n {
174                for j in (i + 1)..n {
175                    let val = a[i * n + j].abs();
176                    if val > max_val {
177                        max_val = val;
178                        p = i;
179                        q = j;
180                    }
181                }
182            }
183            if max_val < 1e-12 {
184                break;
185            }
186
187            // COMPUTE ROTATION
188            let app = a[p * n + p];
189            let aqq = a[q * n + q];
190            let apq = a[p * n + q];
191            let theta = if (app - aqq).abs() < 1e-15 {
192                std::f64::consts::FRAC_PI_4
193            } else {
194                0.5 * (2.0 * apq / (app - aqq)).atan()
195            };
196            let c = theta.cos();
197            let s = theta.sin();
198
199            // APPLY ROTATION TO A
200            for i in 0..n {
201                if i == p || i == q {
202                    continue;
203                }
204                let aip = a[i * n + p];
205                let aiq = a[i * n + q];
206                a[i * n + p] = c * aip + s * aiq;
207                a[p * n + i] = a[i * n + p];
208                a[i * n + q] = -s * aip + c * aiq;
209                a[q * n + i] = a[i * n + q];
210            }
211            let new_pp = c * c * app + 2.0 * s * c * apq + s * s * aqq;
212            let new_qq = s * s * app - 2.0 * s * c * apq + c * c * aqq;
213            a[p * n + p] = new_pp;
214            a[q * n + q] = new_qq;
215            a[p * n + q] = 0.0;
216            a[q * n + p] = 0.0;
217
218            // ACCUMULATE EIGENVECTORS
219            for i in 0..n {
220                let vip = v[i * n + p];
221                let viq = v[i * n + q];
222                v[i * n + p] = c * vip + s * viq;
223                v[i * n + q] = -s * vip + c * viq;
224            }
225        }
226
227        let eigenvalues: Vec<f64> = (0..n).map(|i| a[i * n + i]).collect();
228        (eigenvalues, v)
229    }
230
231    // COMPUTE LAPLACIAN PSEUDOINVERSE FROM EIGENDECOMPOSITION
232    fn compute_pseudoinverse(eigenvalues: &[f64], eigenvectors: &[f64], n: usize) -> Vec<f64> {
233        let mut l_pinv = vec![0.0f64; n * n];
234        for k in 0..n {
235            if eigenvalues[k].abs() < 1e-8 {
236                continue; // SKIP NULL EIGENVALUE (CONNECTED GRAPH HAS ONE)
237            }
238            let inv_lambda = 1.0 / eigenvalues[k];
239            for i in 0..n {
240                for j in 0..n {
241                    l_pinv[i * n + j] +=
242                        inv_lambda * eigenvectors[i * n + k] * eigenvectors[j * n + k];
243                }
244            }
245        }
246        l_pinv
247    }
248
249    // COMPUTE ALL-PAIRS EFFECTIVE RESISTANCE FROM PSEUDOINVERSE
250    // R_eff(i,j) = L+[i,i] + L+[j,j] - 2*L+[i,j]
251    fn extract_reff(l_pinv: &[f64], n: usize) -> Vec<f64> {
252        let mut r = vec![0.0f64; n * n];
253        for i in 0..n {
254            for j in (i + 1)..n {
255                let val = l_pinv[i * n + i] + l_pinv[j * n + j] - 2.0 * l_pinv[i * n + j];
256                r[i * n + j] = val.max(0.0);
257                r[j * n + i] = r[i * n + j];
258            }
259        }
260        r
261    }
262
263    // BUILD PER-CPU AFFINITY RANK: FOR EACH CPU, ALL OTHERS SORTED BY R_EFF
264    // Returns flat array: affinity_rank[cpu * nr_cpus + slot] = target_cpu
265    fn build_affinity_rank(reff: &[f64], n: usize) -> Vec<u32> {
266        let mut rank = vec![0u32; n * n];
267        for cpu in 0..n {
268            let mut others: Vec<(u64, u32)> = (0..n)
269                .filter(|&c| c != cpu)
270                .map(|c| {
271                    // SORT KEY: R_EFF AS FIXED-POINT TO AVOID FLOAT COMPARISON ISSUES
272                    let key = (reff[cpu * n + c] * 1_000_000.0) as u64;
273                    (key, c as u32)
274                })
275                .collect();
276            others.sort();
277            for (slot, &(_, target)) in others.iter().enumerate() {
278                rank[cpu * n + slot] = target;
279            }
280            // FILL REMAINING SLOTS WITH SENTINEL
281            for slot in others.len()..n {
282                rank[cpu * n + slot] = u32::MAX;
283            }
284        }
285        rank
286    }
287
288    // COMPUTE RESISTANCE AFFINITY: FULL PIPELINE
289    // Returns (reff_matrix, affinity_rank) for use by BPF and scheduler
290    pub fn compute_resistance_affinity(&self) -> (Vec<f64>, Vec<u32>) {
291        let n = self.nr_cpus;
292        let laplacian = self.build_laplacian();
293        let (eigenvalues, eigenvectors) = Self::symmetric_eigen(&laplacian, n);
294        let l_pinv = Self::compute_pseudoinverse(&eigenvalues, &eigenvectors, n);
295        let reff = Self::extract_reff(&l_pinv, n);
296        let rank = Self::build_affinity_rank(&reff, n);
297        (reff, rank)
298    }
299
300    // QUERY EFFECTIVE RESISTANCE BETWEEN TWO CPUS FROM PRECOMPUTED MATRIX
301    #[allow(dead_code)]
302    pub fn effective_resistance(reff: &[f64], nr_cpus: usize, cpu_a: usize, cpu_b: usize) -> f64 {
303        if cpu_a == cpu_b {
304            return 0.0;
305        }
306        reff[cpu_a * nr_cpus + cpu_b]
307    }
308
309    // WRITE AFFINITY RANK TO BPF MAP
310    // affinity_rank[cpu * MAX_AFFINITY_CANDIDATES + slot] = target_cpu
311    pub fn populate_affinity_rank_map(&self, sched: &Scheduler, rank: &[u32]) -> Result<()> {
312        let max_candidates = self.nr_cpus.min(16); // BPF MAP BOUNDED
313        for cpu in 0..self.nr_cpus {
314            for slot in 0..max_candidates {
315                let val = rank[cpu * self.nr_cpus + slot];
316                sched.write_affinity_rank(cpu as u32, slot as u32, val)?;
317            }
318        }
319        Ok(())
320    }
321
    // LOG A HUMAN-READABLE SUMMARY OF THE RESISTANCE AFFINITY FOR CPU 0.
    // reff is the n*n R_eff matrix and rank the flat affinity table; indexing
    // below uses row 0 of each, which starts at offset 0 (rank[slot] ==
    // rank[0*n+slot], reff[x] == reff[0*n+x]).
    // NOTE(review): assumes n >= 1 -- `n - 1` below would underflow for n == 0.
    pub fn log_resistance_affinity(&self, reff: &[f64], rank: &[u32]) {
        let n = self.nr_cpus;
        // LOG TOP 3 AFFINITIES FOR CPU 0
        let mut parts = Vec::new();
        for slot in 0..3.min(n - 1) {
            let target = rank[slot] as usize;
            // u32::MAX SENTINEL (OR ANY OUT-OF-RANGE ID) ENDS THE LIST
            if target >= n {
                break;
            }
            let r = reff[target];
            parts.push(format!("CPU{}(R={:.3})", target, r));
        }
        log_info!("RESISTANCE AFFINITY: CPU 0 rank: {}", parts.join(", "));

        // LOG L2 VS NON-L2 R_EFF FOR FIRST CPU
        if n >= 2 {
            // BEST-RANKED NEIGHBOR VS SECOND-BEST; FOR n == 2 BOTH INDICES
            // RESOLVE TO rank[0], SO THE RATIO DEGENERATES TO 1.0
            let l2_sib = rank[0] as usize;
            let non_l2 = rank[1.min(n - 2)] as usize;
            log_info!(
                "RESISTANCE AFFINITY: R_eff L2={:.4} non-L2={:.4} ratio={:.1}x",
                reff[l2_sib],
                reff[non_l2],
                // GUARD AGAINST DIVISION BY ZERO WHEN THE BEST EDGE IS FREE
                if reff[l2_sib] > 0.0 {
                    reff[non_l2] / reff[l2_sib]
                } else {
                    0.0
                }
            );
        }
    }
352
353    pub fn log_summary(&self) {
354        for (gid, members) in self.l2_groups.iter().enumerate() {
355            let cpus: Vec<String> = members.iter().map(|c| c.to_string()).collect();
356            log_info!("L2 GROUP {}: [{}]", gid, cpus.join(","));
357        }
358        log_info!(
359            "L2 GROUPS: {} across {} CPUs, {} SOCKETS",
360            self.l2_groups.len(),
361            self.nr_cpus,
362            self.nr_sockets
363        );
364    }
365}
366
// PARSE KERNEL CPU LIST FORMAT: "0,6" or "0-2,6-8" or "3"
//
// Best-effort, matching sysfs tolerance: malformed parts are skipped, and a
// reversed range like "5-2" contributes nothing (empty RangeInclusive).
// Output is sorted ascending and deduplicated.
fn parse_cpu_list(s: &str) -> Vec<u32> {
    let mut result = Vec::new();
    for part in s.split(',').map(str::trim).filter(|p| !p.is_empty()) {
        if let Some((start, end)) = part.split_once('-') {
            if let (Ok(lo), Ok(hi)) = (start.parse::<u32>(), end.parse::<u32>()) {
                result.extend(lo..=hi);
            }
        } else if let Ok(cpu) = part.parse::<u32>() {
            result.push(cpu);
        }
    }
    // sort_unstable: faster and allocation-free; stability is irrelevant for
    // plain integers, and dedup requires adjacency
    result.sort_unstable();
    result.dedup();
    result
}
389
#[cfg(test)]
mod tests {
    use super::*;

    // parse_cpu_list: single bare CPU id
    #[test]
    fn parse_single() {
        assert_eq!(parse_cpu_list("3"), vec![3]);
    }

    // parse_cpu_list: comma-separated ids
    #[test]
    fn parse_comma() {
        assert_eq!(parse_cpu_list("0,6"), vec![0, 6]);
    }

    // parse_cpu_list: dash ranges are expanded inclusively
    #[test]
    fn parse_range() {
        assert_eq!(parse_cpu_list("0-2,6-8"), vec![0, 1, 2, 6, 7, 8]);
    }

    // parse_cpu_list: ranges and bare ids mixed
    #[test]
    fn parse_mixed() {
        assert_eq!(parse_cpu_list("0-2,5,9-11"), vec![0, 1, 2, 5, 9, 10, 11]);
    }

    // parse_cpu_list: empty input yields empty output, no panic
    #[test]
    fn parse_empty() {
        assert_eq!(parse_cpu_list(""), Vec::<u32>::new());
    }

    // End-to-end detect() against the live host sysfs; asserts only
    // machine-independent invariants so it passes on any topology.
    #[test]
    fn detect_topology() {
        // RUNS ON ANY MACHINE -- VERIFIES SANE OUTPUT
        // Count cpuN directories (the "cpu" prefix alone also matches
        // "cpufreq" etc., hence the numeric-suffix check).
        let nr_cpus = std::fs::read_dir("/sys/devices/system/cpu")
            .unwrap()
            .filter(|e| {
                e.as_ref()
                    .map(|e| {
                        e.file_name().to_string_lossy().starts_with("cpu")
                            && e.file_name().to_string_lossy()[3..].parse::<u32>().is_ok()
                    })
                    .unwrap_or(false)
            })
            .count();

        if nr_cpus == 0 {
            return; // NO CPUS VISIBLE (CONTAINER?)
        }

        let topo = CpuTopology::detect(nr_cpus).unwrap();
        assert_eq!(topo.nr_cpus, nr_cpus);
        assert_eq!(topo.l2_domain.len(), nr_cpus);

        // EVERY CPU MUST HAVE A VALID GROUP ID
        // (the `== cpu as u32` arm accommodates detect()'s offline-CPU
        // fallback, which may assign an id outside l2_groups)
        let max_group = topo.l2_groups.len() as u32;
        for cpu in 0..nr_cpus {
            assert!(
                topo.l2_domain[cpu] < max_group || topo.l2_domain[cpu] == cpu as u32,
                "CPU {} has invalid l2 group {}",
                cpu,
                topo.l2_domain[cpu]
            );
        }

        // AT LEAST ONE GROUP MUST EXIST
        assert!(!topo.l2_groups.is_empty());

        // SOCKET DETECTION: dense indices, all below nr_sockets
        assert_eq!(topo.socket_domain.len(), nr_cpus);
        assert!(topo.nr_sockets >= 1);
        for cpu in 0..nr_cpus {
            assert!(
                topo.socket_domain[cpu] < topo.nr_sockets,
                "CPU {} socket {} >= nr_sockets {}",
                cpu,
                topo.socket_domain[cpu],
                topo.nr_sockets
            );
        }

        // RESISTANCE AFFINITY: LAPLACIAN R_EFF
        let (reff, rank) = topo.compute_resistance_affinity();
        // SAME CPU = 0
        assert_eq!(CpuTopology::effective_resistance(&reff, nr_cpus, 0, 0), 0.0);
        // L2 SIBLING SHOULD BE CHEAPEST (RANK SLOT 0)
        if nr_cpus >= 2 {
            let best = rank[0] as usize;
            assert!(best < nr_cpus);
            let r_best = CpuTopology::effective_resistance(&reff, nr_cpus, 0, best);
            assert!(r_best > 0.0);
            // EVERY OTHER CPU SHOULD COST >= THE BEST
            // (1e-9 tolerance absorbs Jacobi iteration rounding)
            for c in 1..nr_cpus {
                let r_c = CpuTopology::effective_resistance(&reff, nr_cpus, 0, c);
                assert!(
                    r_c >= r_best - 1e-9,
                    "CPU {} R_eff {:.6} < best {:.6}",
                    c,
                    r_c,
                    r_best
                );
            }
        }
    }
}