scxcash/monitors.rs

// Cache monitor trait definitions and implementations.

use anyhow::{Context, Result};
use libbpf_rs::libbpf_sys;
use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::AsRawLibbpf;
use libbpf_rs::{PerfBuffer, PerfBufferBuilder};
use libbpf_rs::{RingBuffer, RingBufferBuilder};
use log::trace;
use scx_utils::perf;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::Write;
use std::os::fd::AsFd;
use std::path::PathBuf;
use std::rc::Rc;
use std::time::Duration;

/// Values produced by cache monitors.
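///
/// With the `tag = "type"` / `rename_all = "snake_case"` serde attributes
/// below, each variant serializes as a tagged JSON object, e.g. (sketch)
/// `{"type": "soft_dirty_fault", "timestamp": ..., "pid": ..., ...}`.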
#[derive(Debug, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CacheMonitorValue {
    /// A soft-dirty page fault event.
    SoftDirtyFault {
        timestamp: u64,
        pid: u32,
        tid: u32,
        cpu: u32,
        address: u64,
    },
    /// A perf sampling event.
    PerfSample {
        timestamp: u64,
        pid: u32,
        tid: u32,
        cpu: u32,
        address: u64,
    },
    /// A task hint TLS update event (first 8 bytes of the hint value).
    HintsUpdate {
        timestamp: u64,
        pid: u32,
        tid: u32,
        cpu: u32,
        hint_value: u64,
    },
}

/// Trait representing a cache monitor instance.
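///
/// A typical driver loop might look like the sketch below, where `monitor`
/// is any `CacheMonitor` implementation and `interval` is a caller-chosen
/// poll period (both hypothetical names):
///
/// ```ignore
/// loop {
///     monitor.poll()?;
///     monitor.consume(&mut |ev| println!("{:?}", ev))?;
///     std::thread::sleep(interval);
/// }
/// ```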
pub trait CacheMonitor<'a> {
    /// Poll the underlying BPF buffers and perform any per-monitor periodic
    /// work, queueing newly observed events internally.
    fn poll(&mut self) -> Result<()>;
    /// Drain queued events, invoking `cb` once per event.
    fn consume(&mut self, cb: &mut dyn FnMut(CacheMonitorValue)) -> Result<()>;
}

/// Soft-dirty page reset monitor.
pub struct SoftDirtyCacheMonitor<'a> {
    pid: Option<u32>,
    _skel: crate::bpf::BpfSkel<'a>,
    _ringbuf: RingBuffer<'a>,
    _link: libbpf_rs::Link,
    events: Rc<RefCell<VecDeque<CacheMonitorValue>>>,
}

impl<'a> SoftDirtyCacheMonitor<'a> {
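    /// Open and load the BPF skeleton, size the soft-dirty event ring buffer
    /// (`ring_size` is rounded up to a power of two, minimum 4096), optionally
    /// restrict events to `pid`, and attach the fault-handling program.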
    pub fn new(
        open_storage: &'a mut std::mem::MaybeUninit<libbpf_rs::OpenObject>,
        pid: Option<u32>,
        ring_size: u64,
    ) -> Result<Self> {
        let mut open_skel = crate::bpf::BpfSkelBuilder::default().open(open_storage)?;
        let mut ring_capacity = ring_size.max(4096);
        if !ring_capacity.is_power_of_two() {
            ring_capacity = ring_capacity.next_power_of_two();
        }
        let max_entries = ring_capacity.min(u32::MAX as u64) as u32;
        unsafe {
            libbpf_sys::bpf_map__set_max_entries(
                open_skel.maps.soft_dirty_events.as_libbpf_object().as_ptr(),
                max_entries,
            );
        }
        if let Some(pid) = pid {
            open_skel.maps.rodata_data.as_mut().unwrap().filter_tgid = pid as i32;
        }
        let skel = open_skel.load()?;
        let link = skel.progs.handle_do_fault.attach()?;
        let mut builder = RingBufferBuilder::new();
        let events: Rc<RefCell<VecDeque<CacheMonitorValue>>> =
            Rc::new(RefCell::new(VecDeque::new()));
        let events_cb = Rc::clone(&events);
        let events_map = &skel.maps.soft_dirty_events;
        builder.add(events_map, move |data: &[u8]| {
            if data.len() == std::mem::size_of::<crate::bpf_intf::soft_dirty_fault_event>() {
                let ev: &crate::bpf_intf::soft_dirty_fault_event =
                    unsafe { &*(data.as_ptr() as *const _) };
                trace!(
                    "soft-dirty fault timestamp={} pid={} tid={} cpu={} addr=0x{:x}",
                    ev.timestamp,
                    ev.pid,
                    ev.tid,
                    ev.cpu,
                    ev.address
                );
                events_cb
                    .borrow_mut()
                    .push_back(CacheMonitorValue::SoftDirtyFault {
                        timestamp: ev.timestamp,
                        pid: ev.pid,
                        tid: ev.tid,
                        cpu: ev.cpu,
                        address: ev.address,
                    });
            }
            0
        })?;
        let ringbuf = builder.build()?;
        Ok(Self {
            pid,
            _skel: skel,
            _ringbuf: ringbuf,
            _link: link,
            events,
        })
    }

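    /// Writing "4" to /proc/<pid>/clear_refs clears the soft-dirty bits on the
    /// task's PTEs, so the next write to each page faults again and can be
    /// observed by the attached fault handler.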
    fn write_clear_refs(pid: u32) -> Result<()> {
        let mut path = PathBuf::from("/proc");
        path.push(pid.to_string());
        path.push("clear_refs");
        let mut f = OpenOptions::new()
            .write(true)
            .open(&path)
            .with_context(|| format!("Opening {:?}", path))?;
        f.write_all(b"4\n")
            .with_context(|| format!("Writing to {:?}", path))?;
        Ok(())
    }

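    /// Enumerate all numeric entries under /proc, i.e. every currently visible PID.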
    fn walk_all_pids() -> Result<Vec<u32>> {
        let mut pids = Vec::new();
        for entry in std::fs::read_dir("/proc")? {
            let entry = entry?;
            if let Some(fname) = entry.file_name().to_str() {
                if let Ok(pid) = fname.parse::<u32>() {
                    pids.push(pid);
                }
            }
        }
        Ok(pids)
    }
}

impl<'a> CacheMonitor<'a> for SoftDirtyCacheMonitor<'a> {
    fn poll(&mut self) -> Result<()> {
        // TODO(kkd): Switch to epoll later?
        let _ = self._ringbuf.poll(Duration::from_millis(0));
        match self.pid {
            Some(pid) => {
                // TODO(kkd): Handle failures
                let _ = Self::write_clear_refs(pid);
            }
            None => {
                // TODO(kkd): Make this less expensive
                for pid in Self::walk_all_pids()? {
                    let _ = Self::write_clear_refs(pid);
                }
            }
        }
        Ok(())
    }

    fn consume(&mut self, cb: &mut dyn FnMut(CacheMonitorValue)) -> Result<()> {
        {
            let mut q = self.events.borrow_mut();
            while let Some(ev) = q.pop_front() {
                cb(ev);
            }
        }
        Ok(())
    }
}

/// Perf sampling monitor.
pub struct PerfSampleMonitor<'a> {
    _skel: crate::bpf::BpfSkel<'a>,
    perf_buf: PerfBuffer<'a>,
    _links: Vec<libbpf_rs::Link>,
    events: Rc<RefCell<VecDeque<CacheMonitorValue>>>,
}

impl<'a> PerfSampleMonitor<'a> {
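    /// Open and load the BPF skeleton, program one sampling perf event per CPU
    /// (restricted to `pid` if given, otherwise system-wide), attach the sampling
    /// BPF program to each event, and set up a perf buffer to receive samples.
    /// `period` is used as a sampling frequency (freq mode is enabled below).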
    pub fn new(
        open_storage: &'a mut std::mem::MaybeUninit<libbpf_rs::OpenObject>,
        pid: Option<u32>,
        period: u64,
    ) -> Result<Self> {
        let open = crate::bpf::BpfSkelBuilder::default().open(open_storage)?;
        let skel = open.load()?;

        let mut links = Vec::new();
        let mut failures = 0u32;
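        // Configure the sampling perf_event_attr. config 0x076 is a raw PMU event
        // selector, so its meaning is CPU/vendor specific; sampling runs in
        // frequency mode (set_freq), and each sample records the data address,
        // physical address and data source.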
        let mut attr = perf::bindings::perf_event_attr::default();
        attr.size = std::mem::size_of::<perf::bindings::perf_event_attr>() as u32;
        attr.type_ = perf::bindings::PERF_TYPE_RAW;
        attr.config = 0x076;
        attr.__bindgen_anon_1.sample_freq = period as u64;
        attr.set_freq(1); // frequency mode
        attr.sample_type = perf::bindings::PERF_SAMPLE_ADDR as u64
            | perf::bindings::PERF_SAMPLE_PHYS_ADDR as u64
            | perf::bindings::PERF_SAMPLE_DATA_SRC as u64;
        attr.set_inherit(if pid.is_some() { 1 } else { 0 });
        attr.set_disabled(1);
        attr.set_enable_on_exec(1);
        attr.__bindgen_anon_2.wakeup_events = 1;
        attr.set_precise_ip(1);

        let events = Rc::new(RefCell::new(VecDeque::new()));
        let events_cb = Rc::clone(&events);
        let perf_events_map = &skel.maps.perf_sample_events;

        let cpus: Vec<u32> = (0..num_cpus::get() as u32).collect();
        let target_pid: i32 = pid.map(|p| p as i32).unwrap_or(-1); // -1 means all processes
        for cpu in cpus {
            let fd = unsafe {
                perf::perf_event_open(&mut attr as *mut _, target_pid, cpu as i32, -1, 0)
            };
            if fd < 0 {
                failures += 1;
                trace!(
                    "perf_event_open failed cpu={cpu} pid={target_pid} errno={} period={period}",
                    std::io::Error::last_os_error()
                );
                continue;
            }
            match skel.progs.handle_perf.attach_perf_event(fd) {
                Ok(link) => {
                    // attach_perf_event also enables the event.
                    trace!("attached perf sample prog cpu={cpu} fd={fd}");
                    links.push(link);
                }
                Err(e) => {
                    trace!("attach_perf_event failed cpu={cpu} fd={fd} err={:?}", e);
                    unsafe {
                        libc::close(fd);
                    }
                    failures += 1;
                }
            }

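            // Record the sampling event fd for this CPU in the perf event array
            // map, keyed by CPU index.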
            let map_fd =
                unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
            let key = cpu as u32;
            let val = fd as u32;
            let ret = unsafe {
                libbpf_sys::bpf_map_update_elem(
                    map_fd,
                    &key as *const _ as *const _,
                    &val as *const _ as *const _,
                    0,
                )
            };
            if ret != 0 {
                trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
            } else {
                trace!("mapped cpu={cpu} -> fd={fd}");
            }
        }
        if links.is_empty() {
            return Err(anyhow::anyhow!(
                "Failed to attach perf events to any CPU ({} failures)",
                failures
            ));
        }

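        // Build the user-space perf buffer on the same map; sample_cb receives
        // the raw bytes emitted for each sample.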
        let perf_buf = PerfBufferBuilder::new(perf_events_map)
            .sample_cb(move |_cpu, data: &[u8]| {
                let expect = std::mem::size_of::<crate::bpf_intf::perf_sample_event>();
                // The payload seen here is 4 bytes larger than the event struct
                // (perf may pad raw sample data for alignment).
                if data.len() == expect + 4 {
                    let ev: &crate::bpf_intf::perf_sample_event =
                        unsafe { &*(data.as_ptr() as *const _) };
                    trace!(
                        "perf sample timestamp={} pid={} tid={} cpu={} addr=0x{:x}",
                        ev.timestamp,
                        ev.pid,
                        ev.tid,
                        ev.cpu,
                        ev.address
                    );
                    events_cb
                        .borrow_mut()
                        .push_back(CacheMonitorValue::PerfSample {
                            timestamp: ev.timestamp,
                            pid: ev.pid,
                            tid: ev.tid,
                            cpu: ev.cpu,
                            address: ev.address,
                        });
                }
            })
            .build()?;
        Ok(Self {
            _skel: skel,
            perf_buf,
            _links: links,
            events,
        })
    }
}

impl<'a> CacheMonitor<'a> for PerfSampleMonitor<'a> {
    fn poll(&mut self) -> Result<()> {
        let _ = self.perf_buf.poll(Duration::from_millis(0));
        Ok(())
    }
    fn consume(&mut self, cb: &mut dyn FnMut(CacheMonitorValue)) -> Result<()> {
        let mut q = self.events.borrow_mut();
        while let Some(ev) = q.pop_front() {
            cb(ev);
        }
        Ok(())
    }
}

/// TLS hints monitor.
pub struct HintsTlsMonitor<'a> {
    _skel: crate::bpf::BpfSkel<'a>,
    _link: libbpf_rs::Link,
    _ringbuf: RingBuffer<'a>,
    _map_handle: libbpf_rs::MapHandle,
    events: Rc<RefCell<VecDeque<CacheMonitorValue>>>,
}

impl<'a> HintsTlsMonitor<'a> {
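    /// Open and load the BPF skeleton, size the hints event ring buffer
    /// (`ring_size` rounds up to a power of two, minimum 4096), reuse the task
    /// hint map pinned at `pinned_map_path`, and attach the map-update program.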
    pub fn new(
        open_storage: &'a mut std::mem::MaybeUninit<libbpf_rs::OpenObject>,
        pinned_map_path: &str,
        ring_size: u64,
    ) -> Result<Self> {
        let mut open = crate::bpf::BpfSkelBuilder::default().open(open_storage)?;
        let mut ring_capacity = ring_size.max(4096);
        if !ring_capacity.is_power_of_two() {
            ring_capacity = ring_capacity.next_power_of_two();
        }
        let max_entries = ring_capacity.min(u32::MAX as u64) as u32;
        unsafe {
            libbpf_sys::bpf_map__set_max_entries(
                open.maps.hints_events.as_libbpf_object().as_ptr(),
                max_entries,
            );
        }
        // Open pinned TLS map and reuse its FD for our BPF program's task_hint_map
        let c_path = std::ffi::CString::new(pinned_map_path).unwrap();
        let fd = unsafe { libbpf_sys::bpf_obj_get(c_path.as_ptr()) };
        if fd < 0 {
            return Err(anyhow::anyhow!(
                "Failed to open pinned map at {}: {}",
                pinned_map_path,
                std::io::Error::last_os_error()
            ));
        }
        let mut info = libbpf_sys::bpf_map_info::default();
        let mut len = std::mem::size_of::<libbpf_sys::bpf_map_info>() as u32;
        let ret = unsafe {
            libbpf_sys::bpf_obj_get_info_by_fd(fd, &mut info as *mut _ as *mut _, &mut len)
        };
        if ret != 0 {
            unsafe { libc::close(fd) };
            return Err(anyhow::anyhow!(
                "bpf_obj_get_info_by_fd failed for {}: {}",
                pinned_map_path,
                std::io::Error::last_os_error()
            ));
        }
        // Sanity checks: must be TASK_STORAGE and value large enough
        const BPF_MAP_TYPE_TASK_STORAGE: u32 = 29;
        if info.type_ != BPF_MAP_TYPE_TASK_STORAGE {
            unsafe { libc::close(fd) };
            return Err(anyhow::anyhow!(
                "--hints-map path is not a TASK_STORAGE map (type={})",
                info.type_
            ));
        }
        if info.value_size < 8 {
            unsafe { libc::close(fd) };
            return Err(anyhow::anyhow!(
                "--hints-map value size {} < 8 bytes",
                info.value_size
            ));
        }
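        // MapHandle::from_map_id opens an independent reference to the pinned map;
        // its fd is what gets reused below, and the raw fd from bpf_obj_get is
        // closed once it is no longer needed.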
        let map_handle = libbpf_rs::MapHandle::from_map_id(info.id)
            .context("Failed to create MapHandle from map ID.")?;
        let borrowed_fd = map_handle.as_fd();
        open.maps
            .scx_layered_task_hint_map
            .reuse_fd(borrowed_fd)
            .context("Failed to reuse_fd on task_hint_map")?;
        unsafe { libc::close(fd) };
        let skel = open.load()?;
        let link = skel.progs.handle_map_update.attach()?;

        let events = Rc::new(RefCell::new(VecDeque::new()));
        let events_cb = Rc::clone(&events);
        let mut builder = RingBufferBuilder::new();
        let events_map = &skel.maps.hints_events;
        builder.add(events_map, move |data: &[u8]| {
            if data.len() == std::mem::size_of::<crate::bpf_intf::hints_event>() {
                let ev: &crate::bpf_intf::hints_event = unsafe { &*(data.as_ptr() as *const _) };
                events_cb
                    .borrow_mut()
                    .push_back(CacheMonitorValue::HintsUpdate {
                        timestamp: ev.timestamp,
                        pid: ev.pid,
                        tid: ev.tid,
                        cpu: ev.cpu,
                        hint_value: ev.hint_value,
                    });
            }
            0
        })?;
        let ringbuf = builder.build()?;

        Ok(Self {
            _skel: skel,
            _link: link,
            _ringbuf: ringbuf,
            _map_handle: map_handle,
            events,
        })
    }
}

impl<'a> CacheMonitor<'a> for HintsTlsMonitor<'a> {
    fn poll(&mut self) -> Result<()> {
        let _ = self._ringbuf.poll(Duration::from_millis(0));
        Ok(())
    }
    fn consume(&mut self, cb: &mut dyn FnMut(CacheMonitorValue)) -> Result<()> {
        let mut q = self.events.borrow_mut();
        while let Some(ev) = q.pop_front() {
            cb(ev);
        }
        Ok(())
    }
}