Skip to main content

scx_cake/
calibrate.rs

1// SPDX-License-Identifier: GPL-2.0
2// ETD for scx_cake - measures inter-core latency via CAS ping-pong (adapted from nviennot/core-to-core-latency)
3
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Barrier};
6
7use log::{debug, info};
8use quanta::Clock;
9
/// An `AtomicBool` that occupies a 64-byte line by itself.
///
/// `repr(align(64))` places every instance on a 64-byte boundary and the
/// trailing padding fills out the rest of the line, so no unrelated data can
/// share the line with the flag (avoiding false sharing).
#[repr(align(64))]
struct PaddedAtomicBool {
    val: AtomicBool,
    _pad: [u8; 63],
}

impl PaddedAtomicBool {
    /// Create a padded flag holding `initial`.
    fn new(initial: bool) -> Self {
        let val = AtomicBool::new(initial);
        Self { val, _pad: [0; 63] }
    }
}
25
26/// Shared state for two-buffer ping-pong (avoids SMT contention)
27struct SharedState {
28    barrier: Barrier,
29    flag: PaddedAtomicBool,
30}
31
32const PING: bool = false;
33const PONG: bool = true;
34
/// Configuration for ETD calibration.
///
/// Derives `Debug`/`Clone`/`PartialEq` so callers can log, copy and compare
/// configurations.
#[derive(Debug, Clone, PartialEq)]
pub struct EtdConfig {
    /// Number of round-trips timed per sample
    pub iterations: u32,
    /// Number of samples to collect per CPU pair
    pub samples: u32,
    /// Untimed warmup round-trips run before sampling, to let boost clocks
    /// stabilize (results discarded)
    pub warmup: u32,
    /// Maximum acceptable population standard deviation (ns) across a pair's
    /// samples; a noisier measurement of the pair is retried
    pub max_stddev: f64,
}

impl Default for EtdConfig {
    fn default() -> Self {
        Self {
            // Display-grade: 500 iterations @ 50 samples (sufficient for heatmap accuracy)
            iterations: 500,
            samples: 50,
            // 200 warmup iters to stabilize boost clocks
            warmup: 200,
            // Retry pairs whose σ > 15ns (relaxed for faster calibration)
            max_stddev: 15.0,
        }
    }
}
60
61/// Measure round-trip latency between two CPUs using CAS ping-pong. Returns per-sample latencies (ns).
62fn measure_pair(cpu_a: usize, cpu_b: usize, config: &EtdConfig) -> Option<Vec<f64>> {
63    let state = Arc::new(SharedState {
64        barrier: Barrier::new(2),
65        flag: PaddedAtomicBool::new(PING),
66    });
67
68    let clock = Arc::new(Clock::new());
69    let num_round_trips = config.iterations as usize;
70    let num_samples = config.samples as usize;
71    let warmup_trips = config.warmup as usize;
72
73    let state_pong = Arc::clone(&state);
74    let state_ping = Arc::clone(&state);
75    let clock_ping = Arc::clone(&clock);
76
77    crossbeam_utils::thread::scope(|s| {
78        // PONG thread: waits for PING, sets to PONG
79        let pong = s.spawn(move |_| {
80            let core_id = core_affinity::CoreId { id: cpu_b };
81            if !core_affinity::set_for_current(core_id) {
82                return;
83            }
84
85            // Set real-time priority to minimize preemption jitter
86            unsafe {
87                let param = libc::sched_param { sched_priority: 99 };
88                libc::sched_setscheduler(0, libc::SCHED_FIFO, &param);
89            }
90
91            state_pong.barrier.wait();
92
93            // Warmup phase (not timed, stabilizes boost clocks)
94            for _ in 0..warmup_trips {
95                while state_pong
96                    .flag
97                    .val
98                    .compare_exchange(PING, PONG, Ordering::AcqRel, Ordering::Relaxed)
99                    .is_err()
100                {
101                    std::hint::spin_loop();
102                }
103            }
104
105            // Measurement phase
106            for _ in 0..(num_round_trips * num_samples) {
107                while state_pong
108                    .flag
109                    .val
110                    .compare_exchange(PING, PONG, Ordering::AcqRel, Ordering::Relaxed)
111                    .is_err()
112                {
113                    std::hint::spin_loop();
114                }
115            }
116
117            // Reset to normal priority before thread exit
118            unsafe {
119                let param = libc::sched_param { sched_priority: 0 };
120                libc::sched_setscheduler(0, libc::SCHED_OTHER, &param);
121            }
122        });
123
124        // PING thread: sets to PING, waits for PONG, measures time
125        let ping = s.spawn(move |_| {
126            let core_id = core_affinity::CoreId { id: cpu_a };
127            if !core_affinity::set_for_current(core_id) {
128                return None;
129            }
130
131            // Set real-time priority to minimize preemption jitter
132            unsafe {
133                let param = libc::sched_param { sched_priority: 99 };
134                libc::sched_setscheduler(0, libc::SCHED_FIFO, &param);
135            }
136
137            let mut results = Vec::with_capacity(num_samples);
138
139            state_ping.barrier.wait();
140
141            // Warmup phase (not timed, stabilizes boost clocks)
142            for _ in 0..warmup_trips {
143                while state_ping
144                    .flag
145                    .val
146                    .compare_exchange(PONG, PING, Ordering::AcqRel, Ordering::Relaxed)
147                    .is_err()
148                {
149                    std::hint::spin_loop();
150                }
151            }
152
153            // Measurement phase
154            for _ in 0..num_samples {
155                let start = clock_ping.raw();
156
157                for _ in 0..num_round_trips {
158                    while state_ping
159                        .flag
160                        .val
161                        .compare_exchange(PONG, PING, Ordering::AcqRel, Ordering::Relaxed)
162                        .is_err()
163                    {
164                        std::hint::spin_loop();
165                    }
166                }
167
168                let end = clock_ping.raw();
169                let duration_ns = clock_ping.delta(start, end).as_nanos() as f64;
170                // One-way latency = total time / (round_trips * 2 hops)
171                results.push(duration_ns / (num_round_trips as f64 * 2.0));
172            }
173
174            // Reset to normal priority before thread exit
175            unsafe {
176                let param = libc::sched_param { sched_priority: 0 };
177                libc::sched_setscheduler(0, libc::SCHED_OTHER, &param);
178            }
179
180            Some(results)
181        });
182
183        pong.join().unwrap();
184        ping.join().unwrap()
185    })
186    .ok()?
187}
188
189/// Perform full topology calibration. Returns matrix[i][j] = latency from CPU i to CPU j.
190pub fn calibrate_full_matrix<F>(
191    nr_cpus: usize,
192    config: &EtdConfig,
193    mut progress_callback: F,
194) -> Vec<Vec<f64>>
195where
196    F: FnMut(usize, usize, bool),
197{
198    let mut matrix = vec![vec![0.0; nr_cpus]; nr_cpus];
199
200    info!(
201        "ETD: Starting calibration for {} CPUs ({} iterations × {} samples)",
202        nr_cpus, config.iterations, config.samples
203    );
204
205    let start = std::time::Instant::now();
206
207    // Calculate total pairs to measure
208    let total_pairs = (nr_cpus * (nr_cpus - 1)) / 2;
209    let mut current_pair = 0;
210
211    #[allow(clippy::needless_range_loop)]
212    for cpu_a in 0..nr_cpus {
213        for cpu_b in (cpu_a + 1)..nr_cpus {
214            current_pair += 1;
215            let mut retry_count = 0;
216            const MAX_RETRIES: u32 = 3;
217
218            while let Some(samples) = measure_pair(cpu_a, cpu_b, config) {
219                if !samples.is_empty() {
220                    // Calculate mean and standard deviation
221                    let n = samples.len() as f64;
222                    let mean = samples.iter().sum::<f64>() / n;
223                    let variance = samples.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
224                    let stddev = variance.sqrt();
225
226                    // Check if variance is acceptable (no IRQ interference)
227                    if stddev <= config.max_stddev || retry_count >= MAX_RETRIES {
228                        // Use median for final value (more robust than mean)
229                        let mut sorted = samples;
230                        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
231                        let median = sorted[sorted.len() / 2];
232
233                        matrix[cpu_a][cpu_b] = median;
234                        matrix[cpu_b][cpu_a] = median;
235
236                        if stddev > config.max_stddev {
237                            debug!(
238                                "ETD: CPU {}<->{} stddev={:.1}ns (exceeded threshold after {} retries)",
239                                cpu_a, cpu_b, stddev, retry_count
240                            );
241                        }
242
243                        // Report progress (not complete yet)
244                        progress_callback(current_pair, total_pairs, false);
245                        break;
246                    } else {
247                        retry_count += 1;
248                        debug!(
249                            "ETD: CPU {}<->{} stddev={:.1}ns > {:.1}ns, retrying ({}/{})",
250                            cpu_a, cpu_b, stddev, config.max_stddev, retry_count, MAX_RETRIES
251                        );
252                    }
253                } else {
254                    break; // Empty samples, skip
255                }
256            }
257        }
258    }
259
260    // Final progress update to signal completion
261    progress_callback(total_pairs, total_pairs, true);
262
263    let elapsed = start.elapsed();
264    info!("ETD: Calibration complete in {:.2}s", elapsed.as_secs_f64());
265
266    // Log the matrix for debugging
267    debug!("ETD: Latency matrix (ns):");
268    for (i, row) in matrix.iter().enumerate() {
269        debug!(
270            "  CPU {:2}: {:?}",
271            i,
272            row.iter().map(|v| format!("{:.1}", v)).collect::<Vec<_>>()
273        );
274    }
275
276    matrix
277}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for CPUs 0 and 1: measuring must not panic, and any latencies
    /// that are reported must lie strictly between 0 and 1ms.
    #[test]
    fn test_measure_pair_smoke() {
        let config = EtdConfig {
            iterations: 100,
            samples: 2,
            warmup: 10,
            max_stddev: 100.0,
        };
        // `None` (e.g. pinning failed) is acceptable in tests: nothing to check.
        let latencies = measure_pair(0, 1, &config).unwrap_or_default();
        for latency in latencies {
            assert!(latency > 0.0, "Latency should be positive");
            assert!(latency < 1_000_000.0, "Latency should be reasonable (<1ms)");
        }
    }
}