use scx_utils::compat;
use scxtop::bpf_skel::types::bpf_event;
use scxtop::cli::{generate_completions, Cli, Commands, TraceArgs, TuiArgs};
use scxtop::config::Config;
use scxtop::edm::{ActionHandler, BpfEventActionPublisher, BpfEventHandler, EventDispatchManager};
use scxtop::layered_util;
use scxtop::mangoapp::poll_mangoapp;
use scxtop::search;
use scxtop::tracer::Tracer;
use scxtop::util::{
    check_bpf_capability, get_capability_warning_message, get_clock_value, is_root,
    read_file_string,
};
use scxtop::Action;
use scxtop::App;
use scxtop::CpuStatTracker;
use scxtop::Event;
use scxtop::Key;
use scxtop::KeyMap;
use scxtop::MemStatSnapshot;
use scxtop::PerfettoTraceManager;
use scxtop::SystemStatAction;
use scxtop::Tui;
use scxtop::SCHED_NAME_PATH;
use scxtop::{available_kprobe_events, UpdateColVisibilityAction};
use scxtop::{bpf_skel::*, AppState};

use anyhow::anyhow;
use anyhow::bail;
use anyhow::Result;
use clap::{CommandFactory, Parser};
use futures::future::join_all;
use libbpf_rs::libbpf_sys;
use libbpf_rs::num_possible_cpus;
use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::Link;
use libbpf_rs::MapCore;
use libbpf_rs::ProgramInput;
use libbpf_rs::UprobeOpts;
use log::debug;
use log::info;
use ratatui::crossterm::event::{KeyCode::Char, KeyEvent};
use simplelog::{
    ColorChoice, Config as SimplelogConfig, LevelFilter, TermLogger, TerminalMode, WriteLogger,
};
use std::ffi::CString;
use std::fs::File;
use std::mem::MaybeUninit;
use std::os::fd::AsFd;
use std::os::fd::AsRawFd;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::System;
use tokio::sync::mpsc;

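/// Thin wrapper around a raw libbpf `ring_buffer` pointer. `Send` is asserted manually
/// (via `unsafe impl`) so the pointer can be moved into a dedicated polling task; each
/// instance is only ever used from that one task before being freed.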
struct SendRingBuffer(*mut libbpf_sys::ring_buffer);

unsafe impl Send for SendRingBuffer {}

impl SendRingBuffer {
    fn poll(&self, timeout: i32) -> i32 {
        unsafe { libbpf_sys::ring_buffer__poll(self.0, timeout) }
    }

    fn consume(&self) -> i32 {
        unsafe { libbpf_sys::ring_buffer__consume(self.0) }
    }

    fn free(self) {
        unsafe { libbpf_sys::ring_buffer__free(self.0) }
    }
}

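/// Translates a terminal [`Event`] into an [`Action`], consulting the keymap and the
/// current app state for key presses and pasted text.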
fn get_action(app: &App, keymap: &KeyMap, event: Event) -> Action {
    match event {
        Event::Error => Action::None,
        Event::Tick => Action::Tick,
        Event::TickRateChange(tick_rate_ms) => {
            Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms))
        }
        Event::Key(key) => handle_key_event(app, keymap, key),
        Event::Paste(paste) => handle_input_entry(app, paste).unwrap_or(Action::None),
        _ => Action::None,
    }
}

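/// Resolves a key press: text input takes priority when the current view accepts it,
/// then view-specific overrides, and finally the configured keymap.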
fn handle_key_event(app: &App, keymap: &KeyMap, key: KeyEvent) -> Action {
    match key.code {
        Char(c) => {
            if let Some(action) = handle_input_entry(app, c.to_string()) {
                action
            } else {
                match (app.state(), c) {
                    (AppState::BpfProgramDetail, 'p') => Action::ToggleBpfPerfSampling,
                    _ => keymap.action(&Key::Char(c)),
                }
            }
        }
        _ => keymap.action(&Key::Code(key.code)),
    }
}

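/// Returns an `InputEntry` action when the current view accepts free-form text:
/// perf/kprobe event selection, or any filterable view while filtering is active.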
fn handle_input_entry(app: &App, s: String) -> Option<Action> {
    match app.state() {
        AppState::PerfEvent | AppState::KprobeEvent => Some(Action::InputEntry(s)),
        AppState::Default
        | AppState::Llc
        | AppState::Node
        | AppState::Process
        | AppState::Memory
        | AppState::PerfTop
        | AppState::BpfPrograms
            if app.filtering() =>
        {
            Some(Action::InputEntry(s))
        }
        _ => None,
    }
}

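/// Attaches every supported BPF program; see [`attach_progs_selective`] for the details.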
fn attach_progs(skel: &mut BpfSkel) -> Result<(Vec<Link>, Vec<String>)> {
    attach_progs_selective(skel, &[])
}

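/// Attaches the named BPF programs (or all of them when `program_names` is empty).
/// Attach failures are collected as warnings for unprivileged users but are hard errors
/// when running as root.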
fn attach_progs_selective(
    skel: &mut BpfSkel,
    program_names: &[&str],
) -> Result<(Vec<Link>, Vec<String>)> {
    let mut links = Vec::new();
    let mut warnings = Vec::new();

    let has_bpf_cap = check_bpf_capability();

    if !has_bpf_cap {
        warnings
            .push("BPF programs cannot be attached - scheduler monitoring disabled".to_string());
        warnings.push("Try running as root or configure BPF permissions".to_string());
        return Ok((links, warnings));
    }

    let attach_all = program_names.is_empty();

    let should_attach = |name: &str| -> bool { attach_all || program_names.contains(&name) };

    macro_rules! safe_attach {
        ($prog:expr, $name:literal) => {
            if should_attach($name) {
                match $prog.attach() {
                    Ok(link) => {
                        links.push(link);
                    }
                    Err(e) => {
                        if is_root() {
                            return Err(anyhow!(
                                "Failed to attach {} (running as root): {}",
                                $name,
                                e
                            ));
                        } else {
                            warnings.push(format!("Failed to attach {}: {}", $name, e));
                        }
                    }
                }
            }
        };
    }

    safe_attach!(skel.progs.on_sched_cpu_perf, "sched_cpu_perf");
    safe_attach!(skel.progs.scx_sched_reg, "scx_sched_reg");
    safe_attach!(skel.progs.scx_sched_unreg, "scx_sched_unreg");
    safe_attach!(skel.progs.on_sched_switch, "on_sched_switch");
    safe_attach!(skel.progs.on_sched_wakeup, "on_sched_wakeup");
    safe_attach!(skel.progs.on_sched_wakeup_new, "sched_wakeup_new");
    safe_attach!(skel.progs.on_sched_waking, "on_sched_waking");
    safe_attach!(skel.progs.on_sched_migrate_task, "on_sched_migrate_task");
    safe_attach!(skel.progs.on_sched_fork, "sched_fork");
    safe_attach!(skel.progs.on_sched_exec, "sched_exec");
    safe_attach!(skel.progs.on_sched_exit, "sched_exit");

    if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
        safe_attach!(skel.progs.scx_insert_vtime, "scx_insert_vtime");
        safe_attach!(skel.progs.scx_insert, "scx_insert");
        safe_attach!(skel.progs.scx_dsq_move, "scx_dsq_move");
        safe_attach!(skel.progs.scx_dsq_move_set_vtime, "scx_dsq_move_set_vtime");
        safe_attach!(skel.progs.scx_dsq_move_set_slice, "scx_dsq_move_set_slice");
    } else {
        safe_attach!(skel.progs.scx_dispatch, "scx_dispatch");
        safe_attach!(skel.progs.scx_dispatch_vtime, "scx_dispatch_vtime");
        safe_attach!(
            skel.progs.scx_dispatch_from_dsq_set_vtime,
            "scx_dispatch_from_dsq_set_vtime"
        );
        safe_attach!(
            skel.progs.scx_dispatch_from_dsq_set_slice,
            "scx_dispatch_from_dsq_set_slice"
        );
        safe_attach!(skel.progs.scx_dispatch_from_dsq, "scx_dispatch_from_dsq");
    }

    safe_attach!(skel.progs.on_cpuhp_enter, "cpuhp_enter");
    safe_attach!(skel.progs.on_cpuhp_exit, "cpuhp_exit");
    safe_attach!(skel.progs.on_softirq_entry, "on_softirq_entry");
    safe_attach!(skel.progs.on_softirq_exit, "on_softirq_exit");

    if links.is_empty() && !is_root() {
        warnings.extend(get_capability_warning_message());
    }

    Ok((links, warnings))
}

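/// Entry point for `scxtop trace`: requires root, attaches the scheduler BPF programs,
/// fans events out through per-ringbuffer pollers, and writes a Perfetto trace for the
/// requested duration before shutting everything down in order.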
fn run_trace(trace_args: &TraceArgs) -> Result<()> {
    if !is_root() {
        return Err(anyhow!(
            "Trace functionality requires root privileges. Please run as root"
        ));
    }

    TermLogger::init(
        match trace_args.verbose {
            0 => simplelog::LevelFilter::Info,
            1 => simplelog::LevelFilter::Debug,
            _ => simplelog::LevelFilter::Trace,
        },
        SimplelogConfig::default(),
        TerminalMode::Mixed,
        ColorChoice::Auto,
    )?;

    let mut kprobe_events = available_kprobe_events()?;
    kprobe_events.sort();
    search::sorted_contains_all(&kprobe_events, &trace_args.kprobes)
        .then_some(())
        .ok_or_else(|| anyhow!("Invalid kprobe events"))?;

    let config = Config::default_config();
    let worker_threads = config.worker_threads() as usize;

    let num_cpus = num_possible_cpus()?;
    let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);

    let required_threads = std::cmp::max(rb_cnt + 4, worker_threads);

    info!(
        "Creating tokio runtime with {} worker threads for {} ringbuffers",
        required_threads, rb_cnt
    );

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(required_threads)
        .build()
        .unwrap()
        .block_on(async {
            let (action_tx, mut action_rx) = mpsc::unbounded_channel();

            let mut open_object = MaybeUninit::uninit();
            let mut builder = BpfSkelBuilder::default();
            if trace_args.verbose > 2 {
                builder.obj_builder.debug(true);
            }

            let mut skel = builder.open(&mut open_object)?;
            compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total)?;
            compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update)?;
            compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait)?;
            compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang)?;

            let num_cpus = num_possible_cpus()?;
            let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
            let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;

            log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);

            let cpu_cnt_pow2 = num_cpus.next_power_of_two();
            skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;

            skel.maps
                .data_rb_cpu_map
                .set_max_entries(cpu_cnt_pow2 as u32)?;

            skel.maps.events.set_max_entries(rb_cnt as u32)?;

            let mut skel = skel.load()?;

            for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
                if cpu_id < cpu_cnt_pow2 {
                    skel.maps.data_rb_cpu_map.update(
                        &(cpu_id as u32).to_ne_bytes(),
                        &rb_id.to_ne_bytes(),
                        libbpf_rs::MapFlags::ANY,
                    )?;
                }
            }

            skel.maps.data_data.as_mut().unwrap().enable_bpf_events = false;

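            // Attach the core scheduler tracepoints eagerly; which scx insert/dispatch probes
            // apply depends on whether the kernel exposes scx_bpf_dsq_insert_vtime.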
            let mut links = vec![
                skel.progs.on_sched_cpu_perf.attach()?,
                skel.progs.scx_sched_reg.attach()?,
                skel.progs.scx_sched_unreg.attach()?,
                skel.progs.on_sched_switch.attach()?,
                skel.progs.on_sched_wakeup.attach()?,
                skel.progs.on_sched_wakeup_new.attach()?,
                skel.progs.on_sched_waking.attach()?,
                skel.progs.on_sched_migrate_task.attach()?,
                skel.progs.on_sched_fork.attach()?,
                skel.progs.on_sched_exec.attach()?,
                skel.progs.on_sched_exit.attach()?,
            ];

            if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
                if let Ok(link) = skel.progs.scx_insert_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_insert.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move_set_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move_set_slice.attach() {
                    links.push(link);
                }
            } else {
                if let Ok(link) = skel.progs.scx_dispatch.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_slice.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq.attach() {
                    links.push(link);
                }
            }
            if let Ok(link) = skel.progs.on_cpuhp_enter.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_cpuhp_exit.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_softirq_entry.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_softirq_exit.attach() {
                links.push(link);
            }

            let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));

            let shutdown = Arc::new(AtomicBool::new(false));

            let events_map_fd = skel.maps.events.as_fd().as_raw_fd();
            let mut rb_fds = Vec::new();
            let mut rb_managers: Vec<SendRingBuffer> = Vec::new();

            for rb_id in 0..rb_cnt {
                let rb_fd = unsafe {
                    libbpf_sys::bpf_map_create(
                        libbpf_sys::BPF_MAP_TYPE_RINGBUF,
                        std::ptr::null(),
                        0,
                        0,
                        (32 * 1024 * 1024) as u32,
                        std::ptr::null(),
                    )
                };

                if rb_fd < 0 {
                    bail!(
                        "Failed to create ringbuffer #{}: {}",
                        rb_id,
                        std::io::Error::last_os_error()
                    );
                }

                let rb_id_u32 = rb_id as u32;
                let ret = unsafe {
                    libbpf_sys::bpf_map_update_elem(
                        events_map_fd,
                        &rb_id_u32 as *const u32 as *const std::ffi::c_void,
                        &rb_fd as *const i32 as *const std::ffi::c_void,
                        libbpf_sys::BPF_NOEXIST.into(),
                    )
                };

                if ret < 0 {
                    bail!(
                        "Failed to add ringbuffer #{} to hash-of-maps: {}",
                        rb_id,
                        std::io::Error::last_os_error()
                    );
                }

                rb_fds.push(rb_fd);
            }

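            // Each ring buffer gets its own heap-allocated context; the C callback decodes the
            // raw sample into a bpf_event and republishes it as an Action on the channel.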
            struct RingBufContext {
                dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                action_tx: mpsc::UnboundedSender<Action>,
                shutdown: Arc<AtomicBool>,
            }

            extern "C" fn ring_buffer_sample_callback(
                ctx: *mut std::ffi::c_void,
                data: *mut std::ffi::c_void,
                size: u64,
            ) -> std::ffi::c_int {
                unsafe {
                    let ctx = &*(ctx as *const RingBufContext);

                    if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                        return 0;
                    }

                    let data_slice = std::slice::from_raw_parts(data as *const u8, size as usize);

                    let mut event = bpf_event::default();
                    if plain::copy_from_bytes(&mut event, data_slice).is_err() {
                        return 0;
                    }

                    if event.ts == 0 {
                        ctx.dropped_invalid_ts
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        return 0;
                    }

                    let mut edm = EventDispatchManager::new(None, None);
                    edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(
                        ctx.action_tx.clone(),
                    )));
                    let _ = edm.on_event(&event);
                }
                0
            }

            for rb_fd in &rb_fds {
                let ctx = Box::new(RingBufContext {
                    dropped_invalid_ts: dropped_invalid_ts.clone(),
                    action_tx: action_tx.clone(),
                    shutdown: shutdown.clone(),
                });
                let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;

                let rb_ptr = unsafe {
                    libbpf_sys::ring_buffer__new(
                        *rb_fd,
                        Some(ring_buffer_sample_callback),
                        ctx_ptr,
                        std::ptr::null(),
                    )
                };

                if rb_ptr.is_null() {
                    unsafe {
                        let _ = Box::from_raw(ctx_ptr as *mut RingBufContext);
                    }
                    bail!("Failed to create ring buffer manager");
                }

                rb_managers.push(SendRingBuffer(rb_ptr));
            }

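            // ring_buffer__poll() blocks, so each consumer runs on its own spawn_blocking task
            // and drains whatever remains once the shutdown flag flips.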
            let stop_poll = shutdown.clone();
            let stop_stats = shutdown.clone();

            let mut ringbuffer_handles = Vec::new();
            let mut producer_handles = Vec::new();

            for (rb_id, rb) in rb_managers.into_iter().enumerate() {
                let stop_poll_clone = stop_poll.clone();
                ringbuffer_handles.push(tokio::task::spawn_blocking(move || {
                    info!("ringbuffer #{} task started", rb_id);
                    let mut poll_count = 0;
                    loop {
                        rb.poll(1);
                        poll_count += 1;
                        if stop_poll_clone.load(Ordering::Relaxed) {
                            info!(
                                "ringbuffer #{} received shutdown after {} polls",
                                rb_id, poll_count
                            );
                            let consumed = rb.consume();
                            info!("ringbuffer #{} consumed {} events", rb_id, consumed);
                            rb.free();
                            info!("ringbuffer #{} freed", rb_id);
                            break;
                        }
                    }
                    info!("ringbuffer #{} exiting", rb_id);
                }));
            }
            info!(
                "spawned {} ringbuffer polling tasks",
                ringbuffer_handles.len()
            );

            if trace_args.system_stats {
                let mut cpu_stat_tracker = CpuStatTracker::default();
                let mut mem_stats = MemStatSnapshot::default();
                let mut system = System::new_all();
                let action_tx_clone = action_tx.clone();

                producer_handles.push(tokio::spawn(async move {
                    info!("stats task started");
                    let mut stats_count = 0;
                    loop {
                        if stop_stats.load(Ordering::Relaxed) {
                            info!("stats task received shutdown after {} samples", stats_count);
                            break;
                        }
                        let ts = get_clock_value(libc::CLOCK_BOOTTIME);

                        cpu_stat_tracker
                            .update(&mut system)
                            .expect("Failed to update cpu stats");

                        mem_stats.update().expect("Failed to update mem stats");

                        let sys_stat_action = Action::SystemStat(SystemStatAction {
                            ts,
                            cpu_data_prev: cpu_stat_tracker.prev.clone(),
                            cpu_data_current: cpu_stat_tracker.current.clone(),
                            mem_info: mem_stats.clone(),
                        });
                        action_tx_clone
                            .send(sys_stat_action)
                            .expect("Failed to send CpuStat action");

                        stats_count += 1;
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                    info!("stats task exiting");
                }));
            }

            let trace_file_prefix = config.trace_file_prefix().to_string();
            let trace_file = trace_args.output_file.clone();
            let mut trace_manager = PerfettoTraceManager::new(trace_file_prefix, None);

            info!("starting trace for {}ms", trace_args.trace_ms);
            trace_manager.start()?;
            let mut tracer = Tracer::new(skel);
            tracer.trace(&trace_args.kprobes)?;

            let shutdown_trace = shutdown.clone();
            let trace_handle = tokio::spawn(async move {
                debug!("trace generation task started");
                let mut count = 0;
                let mut last_log = std::time::Instant::now();
                loop {
                    tokio::select! {
                        _ = tokio::time::sleep(Duration::from_millis(100)) => {
                            if shutdown_trace.load(Ordering::Relaxed) {
                                info!("trace task: shutdown requested, draining remaining events");
                                while let Ok(a) = action_rx.try_recv() {
                                    count += 1;
                                    trace_manager
                                        .on_action(&a)
                                        .expect("Action should have been resolved");
                                }
                                info!("trace task: stopping trace manager");
                                trace_manager.stop(trace_file, None).unwrap();
                                info!("trace file compiled, collected {count} events");
                                break;
                            }
                        }
                        action = action_rx.recv() => {
                            if let Some(a) = action {
                                count += 1;
                                if last_log.elapsed() > std::time::Duration::from_secs(1) {
                                    debug!("trace task: {} events processed", count);
                                    last_log = std::time::Instant::now();
                                }
                                trace_manager
                                    .on_action(&a)
                                    .expect("Action should have been resolved");
                            } else {
                                info!("trace task: channel closed, stopping trace manager");
                                trace_manager.stop(trace_file, None).unwrap();
                                info!("trace file compiled, collected {count} events");
                                break;
                            }
                        }
                    }
                }
                info!("trace task: exiting");
            });

            info!("waiting for trace duration ({}ms)", trace_args.trace_ms);
            tokio::time::sleep(Duration::from_millis(trace_args.trace_ms)).await;
            info!("trace duration complete, beginning shutdown");

            info!("shutdown: clearing BPF links");
            tracer.clear_links()?;
            info!("shutdown: BPF links cleared");
            drop(links);
            info!("shutdown: links dropped");

            info!("shutdown: setting shutdown flag");
            shutdown.store(true, Ordering::Relaxed);
            info!(
                "shutdown: flag set, waiting for {} ringbuffer tasks",
                ringbuffer_handles.len()
            );

            let results = join_all(ringbuffer_handles).await;
            info!("shutdown: all {} ringbuffer tasks joined", results.len());
            for (idx, result) in results.iter().enumerate() {
                if let Err(e) = result {
                    eprintln!("Ringbuffer task {} panicked: {e}", idx);
                } else {
                    debug!("ringbuffer task {} exited successfully", idx);
                }
            }
            info!("shutdown: ringbuffer tasks complete");

            info!(
                "shutdown: waiting for {} producer tasks",
                producer_handles.len()
            );
            let results = join_all(producer_handles).await;
            info!("shutdown: all {} producer tasks joined", results.len());
            for (idx, result) in results.iter().enumerate() {
                if let Err(e) = result {
                    eprintln!("Producer task {} panicked: {e}", idx);
                } else {
                    debug!("producer task {} exited successfully", idx);
                }
            }
            info!("shutdown: producer tasks complete");

            info!("shutdown: dropping action_tx");
            drop(action_tx);
            info!("shutdown: action_tx dropped, waiting for trace generation");

            if let Err(e) = trace_handle.await {
                eprintln!("Trace generation task panicked: {e}");
            }
            info!("shutdown: trace generation complete");

            info!("shutdown: collecting final stats");
            let stats = tracer.stats()?;
            info!("shutdown: {stats:?}");

            info!("shutdown: complete");
            Ok(())
        })
}

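/// Entry point for the interactive TUI. BPF setup is best-effort: without CAP_BPF the app
/// still starts, scheduler events are simply disabled, and any collected capability
/// warnings are surfaced in the UI.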
fn run_tui(tui_args: &TuiArgs) -> Result<()> {
    if let Ok(log_path) = std::env::var("RUST_LOG_PATH") {
        let log_level = match std::env::var("RUST_LOG") {
            Ok(v) => LevelFilter::from_str(&v)?,
            Err(_) => LevelFilter::Info,
        };

        WriteLogger::init(
            log_level,
            simplelog::Config::default(),
            File::create(log_path)?,
        )?;

        log_panics::Config::new()
            .backtrace_mode(log_panics::BacktraceMode::Resolved)
            .install_panic_hook();
    };

    let config = Config::merge([
        Config::from(tui_args.clone()),
        Config::load_or_default().expect("Failed to load config or load default config"),
    ]);
    let keymap = config.active_keymap.clone();

    let worker_threads = config.worker_threads() as usize;
    let num_cpus = num_possible_cpus()?;
    let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);

    let required_threads = std::cmp::max(rb_cnt + 4, worker_threads);

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(required_threads)
        .build()
        .unwrap()
        .block_on(async {
            let mut open_object = MaybeUninit::uninit();

            let (action_tx, mut action_rx) = mpsc::unbounded_channel();

            let has_bpf_cap = check_bpf_capability();
            let mut capability_warnings = Vec::new();
            let mut _bpf_enabled = false;
            let mut links = Vec::new();
            let mut event_rb_data_opt: Option<(
                Vec<i32>,
                Arc<std::sync::atomic::AtomicU64>,
                mpsc::UnboundedSender<Action>,
            )> = None;
            let mut skel_opt = None;

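            // Best-effort BPF setup: open and load the skeleton, size the ring buffers, and
            // attach programs. Failures become capability warnings for the UI unless the
            // process is running as root, in which case they are fatal.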
            if has_bpf_cap {
                let mut builder = BpfSkelBuilder::default();
                if config.debug() {
                    builder.obj_builder.debug(true);
                }
                let bpf_publisher = BpfEventActionPublisher::new(action_tx.clone());
                let mut edm = EventDispatchManager::new(None, None);
                edm.register_bpf_handler(Box::new(bpf_publisher));

                match builder.open(&mut open_object) {
                    Ok(mut skel) => {
                        skel.maps.rodata_data.as_mut().unwrap().long_tail_tracing_min_latency_ns =
                            tui_args.experimental_long_tail_tracing_min_latency_ns;

                        let _map_handle = if tui_args.layered {
                            skel.maps.rodata_data.as_mut().unwrap().layered = true;
                            action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
                                table: "Process".to_string(),
                                col: "Layer ID".to_string(),
                                visible: true,
                            }))?;
                            action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
                                table: "Thread".to_string(),
                                col: "Layer ID".to_string(),
                                visible: true,
                            }))?;
                            match layered_util::attach_to_existing_map("task_ctxs", &mut skel.maps.task_ctxs) {
                                Ok(handle) => Some(handle),
                                Err(e) => {
                                    capability_warnings.push(format!("Failed to attach to layered map: {e}"));
                                    None
                                }
                            }
                        } else {
                            None
                        };

                        let num_cpus = num_possible_cpus()?;
                        let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
                        let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;

                        log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);

                        let cpu_cnt_pow2 = num_cpus.next_power_of_two();
                        skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;

                        if let Err(e) = skel.maps.data_rb_cpu_map.set_max_entries(cpu_cnt_pow2 as u32) {
                            capability_warnings.push(format!("Failed to set CPU-to-ringbuf map size: {e}"));
                        }

                        if let Err(e) = skel.maps.events.set_max_entries(rb_cnt as u32) {
                            capability_warnings.push(format!("Failed to set ringbuf count: {e}"));
                        }

                        if let Err(e) = compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total) {
                            capability_warnings.push(format!("Failed to enable gpu_memory_total kprobe: {e}"));
                        }
                        if let Err(e) = compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update) {
                            capability_warnings.push(format!("Failed to enable hw_pressure_update kprobe: {e}"));
                        }
                        if let Err(e) = compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait) {
                            capability_warnings.push(format!("Failed to enable sched_process_wait tracepoint: {e}"));
                        }
                        if let Err(e) = compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang) {
                            capability_warnings.push(format!("Failed to enable sched_process_hang tracepoint: {e}"));
                        }

                        match skel.load() {
                            Ok(mut loaded_skel) => {
                                for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
                                    if cpu_id < cpu_cnt_pow2 {
                                        if let Err(e) = loaded_skel.maps.data_rb_cpu_map.update(
                                            &(cpu_id as u32).to_ne_bytes(),
                                            &rb_id.to_ne_bytes(),
                                            libbpf_rs::MapFlags::ANY,
                                        ) {
                                            capability_warnings.push(format!("Failed to set CPU {} -> ringbuf {}: {}", cpu_id, rb_id, e));
                                        }
                                    }
                                }

                                let (skel_links, attach_warnings) = attach_progs(&mut loaded_skel)?;
                                links = skel_links;
                                capability_warnings.extend(attach_warnings);

                                if !links.is_empty() || is_root() {
                                    if let Err(e) = loaded_skel.progs.scxtop_init.test_run(ProgramInput::default()) {
                                        capability_warnings.push(format!("Failed to initialize scxtop BPF program: {e}"));
                                    }
                                }

                                if !links.is_empty() {
                                    let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));

                                    let events_map_fd = loaded_skel.maps.events.as_fd().as_raw_fd();
                                    let mut rb_fds = Vec::new();

                                    for rb_id in 0..rb_cnt {
                                        let rb_fd = unsafe {
                                            libbpf_sys::bpf_map_create(
                                                libbpf_sys::BPF_MAP_TYPE_RINGBUF,
                                                std::ptr::null(),
                                                0,
                                                0,
                                                (32 * 1024 * 1024) as u32,
                                                std::ptr::null(),
                                            )
                                        };

                                        if rb_fd < 0 {
                                            capability_warnings.push(format!("Failed to create ringbuffer #{}: {}", rb_id, std::io::Error::last_os_error()));
                                            continue;
                                        }

                                        let rb_id_u32 = rb_id as u32;
                                        let ret = unsafe {
                                            libbpf_sys::bpf_map_update_elem(
                                                events_map_fd,
                                                &rb_id_u32 as *const u32 as *const std::ffi::c_void,
                                                &rb_fd as *const i32 as *const std::ffi::c_void,
                                                libbpf_sys::BPF_NOEXIST.into(),
                                            )
                                        };

                                        if ret < 0 {
                                            capability_warnings.push(format!("Failed to add ringbuffer #{} to hash-of-maps: {}", rb_id, std::io::Error::last_os_error()));
                                            continue;
                                        }

                                        rb_fds.push(rb_fd);
                                    }

                                    if !rb_fds.is_empty() {
                                        event_rb_data_opt = Some((rb_fds, dropped_invalid_ts, action_tx.clone()));
                                        _bpf_enabled = true;
                                    }
                                }

                                skel_opt = Some(loaded_skel);
                            }
                            Err(e) => {
                                if is_root() {
                                    return Err(anyhow!("Failed to load BPF skeleton (running as root): {e}"));
                                } else {
                                    capability_warnings.push(format!("Failed to load BPF skeleton: {e}"));
                                    capability_warnings.extend(get_capability_warning_message());
                                }
                            }
                        }
                    }
                    Err(e) => {
                        if is_root() {
                            return Err(anyhow!("Failed to open BPF skeleton (running as root): {e}"));
                        } else {
                            capability_warnings.push(format!("Failed to open BPF skeleton: {e}"));
                            capability_warnings.extend(get_capability_warning_message());
                        }
                    }
                }
            } else {
                capability_warnings.extend(get_capability_warning_message());
            }

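            // Experimental long-tail tracing: attach entry and return uprobes for the
            // user-supplied binary/symbol to the long_tail_tracker BPF programs.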
            if tui_args.experimental_long_tail_tracing {
                if let Some(ref mut skel) = skel_opt {
                    skel.maps.data_data.as_mut().unwrap().trace_duration_ns = config.trace_duration_ns();
                    skel.maps.data_data.as_mut().unwrap().trace_warmup_ns = config.trace_warmup_ns();

                    let binary = tui_args
                        .experimental_long_tail_tracing_binary
                        .clone()
                        .unwrap();
                    let symbol = tui_args
                        .experimental_long_tail_tracing_symbol
                        .clone()
                        .unwrap();

                    match skel.progs.long_tail_tracker_exit.attach_uprobe_with_opts(
                        -1,
                        binary.clone(),
                        0,
                        UprobeOpts {
                            retprobe: true,
                            func_name: Some(symbol.clone()),
                            ..Default::default()
                        },
                    ) {
                        Ok(link) => links.push(link),
                        Err(e) => capability_warnings.push(format!("Failed to attach long tail tracker exit: {e}")),
                    }

                    match skel.progs.long_tail_tracker_entry.attach_uprobe_with_opts(
                        -1,
                        binary.clone(),
                        0,
                        UprobeOpts {
                            retprobe: false,
                            func_name: Some(symbol.clone()),
                            ..Default::default()
                        },
                    ) {
                        Ok(link) => links.push(link),
                        Err(e) => capability_warnings.push(format!("Failed to attach long tail tracker entry: {e}")),
                    }
                } else {
                    capability_warnings.push("Long tail tracing requested but BPF skeleton not available".to_string());
                }
            }

            let mut tui = Tui::new(keymap.clone(), config.tick_rate_ms(), config.frame_rate_ms())?;
            let scheduler = read_file_string(SCHED_NAME_PATH).unwrap_or("".to_string());

            let mut app = if let Some(skel) = skel_opt {
                App::new(
                    config,
                    scheduler,
                    100,
                    tui_args.process_id,
                    tui_args.layered,
                    action_tx.clone(),
                    skel,
                )?
            } else {
                App::new_without_bpf(
                    config,
                    scheduler,
                    100,
                    tui_args.process_id,
                    tui_args.layered,
                    action_tx.clone(),
                )?
            };

            if !capability_warnings.is_empty() {
                app.set_capability_warnings(capability_warnings);
            }

            tui.enter()?;

            let shutdown = app.should_quit.clone();
            let mut ringbuffer_handles = Vec::new();
            if let Some((rb_fds, dropped_invalid_ts, rb_action_tx)) = event_rb_data_opt {
                struct RingBufContext {
                    dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                    action_tx: mpsc::UnboundedSender<Action>,
                    shutdown: Arc<AtomicBool>,
                }

                extern "C" fn ring_buffer_sample_callback(
                    ctx: *mut std::ffi::c_void,
                    data: *mut std::ffi::c_void,
                    size: u64,
                ) -> std::ffi::c_int {
                    unsafe {
                        let ctx = &*(ctx as *const RingBufContext);

                        if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                            return 0;
                        }

                        let data_slice = std::slice::from_raw_parts(data as *const u8, size as usize);

                        let mut event = bpf_event::default();
                        if plain::copy_from_bytes(&mut event, data_slice).is_err() {
                            return 0;
                        }

                        if event.ts == 0 {
                            ctx.dropped_invalid_ts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            return 0;
                        }

                        let mut edm = EventDispatchManager::new(None, None);
                        edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(ctx.action_tx.clone())));
                        let _ = edm.on_event(&event);
                    }
                    0
                }

                for rb_fd in rb_fds {
                    let ctx = Box::new(RingBufContext {
                        dropped_invalid_ts: dropped_invalid_ts.clone(),
                        action_tx: rb_action_tx.clone(),
                        shutdown: shutdown.clone(),
                    });
                    let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;

                    let rb_ptr = unsafe {
                        libbpf_sys::ring_buffer__new(
                            rb_fd,
                            Some(ring_buffer_sample_callback),
                            ctx_ptr,
                            std::ptr::null(),
                        )
                    };

                    if rb_ptr.is_null() {
                        unsafe {
                            let _ = Box::from_raw(ctx_ptr as *mut RingBufContext);
                        }
                        log::warn!("Failed to create ring buffer manager");
                        continue;
                    }

                    let rb = SendRingBuffer(rb_ptr);
                    let shutdown_clone = shutdown.clone();
                    let rb_id = ringbuffer_handles.len();
                    ringbuffer_handles.push(tokio::task::spawn_blocking(move || {
                        loop {
                            rb.poll(1);
                            if shutdown_clone.load(Ordering::Relaxed) {
                                rb.consume();
                                rb.free();
                                log::debug!("ringbuffer #{} polling stopped", rb_id);
                                break;
                            }
                        }
                    }));
                }
            }

            if tui_args.mangoapp_tracing {
                let stop_mangoapp = app.should_quit.clone();
                let mangoapp_path = CString::new(tui_args.mangoapp_path.clone()).unwrap();
                let poll_intvl_ms = tui_args.mangoapp_poll_intvl_ms;
                let tx = action_tx.clone();
                tokio::spawn(async move {
                    poll_mangoapp(
                        mangoapp_path,
                        poll_intvl_ms,
                        tx,
                        stop_mangoapp,
                    )
                    .await
                });
            }

            loop {
                tokio::select! {
                    ev = tui.next() => {
                        let ev = ev?;
                        match ev {
                            Event::Quit => { action_tx.send(Action::Quit)?; },
                            Event::Tick => action_tx.send(Action::Tick)?,
                            Event::TickRateChange(tick_rate_ms) => action_tx.send(
                                Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms)),
                            )?,
                            Event::Render => {
                                if app.should_quit.load(Ordering::Relaxed) {
                                    break;
                                }
                                if app.state() != AppState::Pause {
                                    tui.draw(|f| app.render(f).expect("Failed to render application"))?;
                                }
                            }
                            Event::Key(_) => {
                                let action = get_action(&app, &keymap, ev);
                                action_tx.send(action)?;
                            }
                            _ => {}
                        }
                    }

                    ac = action_rx.recv() => {
                        let ac = ac.ok_or(anyhow!("actions channel closed"))?;
                        app.handle_action(&ac)?;
                    }
                }
            }
            tui.exit()?;

            log::debug!("waiting for {} ringbuffer tasks to complete", ringbuffer_handles.len());
            for handle in ringbuffer_handles {
                if let Err(e) = handle.await {
                    log::error!("Ringbuffer task panicked: {e}");
                }
            }

            drop(links);

            Ok(())
        })
}

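/// Entry point for `scxtop mcp`: runs an MCP server, optionally in daemon mode where BPF
/// events are streamed into a set of in-process analyzers that back the server's tools.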
fn run_mcp(mcp_args: &scxtop::cli::McpArgs) -> Result<()> {
    use scx_utils::Topology;
    use scxtop::mcp::{events::action_to_mcp_event, McpServer, McpServerConfig};
    use std::sync::Arc;

    TermLogger::init(
        match mcp_args.verbose {
            0 => LevelFilter::Warn,
            1 => LevelFilter::Info,
            2 => LevelFilter::Debug,
            _ => LevelFilter::Trace,
        },
        SimplelogConfig::default(),
        TerminalMode::Stderr,
        ColorChoice::Auto,
    )?;

    let topo = Topology::new().expect("Failed to create topology");
    let topo_arc = Arc::new(topo);

    let mcp_config = McpServerConfig {
        daemon_mode: mcp_args.daemon,
        enable_logging: mcp_args.enable_logging,
    };

    if mcp_args.daemon {
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(4)
            .build()
            .unwrap()
            .block_on(async {
                let mut open_object = MaybeUninit::uninit();
                let (action_tx, mut action_rx) = mpsc::unbounded_channel();

                use scxtop::mcp::create_shared_stats;
                let shared_stats = create_shared_stats();
                let shared_stats_for_event_handler = shared_stats.clone();

                let builder = BpfSkelBuilder::default();
                let mut skel = builder.open(&mut open_object)?;

                let num_cpus = num_possible_cpus()?;
                let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
                let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;

                log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);

                let cpu_cnt_pow2 = num_cpus.next_power_of_two();
                skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;

                skel.maps.data_rb_cpu_map.set_max_entries(cpu_cnt_pow2 as u32)?;

                skel.maps.events.set_max_entries(rb_cnt as u32)?;

                let mut skel = skel.load()?;

                for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
                    if cpu_id < cpu_cnt_pow2 {
                        skel.maps.data_rb_cpu_map.update(
                            &(cpu_id as u32).to_ne_bytes(),
                            &rb_id.to_ne_bytes(),
                            libbpf_rs::MapFlags::ANY,
                        )?;
                    }
                }

                use scxtop::mcp::{
                    WakerWakeeAnalyzer, LatencyTracker, CpuHotspotAnalyzer, MigrationAnalyzer,
                    ProcessEventHistory, DsqMonitor, EventRateMonitor, WakeupChainTracker, EventBuffer,
                    SoftirqAnalyzer,
                };
                use std::sync::Mutex;

                let mut waker_wakee = WakerWakeeAnalyzer::new();
                waker_wakee.set_topology(topo_arc.clone());
                let waker_wakee_arc = Arc::new(Mutex::new(waker_wakee));

                let latency_tracker = LatencyTracker::new(1000);
                let latency_tracker_arc = Arc::new(Mutex::new(latency_tracker));

                let cpu_hotspot = CpuHotspotAnalyzer::new(100);
                let cpu_hotspot_arc = Arc::new(Mutex::new(cpu_hotspot));

                let migration_analyzer = MigrationAnalyzer::new(1000);
                let migration_analyzer_arc = Arc::new(Mutex::new(migration_analyzer));

                let process_history = ProcessEventHistory::new(100);
                let process_history_arc = Arc::new(Mutex::new(process_history));

                let dsq_monitor = DsqMonitor::new();
                let dsq_monitor_arc = Arc::new(Mutex::new(dsq_monitor));

                let rate_monitor = EventRateMonitor::new(1000, 10);
                let rate_monitor_arc = Arc::new(Mutex::new(rate_monitor));

                let wakeup_tracker = WakeupChainTracker::new(10);
                let wakeup_tracker_arc = Arc::new(Mutex::new(wakeup_tracker));

                let event_buffer = EventBuffer::new();
                let event_buffer_arc = Arc::new(Mutex::new(event_buffer));

                let softirq_analyzer = SoftirqAnalyzer::new(10000);
                let softirq_analyzer_arc = Arc::new(Mutex::new(softirq_analyzer));

                let mut edm = EventDispatchManager::new(None, None);
                edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(action_tx.clone())));

                let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));

                let shutdown = Arc::new(AtomicBool::new(false));

                let events_map_fd = skel.maps.events.as_fd().as_raw_fd();
                let mut rb_fds = Vec::new();
                let mut rb_managers: Vec<SendRingBuffer> = Vec::new();

                for rb_id in 0..rb_cnt {
                    let rb_fd = unsafe {
                        libbpf_sys::bpf_map_create(
                            libbpf_sys::BPF_MAP_TYPE_RINGBUF,
                            std::ptr::null(),
                            0,
                            0,
                            (32 * 1024 * 1024) as u32,
                            std::ptr::null(),
                        )
                    };

                    if rb_fd < 0 {
                        bail!("Failed to create ringbuffer #{}: {}", rb_id, std::io::Error::last_os_error());
                    }

                    let rb_id_u32 = rb_id as u32;
                    let ret = unsafe {
                        libbpf_sys::bpf_map_update_elem(
                            events_map_fd,
                            &rb_id_u32 as *const u32 as *const std::ffi::c_void,
                            &rb_fd as *const i32 as *const std::ffi::c_void,
                            libbpf_sys::BPF_NOEXIST.into(),
                        )
                    };

                    if ret < 0 {
                        bail!("Failed to add ringbuffer #{} to hash-of-maps: {}", rb_id, std::io::Error::last_os_error());
                    }

                    rb_fds.push(rb_fd);
                }

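                // Context shared with the ring buffer callback: each decoded event updates the
                // shared stats, is offered to every analyzer via try_lock (skipped when busy),
                // and is republished as an Action for the server loop.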
                struct McpRingBufContext {
                    dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                    shared_stats: Arc<std::sync::RwLock<scxtop::mcp::SharedStats>>,
                    action_tx: mpsc::UnboundedSender<Action>,
                    waker_wakee: Arc<std::sync::Mutex<scxtop::mcp::WakerWakeeAnalyzer>>,
                    cpu_hotspot: Arc<std::sync::Mutex<scxtop::mcp::CpuHotspotAnalyzer>>,
                    migration_analyzer: Arc<std::sync::Mutex<scxtop::mcp::MigrationAnalyzer>>,
                    process_history: Arc<std::sync::Mutex<scxtop::mcp::ProcessEventHistory>>,
                    rate_monitor: Arc<std::sync::Mutex<scxtop::mcp::EventRateMonitor>>,
                    wakeup_tracker: Arc<std::sync::Mutex<scxtop::mcp::WakeupChainTracker>>,
                    softirq_analyzer: Arc<std::sync::Mutex<scxtop::mcp::SoftirqAnalyzer>>,
                    shutdown: Arc<AtomicBool>,
                }

                extern "C" fn mcp_ring_buffer_callback(
                    ctx: *mut std::ffi::c_void,
                    data: *mut std::ffi::c_void,
                    size: u64,
                ) -> std::ffi::c_int {
                    unsafe {
                        let ctx = &*(ctx as *const McpRingBufContext);

                        if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                            return 0;
                        }

                        let data_slice = std::slice::from_raw_parts(data as *const u8, size as usize);

                        let mut event = bpf_event::default();
                        if plain::copy_from_bytes(&mut event, data_slice).is_err() {
                            return 0;
                        }

                        if event.ts == 0 {
                            ctx.dropped_invalid_ts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            return 0;
                        }

                        if let Ok(mut stats) = ctx.shared_stats.write() {
                            stats.update_from_event(&event);
                        }

                        use scxtop::bpf_intf;
                        let event_type = event.r#type as u32;

                        if let Ok(mut analyzer) = ctx.waker_wakee.try_lock() {
                            match event_type {
                                bpf_intf::event_type_SCHED_WAKEUP => {
                                    let wakeup = &event.event.wakeup;
                                    analyzer.record_wakeup(
                                        wakeup.pid,
                                        wakeup.waker_pid,
                                        &String::from_utf8_lossy(&wakeup.waker_comm),
                                        event.cpu,
                                        event.ts,
                                    );
                                }
                                bpf_intf::event_type_SCHED_WAKING => {
                                    let waking = &event.event.waking;
                                    analyzer.record_wakeup(
                                        waking.pid,
                                        waking.waker_pid,
                                        &String::from_utf8_lossy(&waking.waker_comm),
                                        event.cpu,
                                        event.ts,
                                    );
                                }
                                bpf_intf::event_type_SCHED_SWITCH => {
                                    let switch = &event.event.sched_switch;
                                    analyzer.record_wakee_run(
                                        switch.next_pid,
                                        &String::from_utf8_lossy(&switch.next_comm),
                                        event.cpu,
                                        event.ts,
                                    );
                                }
                                _ => {}
                            }
                        }

                        if let Ok(mut analyzer) = ctx.cpu_hotspot.try_lock() {
                            let json = serde_json::json!({
                                "cpu": event.cpu, "ts": event.ts, "event_type": event_type
                            });
                            analyzer.record_event(&json);
                        }

                        if let Ok(mut analyzer) = ctx.migration_analyzer.try_lock() {
                            if event_type == bpf_intf::event_type_SCHED_MIGRATE {
                                let migrate = &event.event.migrate;
                                let json = serde_json::json!({
                                    "pid": migrate.pid, "from_cpu": event.cpu,
                                    "to_cpu": migrate.dest_cpu, "ts": event.ts
                                });
                                analyzer.record_migration(&json, event.ts);
                            }
                        }

                        if let Ok(mut history) = ctx.process_history.try_lock() {
                            let (event_type_str, pid) = match event_type {
                                bpf_intf::event_type_SCHED_SWITCH => ("sched_switch", event.event.sched_switch.next_pid),
                                bpf_intf::event_type_SCHED_WAKEUP => ("sched_wakeup", event.event.wakeup.pid),
                                bpf_intf::event_type_SCHED_WAKING => ("sched_waking", event.event.waking.pid),
                                bpf_intf::event_type_SCHED_MIGRATE => ("sched_migrate", event.event.migrate.pid),
                                bpf_intf::event_type_EXIT => ("exit", event.event.exit.pid),
                                bpf_intf::event_type_EXEC => ("exec", event.event.exec.pid),
                                _ => ("other", 0),
                            };
                            if pid > 0 {
                                history.record_event(
                                    pid,
                                    event_type_str.to_string(),
                                    Some(event.cpu),
                                    serde_json::json!({"ts": event.ts}),
                                    event.ts,
                                );
                            }
                        }

                        if let Ok(mut monitor) = ctx.rate_monitor.try_lock() {
                            let event_type_str = match event_type {
                                bpf_intf::event_type_SCHED_SWITCH => "sched_switch",
                                bpf_intf::event_type_SCHED_WAKEUP => "sched_wakeup",
                                bpf_intf::event_type_SCHED_WAKING => "sched_waking",
                                bpf_intf::event_type_SCHED_MIGRATE => "sched_migrate",
                                _ => "other",
                            };
                            monitor.record_event(event_type_str.to_string(), event.ts);
                        }

                        if let Ok(mut tracker) = ctx.wakeup_tracker.try_lock() {
                            if event_type == bpf_intf::event_type_SCHED_WAKEUP || event_type == bpf_intf::event_type_SCHED_WAKING {
                                let (pid, waker_pid) = if event_type == bpf_intf::event_type_SCHED_WAKEUP {
                                    (event.event.wakeup.pid, event.event.wakeup.waker_pid)
                                } else {
                                    (event.event.waking.pid, event.event.waking.waker_pid)
                                };
                                let json = serde_json::json!({
                                    "pid": pid, "waker_pid": waker_pid, "ts": event.ts, "cpu": event.cpu
                                });
                                tracker.record_wakeup(&json, event.ts);
                            }
                        }

                        if let Ok(mut analyzer) = ctx.softirq_analyzer.try_lock() {
                            if event_type == bpf_intf::event_type_SOFTIRQ {
                                let softirq = &event.event.softirq;
                                let json = serde_json::json!({
                                    "type": "softirq", "pid": softirq.pid, "softirq_nr": softirq.softirq_nr,
                                    "entry_ts": softirq.entry_ts, "exit_ts": softirq.exit_ts, "cpu": event.cpu,
                                });
                                analyzer.record_event(&json);
                            }
                        }

                        let mut edm = EventDispatchManager::new(None, None);
                        edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(ctx.action_tx.clone())));
                        let _ = edm.on_event(&event);
                    }
                    0
                }

                for rb_fd in &rb_fds {
                    let ctx = Box::new(McpRingBufContext {
                        dropped_invalid_ts: dropped_invalid_ts.clone(),
                        shared_stats: shared_stats_for_event_handler.clone(),
                        action_tx: action_tx.clone(),
                        waker_wakee: waker_wakee_arc.clone(),
                        cpu_hotspot: cpu_hotspot_arc.clone(),
                        migration_analyzer: migration_analyzer_arc.clone(),
                        process_history: process_history_arc.clone(),
                        rate_monitor: rate_monitor_arc.clone(),
                        wakeup_tracker: wakeup_tracker_arc.clone(),
                        softirq_analyzer: softirq_analyzer_arc.clone(),
                        shutdown: shutdown.clone(),
                    });
                    let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;

                    let rb_ptr = unsafe {
                        libbpf_sys::ring_buffer__new(
                            *rb_fd,
                            Some(mcp_ring_buffer_callback),
                            ctx_ptr,
                            std::ptr::null(),
                        )
                    };

                    if rb_ptr.is_null() {
                        unsafe {
                            let _ = Box::from_raw(ctx_ptr as *mut McpRingBufContext);
                        }
                        bail!("Failed to create ring buffer manager");
                    }

                    rb_managers.push(SendRingBuffer(rb_ptr));
                }

                let (initial_links, _warnings) = attach_progs(&mut skel)?;

                if !initial_links.is_empty() || is_root() {
                    skel.progs
                        .scxtop_init
                        .test_run(ProgramInput::default())
                        .ok();
                }

                use scxtop::mcp::BpfPerfEventAttacher;
                let perf_program_addr = &skel.progs.perf_sample_handler as *const _ as usize;

                let bpf_attacher = BpfPerfEventAttacher::new(move |perf_fd| {
                    unsafe {
                        let prog =
                            &*(perf_program_addr as *const libbpf_rs::ProgramImpl<libbpf_rs::Mut>);
                        prog.attach_perf_event(perf_fd)
                            .map(|link| Box::new(link) as Box<dyn std::any::Any + Send>)
                            .map_err(|e| anyhow::anyhow!("Failed to attach perf event: {}", e))
                    }
                });
                let bpf_attacher_arc = Arc::new(bpf_attacher);

                use scxtop::mcp::{AttachCallback, EventControl, StatsControlCommand};
                let mut event_control_instance = EventControl::new();

                let skel_ptr = &mut skel as *mut _ as usize;
                let attach_callback: AttachCallback = Box::new(move |program_names: &[&str]| {
                    unsafe {
                        let skel_ref = &mut *(skel_ptr as *mut BpfSkel);
                        attach_progs_selective(skel_ref, program_names).map(|(links, _)| links)
                    }
                });

                event_control_instance.set_bpf_links(initial_links, attach_callback);
                event_control_instance.disable_event_tracking()?;
                info!("BPF programs detached by default - use control_event_tracking to enable");

                let (stats_tx, stats_rx) = mpsc::unbounded_channel::<StatsControlCommand>();
                event_control_instance.set_stats_control_channel(stats_tx);

                let event_control = Arc::new(event_control_instance);

                let config = Config::default_config();
                let scheduler =
                    read_file_string(SCHED_NAME_PATH).unwrap_or_else(|_| "".to_string());
                let mut app = App::new(
                    config,
                    scheduler,
                    100,
                    mcp_args.process_id,
                    mcp_args.layered,
                    action_tx.clone(),
                    skel,
                )?;

                use scxtop::mcp::AnalyzerControl;

                let mut analyzer_control = AnalyzerControl::new();
                analyzer_control.set_event_control(event_control.clone());

                analyzer_control.set_event_buffer(event_buffer_arc.clone());
                analyzer_control.set_latency_tracker(latency_tracker_arc.clone());
                analyzer_control.set_cpu_hotspot_analyzer(cpu_hotspot_arc.clone());
                analyzer_control.set_migration_analyzer(migration_analyzer_arc.clone());
                analyzer_control.set_process_history(process_history_arc.clone());
                analyzer_control.set_dsq_monitor(dsq_monitor_arc.clone());
                analyzer_control.set_rate_monitor(rate_monitor_arc.clone());
                analyzer_control.set_wakeup_tracker(wakeup_tracker_arc.clone());
                analyzer_control.set_waker_wakee_analyzer(waker_wakee_arc.clone());
                analyzer_control.set_softirq_analyzer(softirq_analyzer_arc.clone());

                let analyzer_control = Arc::new(Mutex::new(analyzer_control));

                use std::collections::HashMap;
                let trace_cache = Arc::new(Mutex::new(HashMap::new()));

                let mut server = McpServer::new(mcp_config)
                    .with_topology(topo_arc)
                    .setup_scheduler_resource()
                    .setup_profiling_resources()
                    .with_bpf_perf_attacher(bpf_attacher_arc)
                    .with_shared_stats(shared_stats.clone())
                    .with_stats_client(None)
                    .with_event_control(event_control.clone())
                    .with_analyzer_control(analyzer_control.clone())
                    .with_trace_cache(trace_cache)
                    .setup_stats_resources();

                let _event_stream_rx = server.enable_event_streaming();
                let resources = server.get_resources_handle();

                let bpf_stats = server.get_bpf_stats_collector();

                let perf_profiler = server.get_perf_profiler();

                let shutdown_poll = shutdown.clone();

                let mut ringbuffer_handles = Vec::new();
                for (rb_id, rb) in rb_managers.into_iter().enumerate() {
                    let stop_poll_clone = shutdown_poll.clone();
                    ringbuffer_handles.push(tokio::spawn(async move {
                        loop {
                            rb.poll(1);
                            if stop_poll_clone.load(Ordering::Relaxed) {
                                rb.consume();
                                rb.free();
                                debug!("ringbuffer #{} polling stopped", rb_id);
                                break;
                            }
                        }
                    }));
                }

                if let Some(collector) = bpf_stats {
                    let shutdown_stats = shutdown.clone();
                    let mut stats_rx_task = stats_rx;
                    tokio::spawn(async move {
                        let mut running = false;
                        let mut interval_ms = 100u64;
                        let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));

                        loop {
                            tokio::select! {
                                Some(cmd) = stats_rx_task.recv() => {
                                    match cmd {
                                        StatsControlCommand::Start(new_interval_ms) => {
                                            running = true;
                                            interval_ms = new_interval_ms;
                                            interval = tokio::time::interval(Duration::from_millis(interval_ms));
                                            info!("Stats collection started with {}ms interval", interval_ms);
                                        }
                                        StatsControlCommand::Stop => {
                                            running = false;
                                            info!("Stats collection stopped");
                                        }
                                    }
                                }

                                _ = interval.tick(), if running => {
                                    if shutdown_stats.load(Ordering::Relaxed) {
                                        break;
                                    }
                                    let _ = collector.collect_sample();
                                }

                                _ = tokio::time::sleep(Duration::from_millis(100)), if !running => {
                                    if shutdown_stats.load(Ordering::Relaxed) {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }

                info!("MCP daemon started, processing BPF events");

                let mut mcp_server_task = Box::pin(server.run_async());
                let mcp_result;
                loop {
                    tokio::select! {
                        result = &mut mcp_server_task => {
                            info!("MCP server exited");
                            shutdown.store(true, Ordering::Relaxed);
                            mcp_result = result;
                            break;
                        }

                        Some(action) = action_rx.recv() => {
                            if matches!(action, Action::Quit) {
                                info!("Received quit action");
                                shutdown.store(true, Ordering::Relaxed);
                                mcp_result = Ok(());
                                break;
                            }

                            if let Some(ref profiler) = perf_profiler {
                                if let Action::PerfSample(ref perf_sample) = action {
                                    use scxtop::mcp::RawSample;
                                    profiler.add_sample(RawSample {
                                        address: perf_sample.instruction_pointer,
                                        pid: perf_sample.pid,
                                        cpu_id: perf_sample.cpu_id,
                                        is_kernel: perf_sample.is_kernel,
                                        kernel_stack: perf_sample.kernel_stack.clone(),
                                        user_stack: perf_sample.user_stack.clone(),
                                        layer_id: if perf_sample.layer_id >= 0 {
                                            Some(perf_sample.layer_id)
                                        } else {
                                            None
                                        },
                                    });
                                }
                            }

                            let _ = app.handle_action(&action);

                            if let Some(event) = action_to_mcp_event(&action) {
                                let _ = resources.push_event(event);
                            }
                        }
                    }
                }

                debug!("waiting for {} ringbuffer tasks to complete", ringbuffer_handles.len());
                for handle in ringbuffer_handles {
                    if let Err(e) = handle.await {
                        log::error!("Ringbuffer task panicked: {e}");
                    }
                }

                mcp_result
            })
    } else {
        let mut server = McpServer::new(mcp_config)
            .with_topology(topo_arc)
            .setup_scheduler_resource()
            .setup_profiling_resources()
            .with_stats_client(None)
            .setup_stats_resources();
        server.run_blocking()
    }
}

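/// Parses the CLI and dispatches to the selected subcommand, defaulting to the TUI.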
fn main() -> Result<()> {
    let args = Cli::parse();

    match &args.command.unwrap_or(Commands::Tui(args.tui)) {
        Commands::Tui(tui_args) => {
            run_tui(tui_args)?;
        }
        Commands::Trace(trace_args) => {
            run_trace(trace_args)?;
        }
        Commands::Mcp(mcp_args) => {
            run_mcp(mcp_args)?;
        }
        Commands::GenerateCompletions { shell, output } => {
            generate_completions(Cli::command(), *shell, output.clone())
                .unwrap_or_else(|_| panic!("Failed to generate completions for {shell}"));
        }
    }
    Ok(())
}