1use scx_utils::compat;
7use scxtop::bpf_skel::types::bpf_event;
8use scxtop::cli::{generate_completions, Cli, Commands, TraceArgs, TuiArgs};
9use scxtop::config::Config;
10use scxtop::edm::{ActionHandler, BpfEventActionPublisher, BpfEventHandler, EventDispatchManager};
11use scxtop::layered_util;
12use scxtop::mangoapp::poll_mangoapp;
13use scxtop::search;
14use scxtop::tracer::Tracer;
15use scxtop::util::{
16 check_bpf_capability, get_capability_warning_message, get_clock_value, is_root,
17 read_file_string,
18};
19use scxtop::Action;
20use scxtop::App;
21use scxtop::CpuStatTracker;
22use scxtop::Event;
23use scxtop::Key;
24use scxtop::KeyMap;
25use scxtop::MemStatSnapshot;
26use scxtop::PerfettoTraceManager;
27use scxtop::SystemStatAction;
28use scxtop::Tui;
29use scxtop::SCHED_NAME_PATH;
30use scxtop::{available_kprobe_events, UpdateColVisibilityAction};
31use scxtop::{bpf_skel::*, AppState};
32
33use anyhow::anyhow;
34use anyhow::bail;
35use anyhow::Result;
36use clap::{CommandFactory, Parser};
37use futures::future::join_all;
38use libbpf_rs::libbpf_sys;
39use libbpf_rs::num_possible_cpus;
40use libbpf_rs::skel::OpenSkel;
41use libbpf_rs::skel::SkelBuilder;
42use libbpf_rs::Link;
43use libbpf_rs::MapCore;
44use libbpf_rs::ProgramInput;
45use libbpf_rs::UprobeOpts;
46use log::debug;
47use log::info;
48use ratatui::crossterm::event::{KeyCode::Char, KeyEvent};
49use simplelog::{
50 ColorChoice, Config as SimplelogConfig, LevelFilter, TermLogger, TerminalMode, WriteLogger,
51};
52use std::ffi::CString;
53use std::fs::File;
54use std::mem::MaybeUninit;
55use std::os::fd::AsFd;
56use std::os::fd::AsRawFd;
57use std::str::FromStr;
58use std::sync::atomic::AtomicBool;
59use std::sync::atomic::Ordering;
60use std::sync::Arc;
61use std::time::Duration;
62use sysinfo::System;
63use tokio::sync::mpsc;
64
/// Thin wrapper around a raw libbpf ring buffer manager pointer so it can be
/// moved into a blocking tokio task for polling.
struct SendRingBuffer(*mut libbpf_sys::ring_buffer);
// SAFETY: each `SendRingBuffer` is moved into exactly one `spawn_blocking`
// closure and only used from that task, so the raw pointer is never accessed
// concurrently from multiple threads.
unsafe impl Send for SendRingBuffer {}

impl SendRingBuffer {
    /// Poll the ring buffer for up to `timeout` milliseconds, invoking the
    /// registered sample callback for available records. Returns libbpf's
    /// result code (records consumed, or negative on error).
    fn poll(&self, timeout: i32) -> i32 {
        // SAFETY: `self.0` is a valid pointer obtained from `ring_buffer__new`.
        unsafe { libbpf_sys::ring_buffer__poll(self.0, timeout) }
    }

    /// Drain all currently available records without blocking.
    fn consume(&self) -> i32 {
        // SAFETY: `self.0` is a valid pointer obtained from `ring_buffer__new`.
        unsafe { libbpf_sys::ring_buffer__consume(self.0) }
    }

    /// Free the underlying libbpf ring buffer manager. Takes `self` by value
    /// so the pointer cannot be used afterwards.
    // NOTE(review): freeing the manager does not reclaim the callback context
    // leaked via `Box::into_raw` at creation time — confirm that leak is
    // acceptable for these process-lifetime buffers.
    fn free(self) {
        // SAFETY: `self.0` came from `ring_buffer__new` and is freed exactly
        // once because `free` consumes `self`.
        unsafe { libbpf_sys::ring_buffer__free(self.0) }
    }
}
83
84fn get_action(app: &App, keymap: &KeyMap, event: Event) -> Action {
85 match event {
86 Event::Error => Action::None,
87 Event::Tick => Action::Tick,
88 Event::TickRateChange(tick_rate_ms) => {
89 Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms))
90 }
91 Event::Key(key) => handle_key_event(app, keymap, key),
92 Event::Paste(paste) => handle_input_entry(app, paste).unwrap_or(Action::None),
93 _ => Action::None,
94 }
95}
96
97fn handle_key_event(app: &App, keymap: &KeyMap, key: KeyEvent) -> Action {
98 match key.code {
99 Char(c) => {
100 if let Some(action) = handle_input_entry(app, c.to_string()) {
102 action
103 } else {
104 match (app.state(), c) {
106 (AppState::BpfProgramDetail, 'p') => Action::ToggleBpfPerfSampling,
108 _ => keymap.action(&Key::Char(c)),
110 }
111 }
112 }
113 _ => keymap.action(&Key::Code(key.code)),
114 }
115}
116
117fn handle_input_entry(app: &App, s: String) -> Option<Action> {
118 match app.state() {
119 AppState::PerfEvent | AppState::KprobeEvent => Some(Action::InputEntry(s)),
120 AppState::Default
121 | AppState::Llc
122 | AppState::Node
123 | AppState::Process
124 | AppState::Memory
125 | AppState::PerfTop
126 | AppState::BpfPrograms
127 | AppState::Scheduler
128 if app.filtering() =>
129 {
130 Some(Action::InputEntry(s))
131 }
132 _ => None,
133 }
134}
135
/// Attach every scxtop BPF program (an empty filter means "attach all").
///
/// Returns the attached links — which must be kept alive for the programs to
/// stay attached — plus any human-readable warnings collected along the way.
fn attach_progs(skel: &mut BpfSkel) -> Result<(Vec<Link>, Vec<String>)> {
    attach_progs_selective(skel, &[])
}
140
/// Attach the scxtop BPF programs whose names appear in `program_names`
/// (all of them when the slice is empty), degrading gracefully when running
/// without BPF privileges.
///
/// Attach failures are fatal when running as root (they indicate a real
/// problem) but are downgraded to warnings for unprivileged users. Returns
/// the live links plus any warnings gathered.
fn attach_progs_selective(
    skel: &mut BpfSkel,
    program_names: &[&str],
) -> Result<(Vec<Link>, Vec<String>)> {
    let mut links = Vec::new();
    let mut warnings = Vec::new();

    let has_bpf_cap = check_bpf_capability();

    // Without BPF capability nothing can attach; report why and bail early
    // with an empty link set rather than erroring out.
    if !has_bpf_cap {
        warnings
            .push("BPF programs cannot be attached - scheduler monitoring disabled".to_string());
        warnings.push("Try running as root or configure BPF permissions".to_string());
        return Ok((links, warnings));
    }

    let attach_all = program_names.is_empty();

    let should_attach = |name: &str| -> bool { attach_all || program_names.contains(&name) };

    // Attach one program if selected; on failure, error out as root and warn
    // otherwise. A macro (rather than a fn) because each `$prog` is a
    // distinct generated program type on the skeleton.
    // NOTE(review): several `$name` strings below drop the `on_` prefix of
    // the corresponding program field ("sched_cpu_perf", "sched_wakeup_new",
    // "sched_fork", "sched_exec", "sched_exit", "cpuhp_enter", "cpuhp_exit")
    // — callers filtering by name must pass these exact strings; confirm the
    // inconsistency is intentional.
    macro_rules! safe_attach {
        ($prog:expr, $name:literal) => {
            if should_attach($name) {
                match $prog.attach() {
                    Ok(link) => {
                        links.push(link);
                    }
                    Err(e) => {
                        if is_root() {
                            return Err(anyhow!(
                                "Failed to attach {} (running as root): {}",
                                $name,
                                e
                            ));
                        } else {
                            warnings.push(format!("Failed to attach {}: {}", $name, e));
                        }
                    }
                }
            }
        };
    }

    // Core scheduler tracepoints.
    safe_attach!(skel.progs.on_sched_cpu_perf, "sched_cpu_perf");
    safe_attach!(skel.progs.scx_sched_reg, "scx_sched_reg");
    safe_attach!(skel.progs.scx_sched_unreg, "scx_sched_unreg");
    safe_attach!(skel.progs.on_sched_switch, "on_sched_switch");
    safe_attach!(skel.progs.on_sched_wakeup, "on_sched_wakeup");
    safe_attach!(skel.progs.on_sched_wakeup_new, "sched_wakeup_new");
    safe_attach!(skel.progs.on_sched_waking, "on_sched_waking");
    safe_attach!(skel.progs.on_sched_migrate_task, "on_sched_migrate_task");
    safe_attach!(skel.progs.on_sched_fork, "sched_fork");
    safe_attach!(skel.progs.on_sched_exec, "sched_exec");
    safe_attach!(skel.progs.on_sched_exit, "sched_exit");

    // Pick the sched_ext kfunc generation exposed by this kernel: the newer
    // scx_bpf_dsq_insert* family, or the older scx_bpf_dispatch* fallback.
    if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
        safe_attach!(skel.progs.scx_insert_vtime, "scx_insert_vtime");
        safe_attach!(skel.progs.scx_insert, "scx_insert");
        safe_attach!(skel.progs.scx_dsq_move, "scx_dsq_move");
        safe_attach!(skel.progs.scx_dsq_move_vtime, "scx_dsq_move_vtime");
        safe_attach!(skel.progs.scx_dsq_move_set_vtime, "scx_dsq_move_set_vtime");
        safe_attach!(skel.progs.scx_dsq_move_set_slice, "scx_dsq_move_set_slice");
        if compat::ksym_exists("scx_bpf_dsq_insert___v2")? {
            safe_attach!(skel.progs.scx_insert_v2, "scx_insert_v2");
        }
        if compat::ksym_exists("__scx_bpf_dsq_insert_vtime")? {
            safe_attach!(skel.progs.scx_insert_vtime_args, "scx_insert_vtime_args");
        }
    } else {
        safe_attach!(skel.progs.scx_dispatch, "scx_dispatch");
        safe_attach!(skel.progs.scx_dispatch_vtime, "scx_dispatch_vtime");
        safe_attach!(
            skel.progs.scx_dispatch_from_dsq_set_vtime,
            "scx_dispatch_from_dsq_set_vtime"
        );
        safe_attach!(
            skel.progs.scx_dispatch_from_dsq_set_slice,
            "scx_dispatch_from_dsq_set_slice"
        );
        safe_attach!(skel.progs.scx_dispatch_from_dsq, "scx_dispatch_from_dsq");
        safe_attach!(
            skel.progs.scx_dispatch_vtime_from_dsq,
            "scx_dispatch_vtime_from_dsq"
        );
    }

    // CPU hotplug and softirq instrumentation.
    safe_attach!(skel.progs.on_cpuhp_enter, "cpuhp_enter");
    safe_attach!(skel.progs.on_cpuhp_exit, "cpuhp_exit");
    safe_attach!(skel.progs.on_softirq_entry, "on_softirq_entry");
    safe_attach!(skel.progs.on_softirq_exit, "on_softirq_exit");

    // Nothing attached at all without root: surface the standard capability
    // guidance to the user.
    if links.is_empty() && !is_root() {
        warnings.extend(get_capability_warning_message());
    }

    Ok((links, warnings))
}
249
/// Run a headless trace session: load the scxtop BPF skeleton, attach the
/// scheduler programs, fan BPF events out over per-CPU ringbuffers into a
/// Perfetto trace for `trace_args.trace_ms` milliseconds, then tear
/// everything down in a strict shutdown order (detach first, then stop
/// pollers, then drain and finalize the trace file).
///
/// Requires root; returns an error immediately otherwise.
fn run_trace(trace_args: &TraceArgs) -> Result<()> {
    if !is_root() {
        return Err(anyhow!(
            "Trace functionality requires root privileges. Please run as root"
        ));
    }

    // Log verbosity scales with repeated -v flags.
    TermLogger::init(
        match trace_args.verbose {
            0 => simplelog::LevelFilter::Info,
            1 => simplelog::LevelFilter::Debug,
            _ => simplelog::LevelFilter::Trace,
        },
        SimplelogConfig::default(),
        TerminalMode::Mixed,
        ColorChoice::Auto,
    )?;

    // Validate the requested kprobes against the kernel's available set
    // before doing any heavier setup (requires a sorted list).
    let mut kprobe_events = available_kprobe_events()?;
    kprobe_events.sort();
    search::sorted_contains_all(&kprobe_events, &trace_args.kprobes)
        .then_some(())
        .ok_or_else(|| anyhow!("Invalid kprobe events"))?;

    let config = Config::default_config();
    let worker_threads = config.worker_threads() as usize;

    let num_cpus = num_possible_cpus()?;
    let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);

    // Each ringbuffer gets its own blocking task, so the runtime needs at
    // least rb_cnt threads plus headroom for the stats/trace/drain tasks.
    let required_threads = std::cmp::max(rb_cnt + 4, worker_threads);

    info!(
        "Creating tokio runtime with {} worker threads for {} ringbuffers",
        required_threads, rb_cnt
    );

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(required_threads)
        .build()
        .unwrap()
        .block_on(async {
            let (action_tx, mut action_rx) = mpsc::unbounded_channel();

            let mut open_object = MaybeUninit::uninit();
            let mut builder = BpfSkelBuilder::default();
            if trace_args.verbose > 2 {
                builder.obj_builder.debug(true);
            }

            // Open (but don't yet load) the skeleton so rodata values and map
            // sizes can still be tuned below; enable optional probes only
            // when the kernel supports them.
            let mut skel = builder.open(&mut open_object)?;
            compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total)?;
            compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update)?;
            compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait)?;
            compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang)?;

            let num_cpus = num_possible_cpus()?;
            let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
            let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;

            log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);

            // The BPF side masks CPU ids with a power-of-two mask, so size the
            // CPU->ringbuf lookup map to the next power of two.
            let cpu_cnt_pow2 = num_cpus.next_power_of_two();
            skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;

            skel.maps
                .data_rb_cpu_map
                .set_max_entries(cpu_cnt_pow2 as u32)?;

            skel.maps.events.set_max_entries(rb_cnt as u32)?;

            let mut skel = skel.load()?;

            // Populate the CPU -> ringbuffer-index map now that it's loaded.
            for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
                if cpu_id < cpu_cnt_pow2 {
                    skel.maps.data_rb_cpu_map.update(
                        &(cpu_id as u32).to_ne_bytes(),
                        &rb_id.to_ne_bytes(),
                        libbpf_rs::MapFlags::ANY,
                    )?;
                }
            }

            // Keep BPF-side event emission off until tracing actually starts.
            // NOTE(review): presumably `Tracer::trace()` flips this back on —
            // confirm against the tracer implementation.
            skel.maps.data_data.as_mut().unwrap().enable_bpf_events = false;

            // Mandatory programs: any attach failure here aborts the trace.
            let mut links = vec![
                skel.progs.on_sched_cpu_perf.attach()?,
                skel.progs.scx_sched_reg.attach()?,
                skel.progs.scx_sched_unreg.attach()?,
                skel.progs.on_sched_switch.attach()?,
                skel.progs.on_sched_wakeup.attach()?,
                skel.progs.on_sched_wakeup_new.attach()?,
                skel.progs.on_sched_waking.attach()?,
                skel.progs.on_sched_migrate_task.attach()?,
                skel.progs.on_sched_fork.attach()?,
                skel.progs.on_sched_exec.attach()?,
                skel.progs.on_sched_exit.attach()?,
            ];

            // Best-effort programs: pick the sched_ext kfunc generation this
            // kernel exposes (scx_bpf_dsq_insert* vs the older
            // scx_bpf_dispatch*) and silently skip anything that fails.
            if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
                if let Ok(link) = skel.progs.scx_insert_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_insert.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move_set_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move_set_slice.attach() {
                    links.push(link);
                }
                if compat::ksym_exists("scx_bpf_dsq_insert___v2")? {
                    if let Ok(link) = skel.progs.scx_insert_v2.attach() {
                        links.push(link);
                    }
                }
                if compat::ksym_exists("__scx_bpf_dsq_insert_vtime")? {
                    if let Ok(link) = skel.progs.scx_insert_vtime_args.attach() {
                        links.push(link);
                    }
                }
            } else {
                if let Ok(link) = skel.progs.scx_dispatch.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_slice.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_vtime_from_dsq.attach() {
                    links.push(link);
                }
            }
            // CPU hotplug and softirq instrumentation, also best-effort.
            if let Ok(link) = skel.progs.on_cpuhp_enter.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_cpuhp_exit.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_softirq_entry.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_softirq_exit.attach() {
                links.push(link);
            }

            // Count of records dropped for having a zero timestamp.
            let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));

            // Shared flag that tells pollers/producers to wind down.
            let shutdown = Arc::new(AtomicBool::new(false));

            // Manually create one RINGBUF map per ringbuffer and slot each fd
            // into the BPF-side `events` hash-of-maps.
            let events_map_fd = skel.maps.events.as_fd().as_raw_fd();
            let mut rb_fds = Vec::new();
            let mut rb_managers: Vec<SendRingBuffer> = Vec::new();

            for rb_id in 0..rb_cnt {
                // SAFETY: direct libbpf syscall wrapper with valid arguments;
                // the 32 MiB value is the per-ring max_entries (ring size).
                let rb_fd = unsafe {
                    libbpf_sys::bpf_map_create(
                        libbpf_sys::BPF_MAP_TYPE_RINGBUF,
                        std::ptr::null(),
                        0,
                        0,
                        (32 * 1024 * 1024) as u32, std::ptr::null(),
                    )
                };

                if rb_fd < 0 {
                    bail!(
                        "Failed to create ringbuffer #{}: {}",
                        rb_id,
                        std::io::Error::last_os_error()
                    );
                }

                // SAFETY: key/value pointers reference live stack locals of
                // the exact sizes the map expects (u32 key, i32 fd value).
                let rb_id_u32 = rb_id as u32;
                let ret = unsafe {
                    libbpf_sys::bpf_map_update_elem(
                        events_map_fd,
                        &rb_id_u32 as *const u32 as *const std::ffi::c_void,
                        &rb_fd as *const i32 as *const std::ffi::c_void,
                        libbpf_sys::BPF_NOEXIST.into(),
                    )
                };

                if ret < 0 {
                    bail!(
                        "Failed to add ringbuffer #{} to hash-of-maps: {}",
                        rb_id,
                        std::io::Error::last_os_error()
                    );
                }

                rb_fds.push(rb_fd);
            }

            // Context handed to the C sample callback for each ringbuffer.
            struct RingBufContext {
                dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                action_tx: mpsc::UnboundedSender<Action>,
                shutdown: Arc<AtomicBool>,
            }

            // C-ABI callback invoked by libbpf for every sample: copy the raw
            // bytes into a `bpf_event`, drop zero-timestamp records, and
            // publish the rest as actions on the channel.
            extern "C" fn ring_buffer_sample_callback(
                ctx: *mut std::ffi::c_void,
                data: *mut std::ffi::c_void,
                size: u64,
            ) -> std::ffi::c_int {
                // SAFETY: `ctx` is the leaked Box<RingBufContext> installed
                // below; `data` points to at least `size` readable bytes for
                // the duration of the callback.
                unsafe {
                    let ctx = &*(ctx as *const RingBufContext);

                    // Drop events on the floor once shutdown has begun.
                    if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                        return 0;
                    }

                    // Copy at most sizeof(bpf_event); a short record leaves
                    // the remainder of `event` default-initialized.
                    let mut event = bpf_event::default();
                    let copy_size = std::cmp::min(size as usize, std::mem::size_of::<bpf_event>());
                    std::ptr::copy_nonoverlapping(
                        data as *const u8,
                        &mut event as *mut bpf_event as *mut u8,
                        copy_size,
                    );

                    // A zero timestamp marks an unusable record; count it.
                    if event.ts == 0 {
                        ctx.dropped_invalid_ts
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        return 0;
                    }

                    // NOTE(review): a fresh dispatch manager is built per
                    // sample — consider hoisting it into RingBufContext if
                    // this shows up in profiles.
                    let mut edm = EventDispatchManager::new(None, None);
                    edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(
                        ctx.action_tx.clone(),
                    )));
                    let _ = edm.on_event(&event);
                }
                0
            }

            // Create a libbpf ring buffer manager per fd, each with its own
            // leaked callback context.
            for rb_fd in &rb_fds {
                let ctx = Box::new(RingBufContext {
                    dropped_invalid_ts: dropped_invalid_ts.clone(),
                    action_tx: action_tx.clone(),
                    shutdown: shutdown.clone(),
                });
                // Leaked on purpose: the context must outlive the ring buffer.
                // NOTE(review): never reclaimed after ring_buffer__free —
                // likely fine for process-lifetime buffers, but confirm.
                let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;

                // SAFETY: `rb_fd` is a live ringbuf map fd and `ctx_ptr` is a
                // valid pointer for the lifetime of the ring buffer.
                let rb_ptr = unsafe {
                    libbpf_sys::ring_buffer__new(
                        *rb_fd,
                        Some(ring_buffer_sample_callback),
                        ctx_ptr,
                        std::ptr::null(),
                    )
                };

                if rb_ptr.is_null() {
                    // Creation failed: reclaim the context we just leaked.
                    unsafe {
                        let _ = Box::from_raw(ctx_ptr as *mut RingBufContext);
                    }
                    bail!("Failed to create ring buffer manager");
                }

                rb_managers.push(SendRingBuffer(rb_ptr));
            }

            let stop_poll = shutdown.clone();
            let stop_stats = shutdown.clone();

            let mut ringbuffer_handles = Vec::new();
            let mut producer_handles = Vec::new();

            // One dedicated blocking task per ringbuffer: poll with a 1 ms
            // timeout so the shutdown flag is observed promptly, then drain
            // any remaining records and free the buffer on the way out.
            for (rb_id, rb) in rb_managers.into_iter().enumerate() {
                let stop_poll_clone = stop_poll.clone();
                ringbuffer_handles.push(tokio::task::spawn_blocking(move || {
                    info!("ringbuffer #{} task started", rb_id);
                    let mut poll_count = 0;
                    loop {
                        rb.poll(1);
                        poll_count += 1;
                        if stop_poll_clone.load(Ordering::Relaxed) {
                            info!(
                                "ringbuffer #{} received shutdown after {} polls",
                                rb_id, poll_count
                            );
                            let consumed = rb.consume();
                            info!("ringbuffer #{} consumed {} events", rb_id, consumed);
                            rb.free();
                            info!("ringbuffer #{} freed", rb_id);
                            break;
                        }
                    }
                    info!("ringbuffer #{} exiting", rb_id);
                }));
            }
            info!(
                "spawned {} ringbuffer polling tasks",
                ringbuffer_handles.len()
            );

            // Optional producer: sample CPU/memory stats every 100 ms and
            // publish them as SystemStat actions alongside the BPF events.
            if trace_args.system_stats {
                let mut cpu_stat_tracker = CpuStatTracker::default();
                let mut mem_stats = MemStatSnapshot::default();
                let mut system = System::new_all();
                let action_tx_clone = action_tx.clone();

                producer_handles.push(tokio::spawn(async move {
                    info!("stats task started");
                    let mut stats_count = 0;
                    loop {
                        if stop_stats.load(Ordering::Relaxed) {
                            info!("stats task received shutdown after {} samples", stats_count);
                            break;
                        }
                        let ts = get_clock_value(libc::CLOCK_BOOTTIME);

                        cpu_stat_tracker
                            .update(&mut system)
                            .expect("Failed to update cpu stats");

                        mem_stats.update().expect("Failed to update mem stats");

                        let sys_stat_action = Action::SystemStat(SystemStatAction {
                            ts,
                            cpu_data_prev: cpu_stat_tracker.prev.clone(),
                            cpu_data_current: cpu_stat_tracker.current.clone(),
                            mem_info: mem_stats.clone(),
                        });
                        action_tx_clone
                            .send(sys_stat_action)
                            .expect("Failed to send CpuStat action");

                        stats_count += 1;
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                    info!("stats task exiting");
                }));
            }

            let trace_file_prefix = config.trace_file_prefix().to_string();
            let trace_file = trace_args.output_file.clone();
            let mut trace_manager = PerfettoTraceManager::new(trace_file_prefix, None);

            // Feed CPU topology (LLC / NUMA / core ids) to the trace manager
            // when available so tracks can be grouped sensibly.
            {
                use scx_utils::Topology;
                if let Ok(topo) = Topology::new() {
                    let mut cpu_to_llc = std::collections::HashMap::new();
                    let mut cpu_to_numa = std::collections::HashMap::new();
                    let mut cpu_to_core = std::collections::HashMap::new();
                    for cpu in topo.all_cpus.values() {
                        cpu_to_llc.insert(cpu.id as u32, cpu.llc_id as u32);
                        cpu_to_numa.insert(cpu.id as u32, cpu.node_id as u32);
                        cpu_to_core.insert(cpu.id as u32, cpu.core_id as u32);
                    }
                    trace_manager.set_topology(cpu_to_llc, cpu_to_numa, cpu_to_core);
                }
            }

            info!("starting trace for {}ms", trace_args.trace_ms);
            trace_manager.start()?;
            let mut tracer = Tracer::new(skel);
            tracer.trace(&trace_args.kprobes)?;

            // Consumer task: forward every action into the trace manager; on
            // shutdown, drain whatever is left in the channel and then write
            // out the trace file.
            let shutdown_trace = shutdown.clone();
            let trace_handle = tokio::spawn(async move {
                debug!("trace generation task started");
                let mut count = 0;
                let mut last_log = std::time::Instant::now();
                loop {
                    tokio::select! {
                        // Periodic wakeup so shutdown is noticed even when no
                        // actions arrive.
                        _ = tokio::time::sleep(Duration::from_millis(100)) => {
                            if shutdown_trace.load(Ordering::Relaxed) {
                                break;
                            }
                        }
                        action = action_rx.recv() => {
                            if let Some(a) = action {
                                count += 1;
                                // Rate-limit progress logging to once/second.
                                if last_log.elapsed() > std::time::Duration::from_secs(1) {
                                    debug!("trace task: {} events processed", count);
                                    last_log = std::time::Instant::now();
                                }
                                trace_manager
                                    .on_action(&a)
                                    .expect("Action should have been resolved");
                                if shutdown_trace.load(Ordering::Relaxed) {
                                    break;
                                }
                            } else {
                                // All senders dropped: nothing more to read.
                                break;
                            }
                        }
                    }
                }
                info!("trace task: draining remaining events");
                while let Ok(a) = action_rx.try_recv() {
                    count += 1;
                    trace_manager
                        .on_action(&a)
                        .expect("Action should have been resolved");
                }
                info!("trace task: stopping trace manager");
                trace_manager.stop(trace_file, None).unwrap();
                info!("trace file compiled, collected {count} events");
            });

            info!("waiting for trace duration ({}ms)", trace_args.trace_ms);
            tokio::time::sleep(Duration::from_millis(trace_args.trace_ms)).await;
            info!("trace duration complete, beginning shutdown");

            // Shutdown order matters: detach BPF programs first so no new
            // events are produced, then stop the pollers, then close the
            // channel so the trace task can finish draining.
            info!("shutdown: clearing BPF links");
            tracer.clear_links()?;
            info!("shutdown: BPF links cleared");
            drop(links);
            info!("shutdown: links dropped");

            info!("shutdown: setting shutdown flag");
            shutdown.store(true, Ordering::Relaxed);
            info!(
                "shutdown: flag set, waiting for {} ringbuffer tasks",
                ringbuffer_handles.len()
            );

            let results = join_all(ringbuffer_handles).await;
            info!("shutdown: all {} ringbuffer tasks joined", results.len());
            for (idx, result) in results.iter().enumerate() {
                if let Err(e) = result {
                    eprintln!("Ringbuffer task {} panicked: {e}", idx);
                } else {
                    debug!("ringbuffer task {} exited successfully", idx);
                }
            }
            info!("shutdown: ringbuffer tasks complete");

            info!(
                "shutdown: waiting for {} producer tasks",
                producer_handles.len()
            );
            let results = join_all(producer_handles).await;
            info!("shutdown: all {} producer tasks joined", results.len());
            for (idx, result) in results.iter().enumerate() {
                if let Err(e) = result {
                    eprintln!("Producer task {} panicked: {e}", idx);
                } else {
                    debug!("producer task {} exited successfully", idx);
                }
            }
            info!("shutdown: producer tasks complete");

            // Dropping the last sender closes the channel, which lets the
            // trace task fall out of its recv loop and finalize the file.
            info!("shutdown: dropping action_tx");
            drop(action_tx);
            info!("shutdown: action_tx dropped, waiting for trace generation");

            if let Err(e) = trace_handle.await {
                eprintln!("Trace generation task panicked: {e}");
            }
            info!("shutdown: trace generation complete");

            info!("shutdown: collecting final stats");
            let stats = tracer.stats()?;
            info!("shutdown: {stats:?}");

            info!("shutdown: complete");
            Ok(())
        })
}
776
777fn run_tui(tui_args: &TuiArgs) -> Result<()> {
778 if let Ok(log_path) = std::env::var("RUST_LOG_PATH") {
779 let log_level = match std::env::var("RUST_LOG") {
780 Ok(v) => LevelFilter::from_str(&v)?,
781 Err(_) => LevelFilter::Info,
782 };
783
784 WriteLogger::init(
785 log_level,
786 simplelog::Config::default(),
787 File::create(log_path)?,
788 )?;
789
790 log_panics::Config::new()
791 .backtrace_mode(log_panics::BacktraceMode::Resolved)
792 .install_panic_hook();
793 };
794
795 let config = Config::merge([
796 Config::from(tui_args.clone()),
797 Config::load_or_default().expect("Failed to load config or load default config"),
798 ]);
799 let keymap = config.active_keymap.clone();
800
801 let worker_threads = config.worker_threads() as usize;
803 let num_cpus = num_possible_cpus()?;
804 let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
805
806 let required_threads = std::cmp::max(rb_cnt + 4, worker_threads);
809
810 tokio::runtime::Builder::new_multi_thread()
811 .enable_all()
812 .worker_threads(required_threads)
813 .build()
814 .unwrap()
815 .block_on(async {
816 let mut open_object = MaybeUninit::uninit();
818
819 let (action_tx, mut action_rx) = mpsc::unbounded_channel();
820
821 let has_bpf_cap = check_bpf_capability();
823 let mut capability_warnings = Vec::new();
824 let mut _bpf_enabled = false;
825 let mut links = Vec::new();
826 let mut event_rb_data_opt: Option<(
827 Vec<i32>, Arc<std::sync::atomic::AtomicU64>, mpsc::UnboundedSender<Action>, )> = None;
831 let mut skel_opt = None;
832
833 if has_bpf_cap {
834 let mut builder = BpfSkelBuilder::default();
836 if config.debug() {
837 builder.obj_builder.debug(true);
838 }
839 let bpf_publisher = BpfEventActionPublisher::new(action_tx.clone());
840 let mut edm = EventDispatchManager::new(None, None);
841 edm.register_bpf_handler(Box::new(bpf_publisher));
842
843 match builder.open(&mut open_object) {
845 Ok(mut skel) => {
846 skel.maps.rodata_data.as_mut().unwrap().long_tail_tracing_min_latency_ns =
847 tui_args.experimental_long_tail_tracing_min_latency_ns;
848
849 let _map_handle = if tui_args.layered {
850 skel.maps.rodata_data.as_mut().unwrap().layered = true;
851 action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
852 table: "Process".to_string(),
853 col: "Layer ID".to_string(),
854 visible: true,
855 }))?;
856 action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
857 table: "Thread".to_string(),
858 col: "Layer ID".to_string(),
859 visible: true,
860 }))?;
861 match layered_util::attach_to_existing_map("task_ctxs", &mut skel.maps.task_ctxs) {
862 Ok(handle) => Some(handle),
863 Err(e) => {
864 capability_warnings.push(format!("Failed to attach to layered map: {e}"));
865 None
866 }
867 }
868 } else {
869 None
870 };
871
872 let num_cpus = num_possible_cpus()?;
874 let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
875 let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;
876
877 log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);
878
879 let cpu_cnt_pow2 = num_cpus.next_power_of_two();
881 skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;
882
883 if let Err(e) = skel.maps.data_rb_cpu_map.set_max_entries(cpu_cnt_pow2 as u32) {
885 capability_warnings.push(format!("Failed to set CPU-to-ringbuf map size: {e}"));
886 }
887
888 if let Err(e) = skel.maps.events.set_max_entries(rb_cnt as u32) {
890 capability_warnings.push(format!("Failed to set ringbuf count: {e}"));
891 }
892
893 if let Err(e) = compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total) {
894 capability_warnings.push(format!("Failed to enable gpu_memory_total kprobe: {e}"));
895 }
896 if let Err(e) = compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update) {
897 capability_warnings.push(format!("Failed to enable hw_pressure_update kprobe: {e}"));
898 }
899 if let Err(e) = compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait) {
900 capability_warnings.push(format!("Failed to enable sched_process_wait tracepoint: {e}"));
901 }
902 if let Err(e) = compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang) {
903 capability_warnings.push(format!("Failed to enable sched_process_hang tracepoint: {e}"));
904 }
905
906 match skel.load() {
908 Ok(mut loaded_skel) => {
909 for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
911 if cpu_id < cpu_cnt_pow2 {
912 if let Err(e) = loaded_skel.maps.data_rb_cpu_map.update(
913 &(cpu_id as u32).to_ne_bytes(),
914 &rb_id.to_ne_bytes(),
915 libbpf_rs::MapFlags::ANY,
916 ) {
917 capability_warnings.push(format!("Failed to set CPU {} -> ringbuf {}: {}", cpu_id, rb_id, e));
918 }
919 }
920 }
921
922 let (skel_links, attach_warnings) = attach_progs(&mut loaded_skel)?;
923 links = skel_links;
924 capability_warnings.extend(attach_warnings);
925
926 if !links.is_empty() || is_root() {
927 if let Err(e) = loaded_skel.progs.scxtop_init.test_run(ProgramInput::default()) {
929 capability_warnings.push(format!("Failed to initialize scxtop BPF program: {e}"));
930 }
931 }
932
933 if !links.is_empty() {
935 let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));
937
938 let events_map_fd = loaded_skel.maps.events.as_fd().as_raw_fd();
940 let mut rb_fds = Vec::new();
941
942 for rb_id in 0..rb_cnt {
943 let rb_fd = unsafe {
945 libbpf_sys::bpf_map_create(
946 libbpf_sys::BPF_MAP_TYPE_RINGBUF,
947 std::ptr::null(),
948 0,
949 0,
950 (32 * 1024 * 1024) as u32, std::ptr::null(),
952 )
953 };
954
955 if rb_fd < 0 {
956 capability_warnings.push(format!("Failed to create ringbuffer #{}: {}", rb_id, std::io::Error::last_os_error()));
957 continue;
958 }
959
960 let rb_id_u32 = rb_id as u32;
962 let ret = unsafe {
963 libbpf_sys::bpf_map_update_elem(
964 events_map_fd,
965 &rb_id_u32 as *const u32 as *const std::ffi::c_void,
966 &rb_fd as *const i32 as *const std::ffi::c_void,
967 libbpf_sys::BPF_NOEXIST.into(),
968 )
969 };
970
971 if ret < 0 {
972 capability_warnings.push(format!("Failed to add ringbuffer #{} to hash-of-maps: {}", rb_id, std::io::Error::last_os_error()));
973 continue;
974 }
975
976 rb_fds.push(rb_fd);
977 }
978
979 if !rb_fds.is_empty() {
980 event_rb_data_opt = Some((rb_fds, dropped_invalid_ts, action_tx.clone()));
982 _bpf_enabled = true;
983 }
984 }
985
986 skel_opt = Some(loaded_skel);
987 }
988 Err(e) => {
989 if is_root() {
990 return Err(anyhow!("Failed to load BPF skeleton (running as root): {e}"));
991 } else {
992 capability_warnings.push(format!("Failed to load BPF skeleton: {e}"));
993 capability_warnings.extend(get_capability_warning_message());
994 }
995 }
996 }
997 }
998 Err(e) => {
999 if is_root() {
1000 return Err(anyhow!("Failed to open BPF skeleton (running as root): {e}"));
1001 } else {
1002 capability_warnings.push(format!("Failed to open BPF skeleton: {e}"));
1003 capability_warnings.extend(get_capability_warning_message());
1004 }
1005 }
1006 }
1007 } else {
1008 capability_warnings.extend(get_capability_warning_message());
1010 }
1011
1012 if tui_args.experimental_long_tail_tracing {
1014 if let Some(ref mut skel) = skel_opt {
1015 skel.maps.data_data.as_mut().unwrap().trace_duration_ns = config.trace_duration_ns();
1016 skel.maps.data_data.as_mut().unwrap().trace_warmup_ns = config.trace_warmup_ns();
1017
1018 let binary = tui_args
1019 .experimental_long_tail_tracing_binary
1020 .clone()
1021 .unwrap();
1022 let symbol = tui_args
1023 .experimental_long_tail_tracing_symbol
1024 .clone()
1025 .unwrap();
1026
1027 match skel.progs.long_tail_tracker_exit.attach_uprobe_with_opts(
1028 -1, binary.clone(),
1030 0,
1031 UprobeOpts {
1032 retprobe: true,
1033 func_name: Some(symbol.clone()),
1034 ..Default::default()
1035 },
1036 ) {
1037 Ok(link) => links.push(link),
1038 Err(e) => capability_warnings.push(format!("Failed to attach long tail tracker exit: {e}"))
1039 }
1040
1041 match skel.progs.long_tail_tracker_entry.attach_uprobe_with_opts(
1042 -1, binary.clone(),
1044 0,
1045 UprobeOpts {
1046 retprobe: false,
1047 func_name: Some(symbol.clone()),
1048 ..Default::default()
1049 },
1050 ) {
1051 Ok(link) => links.push(link),
1052 Err(e) => capability_warnings.push(format!("Failed to attach long tail tracker entry: {e}"))
1053 }
1054 } else {
1055 capability_warnings.push("Long tail tracing requested but BPF skeleton not available".to_string());
1056 }
1057 }
1058
1059 let mut tui = Tui::new(keymap.clone(), config.tick_rate_ms(), config.frame_rate_ms())?;
1060 let scheduler = read_file_string(SCHED_NAME_PATH).unwrap_or("".to_string());
1061
1062 let mut app = if let Some(skel) = skel_opt {
1064 App::new(
1065 config,
1066 scheduler,
1067 100,
1068 tui_args.process_id,
1069 tui_args.layered,
1070 action_tx.clone(),
1071 skel,
1072 )?
1073 } else {
1074 App::new_without_bpf(
1076 config,
1077 scheduler,
1078 100,
1079 tui_args.process_id,
1080 tui_args.layered,
1081 action_tx.clone(),
1082 )?
1083 };
1084
1085 if !capability_warnings.is_empty() {
1087 app.set_capability_warnings(capability_warnings);
1088 }
1089
1090 tui.enter()?;
1091
1092 let shutdown = app.should_quit.clone();
1094 let mut ringbuffer_handles = Vec::new();
1095 if let Some((rb_fds, dropped_invalid_ts, rb_action_tx)) = event_rb_data_opt {
1096 struct RingBufContext {
1099 dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
1100 action_tx: mpsc::UnboundedSender<Action>,
1101 shutdown: Arc<AtomicBool>,
1102 }
1103
1104 extern "C" fn ring_buffer_sample_callback(
1105 ctx: *mut std::ffi::c_void,
1106 data: *mut std::ffi::c_void,
1107 size: u64,
1108 ) -> std::ffi::c_int {
1109 unsafe {
1110 let ctx = &*(ctx as *const RingBufContext);
1111
1112 if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
1114 return 0;
1115 }
1116
1117 let mut event = bpf_event::default();
1118 let copy_size = std::cmp::min(size as usize, std::mem::size_of::<bpf_event>());
1119 std::ptr::copy_nonoverlapping(
1120 data as *const u8,
1121 &mut event as *mut bpf_event as *mut u8,
1122 copy_size,
1123 );
1124
1125 if event.ts == 0 {
1127 ctx.dropped_invalid_ts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1128 return 0;
1129 }
1130
1131 let mut edm = EventDispatchManager::new(None, None);
1132 edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(ctx.action_tx.clone())));
1133 let _ = edm.on_event(&event);
1134 }
1135 0
1136 }
1137
1138 for rb_fd in rb_fds {
1140 let ctx = Box::new(RingBufContext {
1141 dropped_invalid_ts: dropped_invalid_ts.clone(),
1142 action_tx: rb_action_tx.clone(),
1143 shutdown: shutdown.clone(),
1144 });
1145 let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;
1146
1147 let rb_ptr = unsafe {
1148 libbpf_sys::ring_buffer__new(
1149 rb_fd,
1150 Some(ring_buffer_sample_callback),
1151 ctx_ptr,
1152 std::ptr::null(),
1153 )
1154 };
1155
1156 if rb_ptr.is_null() {
1157 unsafe { let _ = Box::from_raw(ctx_ptr as *mut RingBufContext); }
1158 log::warn!("Failed to create ring buffer manager");
1159 continue;
1160 }
1161
1162 let rb = SendRingBuffer(rb_ptr);
1163 let shutdown_clone = shutdown.clone();
1164 let rb_id = ringbuffer_handles.len();
1165 ringbuffer_handles.push(tokio::task::spawn_blocking(move || {
1167 loop {
1168 rb.poll(1);
1170 if shutdown_clone.load(Ordering::Relaxed) {
1171 rb.consume();
1173 rb.free();
1175 log::debug!("ringbuffer #{} polling stopped", rb_id);
1176 break;
1177 }
1178 }
1179 }));
1180 }
1181 }
1182
1183 if tui_args.mangoapp_tracing {
1184 let stop_mangoapp = app.should_quit.clone();
1185 let mangoapp_path = CString::new(tui_args.mangoapp_path.clone()).unwrap();
1186 let poll_intvl_ms = tui_args.mangoapp_poll_intvl_ms;
1187 let tx = action_tx.clone();
1188 tokio::spawn(async move {
1189 poll_mangoapp(
1190 mangoapp_path,
1191 poll_intvl_ms,
1192 tx,
1193 stop_mangoapp,
1194 )
1195 .await
1196 });
1197 }
1198
1199 loop {
1200 tokio::select! {
1201 ev = tui.next() => {
1202 let ev = ev?;
1203 match ev {
1204 Event::Quit => { action_tx.send(Action::Quit)?; },
1205 Event::Tick => action_tx.send(Action::Tick)?,
1206 Event::TickRateChange(tick_rate_ms) => action_tx.send(
1207 Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms)),
1208 )?,
1209 Event::Render => {
1210 if app.should_quit.load(Ordering::Relaxed) {
1211 break;
1212 }
1213 if app.state() != AppState::Pause {
1214 tui.draw(|f| app.render(f).expect("Failed to render application"))?;
1215 }
1216 }
1217 Event::Key(_) => {
1218 let action = get_action(&app, &keymap, ev);
1219 action_tx.send(action)?;
1220 }
1221 _ => {}
1222 }}
1223
1224 ac = action_rx.recv() => {
1225 let ac = ac.ok_or(anyhow!("actions channel closed"))?;
1226 app.handle_action(&ac)?;
1227 }
1228 }
1229 }
1230 tui.exit()?;
1231
1232 log::debug!("waiting for {} ringbuffer tasks to complete", ringbuffer_handles.len());
1234 for handle in ringbuffer_handles {
1235 if let Err(e) = handle.await {
1236 log::error!("Ringbuffer task panicked: {e}");
1237 }
1238 }
1239
1240 drop(links);
1241
1242 Ok(())
1243 })
1244}
1245
1246fn run_mcp(mcp_args: &scxtop::cli::McpArgs) -> Result<()> {
1247 use scx_utils::Topology;
1248 use scxtop::mcp::{events::action_to_mcp_event, McpServer, McpServerConfig};
1249 use std::sync::Arc;
1250
1251 TermLogger::init(
1253 match mcp_args.verbose {
1254 0 => LevelFilter::Warn,
1255 1 => LevelFilter::Info,
1256 2 => LevelFilter::Debug,
1257 _ => LevelFilter::Trace,
1258 },
1259 SimplelogConfig::default(),
1260 TerminalMode::Stderr,
1261 ColorChoice::Auto,
1262 )?;
1263
1264 let topo = Topology::new().expect("Failed to create topology");
1266 let topo_arc = Arc::new(topo);
1267
1268 let mcp_config = McpServerConfig {
1269 daemon_mode: mcp_args.daemon,
1270 enable_logging: mcp_args.enable_logging,
1271 };
1272
1273 if mcp_args.daemon {
1274 tokio::runtime::Builder::new_multi_thread()
1276 .enable_all()
1277 .worker_threads(4)
1278 .build()
1279 .unwrap()
1280 .block_on(async {
1281 let mut open_object = MaybeUninit::uninit();
1282 let (action_tx, mut action_rx) = mpsc::unbounded_channel();
1283
1284 use scxtop::mcp::create_shared_stats;
1286 let shared_stats = create_shared_stats();
1287 let shared_stats_for_event_handler = shared_stats.clone();
1288
1289 let builder = BpfSkelBuilder::default();
1291 let mut skel = builder.open(&mut open_object)?;
1292
1293 let num_cpus = num_possible_cpus()?;
1295 let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
1296 let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;
1297
1298 log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);
1299
1300 let cpu_cnt_pow2 = num_cpus.next_power_of_two();
1302 skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;
1303
1304 skel.maps.data_rb_cpu_map.set_max_entries(cpu_cnt_pow2 as u32)?;
1306
1307 skel.maps.events.set_max_entries(rb_cnt as u32)?;
1309
1310 let mut skel = skel.load()?;
1311
1312 for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
1314 if cpu_id < cpu_cnt_pow2 {
1315 skel.maps.data_rb_cpu_map.update(
1316 &(cpu_id as u32).to_ne_bytes(),
1317 &rb_id.to_ne_bytes(),
1318 libbpf_rs::MapFlags::ANY,
1319 )?;
1320 }
1321 }
1322
1323 use scxtop::mcp::{
1325 WakerWakeeAnalyzer, LatencyTracker, CpuHotspotAnalyzer, MigrationAnalyzer,
1326 ProcessEventHistory, DsqMonitor, EventRateMonitor, WakeupChainTracker, EventBuffer,
1327 SoftirqAnalyzer,
1328 };
1329 use std::sync::Mutex;
1330
1331 let mut waker_wakee = WakerWakeeAnalyzer::new();
1333 waker_wakee.set_topology(topo_arc.clone());
1334 let waker_wakee_arc = Arc::new(Mutex::new(waker_wakee));
1335
1336 let latency_tracker = LatencyTracker::new(1000); let latency_tracker_arc = Arc::new(Mutex::new(latency_tracker));
1339
1340 let cpu_hotspot = CpuHotspotAnalyzer::new(100); let cpu_hotspot_arc = Arc::new(Mutex::new(cpu_hotspot));
1343
1344 let migration_analyzer = MigrationAnalyzer::new(1000); let migration_analyzer_arc = Arc::new(Mutex::new(migration_analyzer));
1347
1348 let process_history = ProcessEventHistory::new(100); let process_history_arc = Arc::new(Mutex::new(process_history));
1351
1352 let dsq_monitor = DsqMonitor::new();
1354 let dsq_monitor_arc = Arc::new(Mutex::new(dsq_monitor));
1355
1356 let rate_monitor = EventRateMonitor::new(1000, 10); let rate_monitor_arc = Arc::new(Mutex::new(rate_monitor));
1359
1360 let wakeup_tracker = WakeupChainTracker::new(10); let wakeup_tracker_arc = Arc::new(Mutex::new(wakeup_tracker));
1363
1364 let event_buffer = EventBuffer::new();
1366 let event_buffer_arc = Arc::new(Mutex::new(event_buffer));
1367
1368 let softirq_analyzer = SoftirqAnalyzer::new(10000); let softirq_analyzer_arc = Arc::new(Mutex::new(softirq_analyzer));
1371
1372 let mut edm = EventDispatchManager::new(None, None);
1374 edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(action_tx.clone())));
1375
1376 let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));
1378
1379 let shutdown = Arc::new(AtomicBool::new(false));
1381
1382 let events_map_fd = skel.maps.events.as_fd().as_raw_fd();
1384 let mut rb_fds = Vec::new();
1385 let mut rb_managers: Vec<SendRingBuffer> = Vec::new();
1386
1387 for rb_id in 0..rb_cnt {
1388 let rb_fd = unsafe {
1390 libbpf_sys::bpf_map_create(
1391 libbpf_sys::BPF_MAP_TYPE_RINGBUF,
1392 std::ptr::null(),
1393 0,
1394 0,
1395 (32 * 1024 * 1024) as u32, std::ptr::null(),
1397 )
1398 };
1399
1400 if rb_fd < 0 {
1401 bail!("Failed to create ringbuffer #{}: {}", rb_id, std::io::Error::last_os_error());
1402 }
1403
1404 let rb_id_u32 = rb_id as u32;
1406 let ret = unsafe {
1407 libbpf_sys::bpf_map_update_elem(
1408 events_map_fd,
1409 &rb_id_u32 as *const u32 as *const std::ffi::c_void,
1410 &rb_fd as *const i32 as *const std::ffi::c_void,
1411 libbpf_sys::BPF_NOEXIST.into(),
1412 )
1413 };
1414
1415 if ret < 0 {
1416 bail!("Failed to add ringbuffer #{} to hash-of-maps: {}", rb_id, std::io::Error::last_os_error());
1417 }
1418
1419 rb_fds.push(rb_fd);
1420 }
1421
1422 struct McpRingBufContext {
1427 dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
1428 shared_stats: Arc<std::sync::RwLock<scxtop::mcp::SharedStats>>,
1429 action_tx: mpsc::UnboundedSender<Action>,
1430 waker_wakee: Arc<std::sync::Mutex<scxtop::mcp::WakerWakeeAnalyzer>>,
1431 cpu_hotspot: Arc<std::sync::Mutex<scxtop::mcp::CpuHotspotAnalyzer>>,
1432 migration_analyzer: Arc<std::sync::Mutex<scxtop::mcp::MigrationAnalyzer>>,
1433 process_history: Arc<std::sync::Mutex<scxtop::mcp::ProcessEventHistory>>,
1434 rate_monitor: Arc<std::sync::Mutex<scxtop::mcp::EventRateMonitor>>,
1435 wakeup_tracker: Arc<std::sync::Mutex<scxtop::mcp::WakeupChainTracker>>,
1436 softirq_analyzer: Arc<std::sync::Mutex<scxtop::mcp::SoftirqAnalyzer>>,
1437 shutdown: Arc<AtomicBool>,
1438 }
1439
1440 extern "C" fn mcp_ring_buffer_callback(
1441 ctx: *mut std::ffi::c_void,
1442 data: *mut std::ffi::c_void,
1443 size: u64,
1444 ) -> std::ffi::c_int {
1445 unsafe {
1446 let ctx = &*(ctx as *const McpRingBufContext);
1447
1448 if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
1450 return 0;
1451 }
1452
1453 let mut event = bpf_event::default();
1454 let copy_size = std::cmp::min(size as usize, std::mem::size_of::<bpf_event>());
1455 std::ptr::copy_nonoverlapping(
1456 data as *const u8,
1457 &mut event as *mut bpf_event as *mut u8,
1458 copy_size,
1459 );
1460
1461 if event.ts == 0 {
1463 ctx.dropped_invalid_ts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1464 return 0;
1465 }
1466
1467 if let Ok(mut stats) = ctx.shared_stats.write() {
1469 stats.update_from_event(&event);
1470 }
1471
1472 use scxtop::bpf_intf;
1474 let event_type = event.r#type as u32;
1475
1476 if let Ok(mut analyzer) = ctx.waker_wakee.try_lock() {
1478 match event_type {
1479 bpf_intf::event_type_SCHED_WAKEUP => {
1480 let wakeup = &event.event.wakeup;
1481 analyzer.record_wakeup(
1482 wakeup.pid, wakeup.waker_pid,
1483 &String::from_utf8_lossy(&wakeup.waker_comm),
1484 event.cpu, event.ts,
1485 );
1486 }
1487 bpf_intf::event_type_SCHED_WAKING => {
1488 let waking = &event.event.waking;
1489 analyzer.record_wakeup(
1490 waking.pid, waking.waker_pid,
1491 &String::from_utf8_lossy(&waking.waker_comm),
1492 event.cpu, event.ts,
1493 );
1494 }
1495 bpf_intf::event_type_SCHED_SWITCH => {
1496 let switch = &event.event.sched_switch;
1497 analyzer.record_wakee_run(
1498 switch.next_pid,
1499 &String::from_utf8_lossy(&switch.next_comm),
1500 event.cpu, event.ts,
1501 );
1502 }
1503 _ => {}
1504 }
1505 }
1506
1507 if let Ok(mut analyzer) = ctx.cpu_hotspot.try_lock() {
1509 let json = serde_json::json!({
1510 "cpu": event.cpu, "ts": event.ts, "event_type": event_type
1511 });
1512 analyzer.record_event(&json);
1513 }
1514
1515 if let Ok(mut analyzer) = ctx.migration_analyzer.try_lock() {
1517 if event_type == bpf_intf::event_type_SCHED_MIGRATE {
1518 let migrate = &event.event.migrate;
1519 let json = serde_json::json!({
1520 "pid": migrate.pid, "from_cpu": event.cpu,
1521 "to_cpu": migrate.dest_cpu, "ts": event.ts
1522 });
1523 analyzer.record_migration(&json, event.ts);
1524 }
1525 }
1526
1527 if let Ok(mut history) = ctx.process_history.try_lock() {
1529 let (event_type_str, pid) = match event_type {
1530 bpf_intf::event_type_SCHED_SWITCH => ("sched_switch", event.event.sched_switch.next_pid),
1531 bpf_intf::event_type_SCHED_WAKEUP => ("sched_wakeup", event.event.wakeup.pid),
1532 bpf_intf::event_type_SCHED_WAKING => ("sched_waking", event.event.waking.pid),
1533 bpf_intf::event_type_SCHED_MIGRATE => ("sched_migrate", event.event.migrate.pid),
1534 bpf_intf::event_type_EXIT => ("exit", event.event.exit.pid),
1535 bpf_intf::event_type_EXEC => ("exec", event.event.exec.pid),
1536 _ => ("other", 0),
1537 };
1538 if pid > 0 {
1539 history.record_event(
1540 pid, event_type_str.to_string(), Some(event.cpu),
1541 serde_json::json!({"ts": event.ts}), event.ts,
1542 );
1543 }
1544 }
1545
1546 if let Ok(mut monitor) = ctx.rate_monitor.try_lock() {
1548 let event_type_str = match event_type {
1549 bpf_intf::event_type_SCHED_SWITCH => "sched_switch",
1550 bpf_intf::event_type_SCHED_WAKEUP => "sched_wakeup",
1551 bpf_intf::event_type_SCHED_WAKING => "sched_waking",
1552 bpf_intf::event_type_SCHED_MIGRATE => "sched_migrate",
1553 _ => "other",
1554 };
1555 monitor.record_event(event_type_str.to_string(), event.ts);
1556 }
1557
1558 if let Ok(mut tracker) = ctx.wakeup_tracker.try_lock() {
1560 if event_type == bpf_intf::event_type_SCHED_WAKEUP || event_type == bpf_intf::event_type_SCHED_WAKING {
1561 let (pid, waker_pid) = if event_type == bpf_intf::event_type_SCHED_WAKEUP {
1562 (event.event.wakeup.pid, event.event.wakeup.waker_pid)
1563 } else {
1564 (event.event.waking.pid, event.event.waking.waker_pid)
1565 };
1566 let json = serde_json::json!({
1567 "pid": pid, "waker_pid": waker_pid, "ts": event.ts, "cpu": event.cpu
1568 });
1569 tracker.record_wakeup(&json, event.ts);
1570 }
1571 }
1572
1573 if let Ok(mut analyzer) = ctx.softirq_analyzer.try_lock() {
1575 if event_type == bpf_intf::event_type_SOFTIRQ {
1576 let softirq = &event.event.softirq;
1577 let json = serde_json::json!({
1578 "type": "softirq", "pid": softirq.pid, "softirq_nr": softirq.softirq_nr,
1579 "entry_ts": softirq.entry_ts, "exit_ts": softirq.exit_ts, "cpu": event.cpu,
1580 });
1581 analyzer.record_event(&json);
1582 }
1583 }
1584
1585 let mut edm = EventDispatchManager::new(None, None);
1587 edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(ctx.action_tx.clone())));
1588 let _ = edm.on_event(&event);
1589 }
1590 0
1591 }
1592
1593 for rb_fd in &rb_fds {
1594 let ctx = Box::new(McpRingBufContext {
1595 dropped_invalid_ts: dropped_invalid_ts.clone(),
1596 shared_stats: shared_stats_for_event_handler.clone(),
1597 action_tx: action_tx.clone(),
1598 waker_wakee: waker_wakee_arc.clone(),
1599 cpu_hotspot: cpu_hotspot_arc.clone(),
1600 migration_analyzer: migration_analyzer_arc.clone(),
1601 process_history: process_history_arc.clone(),
1602 rate_monitor: rate_monitor_arc.clone(),
1603 wakeup_tracker: wakeup_tracker_arc.clone(),
1604 softirq_analyzer: softirq_analyzer_arc.clone(),
1605 shutdown: shutdown.clone(),
1606 });
1607 let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;
1608
1609 let rb_ptr = unsafe {
1610 libbpf_sys::ring_buffer__new(
1611 *rb_fd,
1612 Some(mcp_ring_buffer_callback),
1613 ctx_ptr,
1614 std::ptr::null(),
1615 )
1616 };
1617
1618 if rb_ptr.is_null() {
1619 unsafe { let _ = Box::from_raw(ctx_ptr as *mut McpRingBufContext); }
1620 bail!("Failed to create ring buffer manager");
1621 }
1622
1623 rb_managers.push(SendRingBuffer(rb_ptr));
1624 }
1625
1626 let (initial_links, _warnings) = attach_progs(&mut skel)?;
1628
1629 if !initial_links.is_empty() || is_root() {
1631 skel.progs
1632 .scxtop_init
1633 .test_run(ProgramInput::default())
1634 .ok();
1635 }
1636
1637 use scxtop::mcp::BpfPerfEventAttacher;
1641 let perf_program_addr = &skel.progs.perf_sample_handler as *const _ as usize;
1643
1644 let bpf_attacher = BpfPerfEventAttacher::new(move |perf_fd| {
1645 unsafe {
1648 let prog =
1650 &*(perf_program_addr as *const libbpf_rs::ProgramImpl<libbpf_rs::Mut>);
1651 prog.attach_perf_event(perf_fd)
1652 .map(|link| Box::new(link) as Box<dyn std::any::Any + Send>)
1653 .map_err(|e| anyhow::anyhow!("Failed to attach perf event: {}", e))
1654 }
1655 });
1656 let bpf_attacher_arc = Arc::new(bpf_attacher);
1657
1658 use scxtop::mcp::{AttachCallback, EventControl, StatsControlCommand};
1660 let mut event_control_instance = EventControl::new();
1661
1662 let skel_ptr = &mut skel as *mut _ as usize;
1665 let attach_callback: AttachCallback = Box::new(move |program_names: &[&str]| {
1666 unsafe {
1667 let skel_ref = &mut *(skel_ptr as *mut BpfSkel);
1668 attach_progs_selective(skel_ref, program_names).map(|(links, _)| links)
1669 }
1670 });
1671
1672 event_control_instance.set_bpf_links(initial_links, attach_callback);
1675 event_control_instance.disable_event_tracking()?;
1676 info!("BPF programs detached by default - use control_event_tracking to enable");
1677
1678 let (stats_tx, stats_rx) = mpsc::unbounded_channel::<StatsControlCommand>();
1680 event_control_instance.set_stats_control_channel(stats_tx);
1681
1682 let event_control = Arc::new(event_control_instance);
1684
1685 let config = Config::default_config();
1687 let scheduler =
1688 read_file_string(SCHED_NAME_PATH).unwrap_or_else(|_| "".to_string());
1689 let mut app = App::new(
1690 config,
1691 scheduler,
1692 100,
1693 mcp_args.process_id,
1694 mcp_args.layered,
1695 action_tx.clone(),
1696 skel,
1697 )?;
1698
1699 use scxtop::mcp::AnalyzerControl;
1701
1702 let mut analyzer_control = AnalyzerControl::new();
1703 analyzer_control.set_event_control(event_control.clone());
1704
1705 analyzer_control.set_event_buffer(event_buffer_arc.clone());
1707 analyzer_control.set_latency_tracker(latency_tracker_arc.clone());
1708 analyzer_control.set_cpu_hotspot_analyzer(cpu_hotspot_arc.clone());
1709 analyzer_control.set_migration_analyzer(migration_analyzer_arc.clone());
1710 analyzer_control.set_process_history(process_history_arc.clone());
1711 analyzer_control.set_dsq_monitor(dsq_monitor_arc.clone());
1712 analyzer_control.set_rate_monitor(rate_monitor_arc.clone());
1713 analyzer_control.set_wakeup_tracker(wakeup_tracker_arc.clone());
1714 analyzer_control.set_waker_wakee_analyzer(waker_wakee_arc.clone());
1715 analyzer_control.set_softirq_analyzer(softirq_analyzer_arc.clone());
1716
1717 let analyzer_control = Arc::new(Mutex::new(analyzer_control));
1719
1720 use std::collections::HashMap;
1722 let trace_cache = Arc::new(Mutex::new(HashMap::new()));
1723
1724 let mut server = McpServer::new(mcp_config)
1726 .with_topology(topo_arc)
1727 .setup_scheduler_resource()
1728 .setup_profiling_resources()
1729 .with_bpf_perf_attacher(bpf_attacher_arc)
1730 .with_shared_stats(shared_stats.clone())
1731 .with_stats_client(None)
1732 .with_event_control(event_control.clone())
1733 .with_analyzer_control(analyzer_control.clone())
1734 .with_trace_cache(trace_cache)
1735 .setup_stats_resources();
1736
1737 let _event_stream_rx = server.enable_event_streaming();
1739 let resources = server.get_resources_handle();
1740
1741 let bpf_stats = server.get_bpf_stats_collector();
1743
1744 let perf_profiler = server.get_perf_profiler();
1746
1747 let shutdown_poll = shutdown.clone();
1749
1750 let mut ringbuffer_handles = Vec::new();
1751 for (rb_id, rb) in rb_managers.into_iter().enumerate() {
1752 let stop_poll_clone = shutdown_poll.clone();
1753 ringbuffer_handles.push(tokio::spawn(async move {
1754 loop {
1755 rb.poll(1);
1757 if stop_poll_clone.load(Ordering::Relaxed) {
1758 rb.consume();
1760 rb.free();
1762 debug!("ringbuffer #{} polling stopped", rb_id);
1763 break;
1764 }
1765 }
1766 }));
1767 }
1768
1769 if let Some(collector) = bpf_stats {
1772 let shutdown_stats = shutdown.clone();
1773 let mut stats_rx_task = stats_rx;
1774 tokio::spawn(async move {
1775 let mut running = false;
1776 let mut interval_ms = 100u64;
1777 let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
1778
1779 loop {
1780 tokio::select! {
1781 Some(cmd) = stats_rx_task.recv() => {
1783 match cmd {
1784 StatsControlCommand::Start(new_interval_ms) => {
1785 running = true;
1786 interval_ms = new_interval_ms;
1787 interval = tokio::time::interval(Duration::from_millis(interval_ms));
1788 info!("Stats collection started with {}ms interval", interval_ms);
1789 }
1790 StatsControlCommand::Stop => {
1791 running = false;
1792 info!("Stats collection stopped");
1793 }
1794 }
1795 }
1796
1797 _ = interval.tick(), if running => {
1799 if shutdown_stats.load(Ordering::Relaxed) {
1800 break;
1801 }
1802 let _ = collector.collect_sample();
1803 }
1804
1805 _ = tokio::time::sleep(Duration::from_millis(100)), if !running => {
1807 if shutdown_stats.load(Ordering::Relaxed) {
1808 break;
1809 }
1810 }
1811 }
1812 }
1813 });
1814 }
1815
1816 info!("MCP daemon started, processing BPF events");
1817
1818 let mut mcp_server_task = Box::pin(server.run_async());
1820 let mcp_result;
1821 loop {
1822 tokio::select! {
1823 result = &mut mcp_server_task => {
1825 info!("MCP server exited");
1826 shutdown.store(true, Ordering::Relaxed);
1827 mcp_result = result;
1828 break;
1829 }
1830
1831 Some(action) = action_rx.recv() => {
1833 if matches!(action, Action::Quit) {
1835 info!("Received quit action");
1836 shutdown.store(true, Ordering::Relaxed);
1837 mcp_result = Ok(());
1838 break;
1839 }
1840
1841 if let Some(ref profiler) = perf_profiler {
1843 if let Action::PerfSample(ref perf_sample) = action {
1844 use scxtop::mcp::RawSample;
1845 profiler.add_sample(RawSample {
1846 address: perf_sample.instruction_pointer,
1847 pid: perf_sample.pid,
1848 cpu_id: perf_sample.cpu_id,
1849 is_kernel: perf_sample.is_kernel,
1850 kernel_stack: perf_sample.kernel_stack.clone(),
1851 user_stack: perf_sample.user_stack.clone(),
1852 layer_id: if perf_sample.layer_id >= 0 {
1853 Some(perf_sample.layer_id)
1854 } else {
1855 None
1856 },
1857 });
1858 }
1859 }
1860
1861 let _ = app.handle_action(&action);
1863
1864 if let Some(event) = action_to_mcp_event(&action) {
1866 let _ = resources.push_event(event);
1867 }
1868 }
1869 }
1870 }
1871
1872 debug!("waiting for {} ringbuffer tasks to complete", ringbuffer_handles.len());
1874 for handle in ringbuffer_handles {
1875 if let Err(e) = handle.await {
1876 log::error!("Ringbuffer task panicked: {e}");
1877 }
1878 }
1879
1880 mcp_result
1882 })
1883 } else {
1884 use std::collections::HashMap;
1886 use std::sync::Mutex;
1887 let trace_cache = Arc::new(Mutex::new(HashMap::new()));
1888 let mut server = McpServer::new(mcp_config)
1889 .with_topology(topo_arc)
1890 .setup_scheduler_resource()
1891 .setup_profiling_resources()
1892 .with_stats_client(None)
1893 .with_trace_cache(trace_cache)
1894 .setup_stats_resources();
1895 server.run_blocking()
1896 }
1897}
1898
1899fn main() -> Result<()> {
1900 let args = Cli::parse();
1901
1902 match &args.command.unwrap_or(Commands::Tui(args.tui)) {
1903 Commands::Tui(tui_args) => {
1904 run_tui(tui_args)?;
1905 }
1906 Commands::Trace(trace_args) => {
1907 run_trace(trace_args)?;
1908 }
1909 Commands::Mcp(mcp_args) => {
1910 run_mcp(mcp_args)?;
1911 }
1912 Commands::GenerateCompletions { shell, output } => {
1913 generate_completions(Cli::command(), *shell, output.clone())
1914 .unwrap_or_else(|_| panic!("Failed to generate completions for {shell}"));
1915 }
1916 }
1917 Ok(())
1918}