1use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::{Arc, Barrier};
6
7use log::{debug, info};
8use quanta::Clock;
9
/// An `AtomicBool` padded and aligned to occupy a full 64-byte cache
/// line, so the ping/pong flag never false-shares with neighboring data.
#[repr(align(64))]
struct PaddedAtomicBool {
    val: AtomicBool,
    _pad: [u8; 63],
}

impl PaddedAtomicBool {
    /// Wraps `v` in a zero-padded, cache-line-aligned atomic cell.
    fn new(v: bool) -> Self {
        PaddedAtomicBool {
            _pad: [0; 63],
            val: AtomicBool::new(v),
        }
    }
}
25
/// State shared between the two measurement threads of one CPU pair.
struct SharedState {
    // Rendezvous point so both threads begin the ping/pong at the same
    // instant (after pinning and priority setup).
    barrier: Barrier,
    // The single cache line bounced between the two cores.
    flag: PaddedAtomicBool,
}

// Flag protocol: the flag starts at PING; the pong thread flips
// PING -> PONG and the ping thread flips PONG -> PING, each spinning
// until it observes its expected value. One PING->PONG->PING cycle is
// one round trip (two cache-line hops).
const PING: bool = false;
const PONG: bool = true;
34
/// Tuning parameters for the ETD latency calibration.
pub struct EtdConfig {
    /// Flag round trips timed within each sample.
    pub iterations: u32,
    /// Timed samples collected per CPU pair.
    pub samples: u32,
    /// Untimed round trips run before sampling starts.
    pub warmup: u32,
    /// Largest acceptable sample standard deviation (ns) before a
    /// pair's measurement is retried.
    pub max_stddev: f64,
}

impl Default for EtdConfig {
    fn default() -> Self {
        EtdConfig {
            max_stddev: 15.0,
            warmup: 200,
            samples: 50,
            iterations: 500,
        }
    }
}
60
61fn measure_pair(cpu_a: usize, cpu_b: usize, config: &EtdConfig) -> Option<Vec<f64>> {
63 let state = Arc::new(SharedState {
64 barrier: Barrier::new(2),
65 flag: PaddedAtomicBool::new(PING),
66 });
67
68 let clock = Arc::new(Clock::new());
69 let num_round_trips = config.iterations as usize;
70 let num_samples = config.samples as usize;
71 let warmup_trips = config.warmup as usize;
72
73 let state_pong = Arc::clone(&state);
74 let state_ping = Arc::clone(&state);
75 let clock_ping = Arc::clone(&clock);
76
77 crossbeam_utils::thread::scope(|s| {
78 let pong = s.spawn(move |_| {
80 let core_id = core_affinity::CoreId { id: cpu_b };
81 if !core_affinity::set_for_current(core_id) {
82 return;
83 }
84
85 unsafe {
87 let param = libc::sched_param { sched_priority: 99 };
88 libc::sched_setscheduler(0, libc::SCHED_FIFO, ¶m);
89 }
90
91 state_pong.barrier.wait();
92
93 for _ in 0..warmup_trips {
95 while state_pong
96 .flag
97 .val
98 .compare_exchange(PING, PONG, Ordering::AcqRel, Ordering::Relaxed)
99 .is_err()
100 {
101 std::hint::spin_loop();
102 }
103 }
104
105 for _ in 0..(num_round_trips * num_samples) {
107 while state_pong
108 .flag
109 .val
110 .compare_exchange(PING, PONG, Ordering::AcqRel, Ordering::Relaxed)
111 .is_err()
112 {
113 std::hint::spin_loop();
114 }
115 }
116
117 unsafe {
119 let param = libc::sched_param { sched_priority: 0 };
120 libc::sched_setscheduler(0, libc::SCHED_OTHER, ¶m);
121 }
122 });
123
124 let ping = s.spawn(move |_| {
126 let core_id = core_affinity::CoreId { id: cpu_a };
127 if !core_affinity::set_for_current(core_id) {
128 return None;
129 }
130
131 unsafe {
133 let param = libc::sched_param { sched_priority: 99 };
134 libc::sched_setscheduler(0, libc::SCHED_FIFO, ¶m);
135 }
136
137 let mut results = Vec::with_capacity(num_samples);
138
139 state_ping.barrier.wait();
140
141 for _ in 0..warmup_trips {
143 while state_ping
144 .flag
145 .val
146 .compare_exchange(PONG, PING, Ordering::AcqRel, Ordering::Relaxed)
147 .is_err()
148 {
149 std::hint::spin_loop();
150 }
151 }
152
153 for _ in 0..num_samples {
155 let start = clock_ping.raw();
156
157 for _ in 0..num_round_trips {
158 while state_ping
159 .flag
160 .val
161 .compare_exchange(PONG, PING, Ordering::AcqRel, Ordering::Relaxed)
162 .is_err()
163 {
164 std::hint::spin_loop();
165 }
166 }
167
168 let end = clock_ping.raw();
169 let duration_ns = clock_ping.delta(start, end).as_nanos() as f64;
170 results.push(duration_ns / (num_round_trips as f64 * 2.0));
172 }
173
174 unsafe {
176 let param = libc::sched_param { sched_priority: 0 };
177 libc::sched_setscheduler(0, libc::SCHED_OTHER, ¶m);
178 }
179
180 Some(results)
181 });
182
183 pong.join().unwrap();
184 ping.join().unwrap()
185 })
186 .ok()?
187}
188
189pub fn calibrate_full_matrix<F>(
191 nr_cpus: usize,
192 config: &EtdConfig,
193 mut progress_callback: F,
194) -> Vec<Vec<f64>>
195where
196 F: FnMut(usize, usize, bool),
197{
198 let mut matrix = vec![vec![0.0; nr_cpus]; nr_cpus];
199
200 info!(
201 "ETD: Starting calibration for {} CPUs ({} iterations × {} samples)",
202 nr_cpus, config.iterations, config.samples
203 );
204
205 let start = std::time::Instant::now();
206
207 let total_pairs = (nr_cpus * (nr_cpus - 1)) / 2;
209 let mut current_pair = 0;
210
211 #[allow(clippy::needless_range_loop)]
212 for cpu_a in 0..nr_cpus {
213 for cpu_b in (cpu_a + 1)..nr_cpus {
214 current_pair += 1;
215 let mut retry_count = 0;
216 const MAX_RETRIES: u32 = 3;
217
218 while let Some(samples) = measure_pair(cpu_a, cpu_b, config) {
219 if !samples.is_empty() {
220 let n = samples.len() as f64;
222 let mean = samples.iter().sum::<f64>() / n;
223 let variance = samples.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
224 let stddev = variance.sqrt();
225
226 if stddev <= config.max_stddev || retry_count >= MAX_RETRIES {
228 let mut sorted = samples;
230 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
231 let median = sorted[sorted.len() / 2];
232
233 matrix[cpu_a][cpu_b] = median;
234 matrix[cpu_b][cpu_a] = median;
235
236 if stddev > config.max_stddev {
237 debug!(
238 "ETD: CPU {}<->{} stddev={:.1}ns (exceeded threshold after {} retries)",
239 cpu_a, cpu_b, stddev, retry_count
240 );
241 }
242
243 progress_callback(current_pair, total_pairs, false);
245 break;
246 } else {
247 retry_count += 1;
248 debug!(
249 "ETD: CPU {}<->{} stddev={:.1}ns > {:.1}ns, retrying ({}/{})",
250 cpu_a, cpu_b, stddev, config.max_stddev, retry_count, MAX_RETRIES
251 );
252 }
253 } else {
254 break; }
256 }
257 }
258 }
259
260 progress_callback(total_pairs, total_pairs, true);
262
263 let elapsed = start.elapsed();
264 info!("ETD: Calibration complete in {:.2}s", elapsed.as_secs_f64());
265
266 debug!("ETD: Latency matrix (ns):");
268 for (i, row) in matrix.iter().enumerate() {
269 debug!(
270 " CPU {:2}: {:?}",
271 i,
272 row.iter().map(|v| format!("{:.1}", v)).collect::<Vec<_>>()
273 );
274 }
275
276 matrix
277}
278
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: if CPUs 0 and 1 can be measured at all, every
    /// reported latency must be positive and below one millisecond.
    #[test]
    fn test_measure_pair_smoke() {
        let config = EtdConfig {
            iterations: 100,
            samples: 2,
            warmup: 10,
            max_stddev: 100.0,
        };
        // measure_pair may return None (e.g. affinity unavailable in CI);
        // only validate the samples when a measurement succeeded.
        if let Some(latencies) = measure_pair(0, 1, &config) {
            for &ns in &latencies {
                assert!(ns > 0.0, "Latency should be positive");
                assert!(ns < 1_000_000.0, "Latency should be reasonable (<1ms)");
            }
        }
    }
}