1use anyhow::{Context, Result};
4use libbpf_rs::libbpf_sys;
5use libbpf_rs::skel::OpenSkel;
6use libbpf_rs::skel::SkelBuilder;
7use libbpf_rs::AsRawLibbpf;
8use libbpf_rs::{PerfBuffer, PerfBufferBuilder};
9use libbpf_rs::{RingBuffer, RingBufferBuilder};
10use log::trace;
11use scx_utils::perf;
12use std::cell::RefCell;
13use std::collections::VecDeque;
14use std::fs::OpenOptions;
15use std::io::Write;
16use std::os::fd::AsFd;
17use std::path::PathBuf;
18use std::rc::Rc;
19use std::time::Duration;
20
/// A single observation produced by one of the cache monitors.
///
/// Serialized by serde as an internally tagged object: the `type` field holds
/// the snake_case variant name (`soft_dirty_fault`, `perf_sample`,
/// `hints_update`) alongside the variant's fields.
#[derive(Debug, serde::Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum CacheMonitorValue {
    /// A page write fault captured by the soft-dirty fault tracepoint.
    SoftDirtyFault {
        timestamp: u64,
        pid: u32,
        tid: u32,
        cpu: u32,
        address: u64,
    },
    /// A memory-access sample captured via a raw perf event.
    PerfSample {
        timestamp: u64,
        pid: u32,
        tid: u32,
        cpu: u32,
        address: u64,
    },
    /// An update observed on the pinned task-storage hints map.
    HintsUpdate {
        timestamp: u64,
        pid: u32,
        tid: u32,
        cpu: u32,
        hint_value: u64,
    },
}
50
/// Common interface for all cache monitors: drive the event source, then
/// drain whatever events were queued.
pub trait CacheMonitor<'a> {
    /// Polls the underlying BPF ring/perf buffer without blocking, moving any
    /// ready events into the monitor's internal queue.
    fn poll(&mut self) -> Result<()>;
    /// Drains the internal queue, invoking `cb` once per event in FIFO order.
    fn consume(&mut self, cb: &mut dyn FnMut(CacheMonitorValue)) -> Result<()>;
}
56
/// Monitors write activity via the kernel's soft-dirty page tracking,
/// re-armed each poll by writing to `/proc/<pid>/clear_refs`.
pub struct SoftDirtyCacheMonitor<'a> {
    // Target process; None means track every pid found under /proc.
    pid: Option<u32>,
    // Held only to keep the BPF programs/maps loaded for the monitor lifetime.
    _skel: crate::bpf::BpfSkel<'a>,
    // Held so the ring-buffer callback registration stays alive; polled in poll().
    _ringbuf: RingBuffer<'a>,
    // Attachment of the fault-handler program; detaches on drop.
    _link: libbpf_rs::Link,
    // FIFO of decoded events, filled by the ring-buffer callback.
    events: Rc<RefCell<VecDeque<CacheMonitorValue>>>,
}
65
impl<'a> SoftDirtyCacheMonitor<'a> {
    /// Builds a soft-dirty fault monitor.
    ///
    /// Opens the BPF skeleton, sizes the `soft_dirty_events` ring buffer to
    /// `ring_size` rounded up to a power of two (minimum 4096), optionally
    /// restricts tracing to `pid`, attaches the fault handler, and registers a
    /// ring-buffer callback that decodes events into the internal queue.
    pub fn new(
        open_storage: &'a mut std::mem::MaybeUninit<libbpf_rs::OpenObject>,
        pid: Option<u32>,
        ring_size: u64,
    ) -> Result<Self> {
        let mut open_skel = crate::bpf::BpfSkelBuilder::default().open(open_storage)?;
        // Ring buffer capacity must be a power of two; enforce a 4 KiB floor.
        let mut ring_capacity = ring_size.max(4096);
        if !ring_capacity.is_power_of_two() {
            ring_capacity = ring_capacity.next_power_of_two();
        }
        let max_entries = ring_capacity.min(u32::MAX as u64) as u32;
        // Must happen before load(): max_entries is fixed at map creation.
        unsafe {
            libbpf_sys::bpf_map__set_max_entries(
                open_skel.maps.soft_dirty_events.as_libbpf_object().as_ptr(),
                max_entries,
            );
        }
        // Bake the pid filter into rodata before load so the BPF side sees it.
        if let Some(pid) = pid {
            open_skel.maps.rodata_data.as_mut().unwrap().filter_tgid = pid as i32;
        }
        let skel = open_skel.load()?;
        let link = skel.progs.handle_do_fault.attach()?;
        let mut builder = RingBufferBuilder::new();
        let events: Rc<RefCell<VecDeque<CacheMonitorValue>>> =
            Rc::new(RefCell::new(VecDeque::new()));
        let events_cb = Rc::clone(&events);
        let events_map = &skel.maps.soft_dirty_events;
        builder.add(events_map, move |data: &[u8]| {
            // Only decode payloads whose size matches the event struct exactly;
            // anything else is silently dropped.
            if data.len() == std::mem::size_of::<crate::bpf_intf::soft_dirty_fault_event>() {
                let ev: &crate::bpf_intf::soft_dirty_fault_event =
                    unsafe { &*(data.as_ptr() as *const _) };
                trace!(
                    "soft-dirty fault timestamp={} pid={} tid={} cpu={} addr=0x{:x}",
                    ev.timestamp,
                    ev.pid,
                    ev.tid,
                    ev.cpu,
                    ev.address
                );
                events_cb
                    .borrow_mut()
                    .push_back(CacheMonitorValue::SoftDirtyFault {
                        timestamp: ev.timestamp,
                        pid: ev.pid,
                        tid: ev.tid,
                        cpu: ev.cpu,
                        address: ev.address,
                    });
            }
            // 0 tells libbpf to continue consuming the ring buffer.
            0
        })?;
        let ringbuf = builder.build()?;
        Ok(Self {
            pid,
            _skel: skel,
            _ringbuf: ringbuf,
            _link: link,
            events,
        })
    }

    /// Re-arms soft-dirty tracking for `pid` by writing "4" to
    /// `/proc/<pid>/clear_refs`, which clears the soft-dirty bits so the next
    /// write to each page faults again.
    fn write_clear_refs(pid: u32) -> Result<()> {
        let mut path = PathBuf::from("/proc");
        path.push(pid.to_string());
        path.push("clear_refs");
        let mut f = OpenOptions::new()
            .write(true)
            .open(&path)
            .with_context(|| format!("Opening {:?}", path))?;
        f.write_all(b"4\n")
            .with_context(|| format!("Writing to {:?}", path))?;
        Ok(())
    }

    /// Lists every numeric entry under /proc, i.e. all currently visible pids.
    fn walk_all_pids() -> Result<Vec<u32>> {
        let mut pids = Vec::new();
        for entry in std::fs::read_dir("/proc")? {
            let entry = entry?;
            // Non-numeric names (e.g. "meminfo") are skipped by the parse.
            if let Some(fname) = entry.file_name().to_str() {
                if let Ok(pid) = fname.parse::<u32>() {
                    pids.push(pid);
                }
            }
        }
        Ok(pids)
    }
}
154
155impl<'a> CacheMonitor<'a> for SoftDirtyCacheMonitor<'a> {
156 fn poll(&mut self) -> Result<()> {
157 let _ = self._ringbuf.poll(Duration::from_millis(0));
159 match self.pid {
160 Some(pid) => {
161 let _ = Self::write_clear_refs(pid);
163 }
164 None => {
165 for pid in Self::walk_all_pids()? {
167 let _ = Self::write_clear_refs(pid);
168 }
169 }
170 }
171 Ok(())
172 }
173
174 fn consume(&mut self, cb: &mut dyn FnMut(CacheMonitorValue)) -> Result<()> {
175 {
176 let mut q = self.events.borrow_mut();
177 while let Some(ev) = q.pop_front() {
178 cb(ev);
179 }
180 }
181 Ok(())
182 }
183}
184
/// Monitors memory accesses by attaching a BPF program to a raw perf event on
/// every CPU and draining samples through a perf buffer.
pub struct PerfSampleMonitor<'a> {
    // Held only to keep the BPF programs/maps loaded for the monitor lifetime.
    _skel: crate::bpf::BpfSkel<'a>,
    // Perf buffer polled in poll(); its callback fills `events`.
    perf_buf: PerfBuffer<'a>,
    // One attachment per successfully opened CPU event; detach on drop.
    _links: Vec<libbpf_rs::Link>,
    // FIFO of decoded samples, filled by the perf-buffer callback.
    events: Rc<RefCell<VecDeque<CacheMonitorValue>>>,
}
192
193impl<'a> PerfSampleMonitor<'a> {
194 pub fn new(
195 open_storage: &'a mut std::mem::MaybeUninit<libbpf_rs::OpenObject>,
196 pid: Option<u32>,
197 period: u64,
198 ) -> Result<Self> {
199 let open = crate::bpf::BpfSkelBuilder::default().open(open_storage)?;
200 let skel = open.load()?;
201
202 let mut links = Vec::new();
203 let mut failures = 0u32;
204 let mut attr = perf::bindings::perf_event_attr::default();
205 attr.size = std::mem::size_of::<perf::bindings::perf_event_attr>() as u32;
206 attr.type_ = perf::bindings::PERF_TYPE_RAW;
207 attr.config = 0x076;
208 attr.__bindgen_anon_1.sample_freq = period as u64;
209 attr.set_freq(1); attr.sample_type = perf::bindings::PERF_SAMPLE_ADDR as u64
211 | perf::bindings::PERF_SAMPLE_PHYS_ADDR as u64
212 | perf::bindings::PERF_SAMPLE_DATA_SRC as u64;
213 attr.set_inherit(if pid.is_some() { 1 } else { 0 });
214 attr.set_disabled(1);
215 attr.set_enable_on_exec(1);
216 attr.__bindgen_anon_2.wakeup_events = 1;
217 attr.set_precise_ip(1);
218
219 let events = Rc::new(RefCell::new(VecDeque::new()));
220 let events_cb = Rc::clone(&events);
221 let perf_events_map = &skel.maps.perf_sample_events;
222
223 let cpus: Vec<u32> = (0..num_cpus::get() as u32).collect();
224 let target_pid: i32 = pid.map(|p| p as i32).unwrap_or(-1); for cpu in cpus {
226 let fd = unsafe {
227 perf::perf_event_open(&mut attr as *mut _, target_pid, cpu as i32, -1, 0)
228 };
229 if fd < 0 {
230 failures += 1;
231 trace!(
232 "perf_event_open failed cpu={cpu} pid={target_pid} errno={} period={period}",
233 std::io::Error::last_os_error()
234 );
235 continue;
236 }
237 match skel.progs.handle_perf.attach_perf_event(fd) {
238 Ok(link) => {
239 trace!("attached perf sample prog cpu={cpu} fd={fd}");
241 links.push(link);
242 }
243 Err(e) => {
244 trace!("attach_perf_event failed cpu={cpu} fd={fd} err={:?}", e);
245 unsafe {
246 libc::close(fd);
247 }
248 failures += 1;
249 }
250 }
251
252 let map_fd =
253 unsafe { libbpf_sys::bpf_map__fd(perf_events_map.as_libbpf_object().as_ptr()) };
254 let key = cpu as u32;
255 let val = fd as u32;
256 let ret = unsafe {
257 libbpf_sys::bpf_map_update_elem(
258 map_fd,
259 &key as *const _ as *const _,
260 &val as *const _ as *const _,
261 0,
262 )
263 };
264 if ret != 0 {
265 trace!("bpf_map_update_elem failed cpu={cpu} fd={fd} ret={ret}");
266 } else {
267 trace!("mapped cpu={cpu} -> fd={fd}");
268 }
269 }
270 if links.is_empty() {
271 return Err(anyhow::anyhow!(
272 "Failed to attach perf events to any CPU ({} failures)",
273 failures
274 ));
275 }
276
277 let perf_buf = PerfBufferBuilder::new(perf_events_map)
278 .sample_cb(move |_cpu, data: &[u8]| {
279 let expect = std::mem::size_of::<crate::bpf_intf::perf_sample_event>();
280 if data.len() == expect + 4 {
281 let ev: &crate::bpf_intf::perf_sample_event =
282 unsafe { &*(data.as_ptr() as *const _) };
283 trace!(
284 "perf sample timestamp={} pid={} tid={} cpu={} addr=0x{:x}",
285 ev.timestamp,
286 ev.pid,
287 ev.tid,
288 ev.cpu,
289 ev.address
290 );
291 events_cb
292 .borrow_mut()
293 .push_back(CacheMonitorValue::PerfSample {
294 timestamp: ev.timestamp,
295 pid: ev.pid,
296 tid: ev.tid,
297 cpu: ev.cpu,
298 address: ev.address,
299 });
300 }
301 })
302 .build()?;
303 Ok(Self {
304 _skel: skel,
305 perf_buf,
306 _links: links,
307 events,
308 })
309 }
310}
311
312impl<'a> CacheMonitor<'a> for PerfSampleMonitor<'a> {
313 fn poll(&mut self) -> Result<()> {
314 let _ = self.perf_buf.poll(Duration::from_millis(0));
315 Ok(())
316 }
317 fn consume(&mut self, cb: &mut dyn FnMut(CacheMonitorValue)) -> Result<()> {
318 let mut q = self.events.borrow_mut();
319 while let Some(ev) = q.pop_front() {
320 cb(ev);
321 }
322 Ok(())
323 }
324}
325
/// Monitors updates to an externally pinned task-storage "hints" map by
/// reusing its fd in this skeleton and tracing map updates.
pub struct HintsTlsMonitor<'a> {
    // Held only to keep the BPF programs/maps loaded for the monitor lifetime.
    _skel: crate::bpf::BpfSkel<'a>,
    // Attachment of the map-update handler; detaches on drop.
    _link: libbpf_rs::Link,
    // Held so the ring-buffer callback registration stays alive; polled in poll().
    _ringbuf: RingBuffer<'a>,
    // Keeps the pinned map's fd alive while the skeleton reuses it.
    _map_handle: libbpf_rs::MapHandle,
    // FIFO of decoded events, filled by the ring-buffer callback.
    events: Rc<RefCell<VecDeque<CacheMonitorValue>>>,
}
334
335impl<'a> HintsTlsMonitor<'a> {
336 pub fn new(
337 open_storage: &'a mut std::mem::MaybeUninit<libbpf_rs::OpenObject>,
338 pinned_map_path: &str,
339 ring_size: u64,
340 ) -> Result<Self> {
341 let mut open = crate::bpf::BpfSkelBuilder::default().open(open_storage)?;
342 let mut ring_capacity = ring_size.max(4096);
343 if !ring_capacity.is_power_of_two() {
344 ring_capacity = ring_capacity.next_power_of_two();
345 }
346 let max_entries = ring_capacity.min(u32::MAX as u64) as u32;
347 unsafe {
348 libbpf_sys::bpf_map__set_max_entries(
349 open.maps.hints_events.as_libbpf_object().as_ptr(),
350 max_entries,
351 );
352 }
353 let c_path = std::ffi::CString::new(pinned_map_path).unwrap();
355 let fd = unsafe { libbpf_sys::bpf_obj_get(c_path.as_ptr()) };
356 if fd < 0 {
357 return Err(anyhow::anyhow!(
358 "Failed to open pinned map at {}: {}",
359 pinned_map_path,
360 std::io::Error::last_os_error()
361 ));
362 }
363 let mut info = libbpf_sys::bpf_map_info::default();
364 let mut len = std::mem::size_of::<libbpf_sys::bpf_map_info>() as u32;
365 let ret = unsafe {
366 libbpf_sys::bpf_obj_get_info_by_fd(fd, &mut info as *mut _ as *mut _, &mut len)
367 };
368 if ret != 0 {
369 unsafe { libc::close(fd) };
370 return Err(anyhow::anyhow!(
371 "bpf_obj_get_info_by_fd failed for {}: {}",
372 pinned_map_path,
373 std::io::Error::last_os_error()
374 ));
375 }
376 const BPF_MAP_TYPE_TASK_STORAGE: u32 = 29;
378 if info.type_ != BPF_MAP_TYPE_TASK_STORAGE {
379 unsafe { libc::close(fd) };
380 return Err(anyhow::anyhow!(
381 "--hints-map path is not a TASK_STORAGE map (type={})",
382 info.type_
383 ));
384 }
385 if info.value_size < 8 {
386 unsafe { libc::close(fd) };
387 return Err(anyhow::anyhow!(
388 "--hints-map value size {} < 8 bytes",
389 info.value_size
390 ));
391 }
392 let map_handle = libbpf_rs::MapHandle::from_map_id(info.id)
393 .context("Failed to create MapHandle from map ID.")?;
394 let borrowed_fd = map_handle.as_fd();
395 open.maps
396 .scx_layered_task_hint_map
397 .reuse_fd(borrowed_fd)
398 .context("Failed to reuse_fd on task_hint_map")?;
399 unsafe { libc::close(fd) };
400 let skel = open.load()?;
401 let link = skel.progs.handle_map_update.attach()?;
402
403 let events = Rc::new(RefCell::new(VecDeque::new()));
404 let events_cb = Rc::clone(&events);
405 let mut builder = RingBufferBuilder::new();
406 let events_map = &skel.maps.hints_events;
407 builder.add(events_map, move |data: &[u8]| {
408 if data.len() == std::mem::size_of::<crate::bpf_intf::hints_event>() {
409 let ev: &crate::bpf_intf::hints_event = unsafe { &*(data.as_ptr() as *const _) };
410 events_cb
411 .borrow_mut()
412 .push_back(CacheMonitorValue::HintsUpdate {
413 timestamp: ev.timestamp,
414 pid: ev.pid,
415 tid: ev.tid,
416 cpu: ev.cpu,
417 hint_value: ev.hint_value,
418 });
419 }
420 0
421 })?;
422 let ringbuf = builder.build()?;
423
424 Ok(Self {
425 _skel: skel,
426 _link: link,
427 _ringbuf: ringbuf,
428 _map_handle: map_handle,
429 events,
430 })
431 }
432}
433
434impl<'a> CacheMonitor<'a> for HintsTlsMonitor<'a> {
435 fn poll(&mut self) -> Result<()> {
436 let _ = self._ringbuf.poll(Duration::from_millis(0));
437 Ok(())
438 }
439 fn consume(&mut self, cb: &mut dyn FnMut(CacheMonitorValue)) -> Result<()> {
440 let mut q = self.events.borrow_mut();
441 while let Some(ev) = q.pop_front() {
442 cb(ev);
443 }
444 Ok(())
445 }
446}