Skip to main content

scxtop/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2//
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5
6use scx_utils::compat;
7use scxtop::bpf_skel::types::bpf_event;
8use scxtop::cli::{generate_completions, Cli, Commands, TraceArgs, TuiArgs};
9use scxtop::config::Config;
10use scxtop::edm::{ActionHandler, BpfEventActionPublisher, BpfEventHandler, EventDispatchManager};
11use scxtop::layered_util;
12use scxtop::mangoapp::poll_mangoapp;
13use scxtop::search;
14use scxtop::tracer::Tracer;
15use scxtop::util::{
16    check_bpf_capability, get_capability_warning_message, get_clock_value, is_root,
17    read_file_string,
18};
19use scxtop::Action;
20use scxtop::App;
21use scxtop::CpuStatTracker;
22use scxtop::Event;
23use scxtop::Key;
24use scxtop::KeyMap;
25use scxtop::MemStatSnapshot;
26use scxtop::PerfettoTraceManager;
27use scxtop::SystemStatAction;
28use scxtop::Tui;
29use scxtop::SCHED_NAME_PATH;
30use scxtop::{available_kprobe_events, UpdateColVisibilityAction};
31use scxtop::{bpf_skel::*, AppState};
32
33use anyhow::anyhow;
34use anyhow::bail;
35use anyhow::Result;
36use clap::{CommandFactory, Parser};
37use futures::future::join_all;
38use libbpf_rs::libbpf_sys;
39use libbpf_rs::num_possible_cpus;
40use libbpf_rs::skel::OpenSkel;
41use libbpf_rs::skel::SkelBuilder;
42use libbpf_rs::Link;
43use libbpf_rs::MapCore;
44use libbpf_rs::ProgramInput;
45use libbpf_rs::UprobeOpts;
46use log::debug;
47use log::info;
48use ratatui::crossterm::event::{KeyCode::Char, KeyEvent};
49use simplelog::{
50    ColorChoice, Config as SimplelogConfig, LevelFilter, TermLogger, TerminalMode, WriteLogger,
51};
52use std::ffi::CString;
53use std::fs::File;
54use std::mem::MaybeUninit;
55use std::os::fd::AsFd;
56use std::os::fd::AsRawFd;
57use std::str::FromStr;
58use std::sync::atomic::AtomicBool;
59use std::sync::atomic::Ordering;
60use std::sync::Arc;
61use std::time::Duration;
62use sysinfo::System;
63use tokio::sync::mpsc;
64
// Wrapper to make ring buffer pointer Send-safe for tokio spawn
// SAFETY: We ensure the pointer remains valid for the task lifetime
struct SendRingBuffer(*mut libbpf_sys::ring_buffer);
unsafe impl Send for SendRingBuffer {}

impl SendRingBuffer {
    /// Polls the ring buffer for new samples, blocking for up to `timeout`
    /// milliseconds. Returns the raw libbpf result (number of records
    /// consumed, or a negative error code).
    fn poll(&self, timeout: i32) -> i32 {
        // SAFETY: self.0 is a live ring_buffer pointer created by
        // ring_buffer__new and not yet freed (free() consumes self).
        unsafe { libbpf_sys::ring_buffer__poll(self.0, timeout) }
    }

    /// Drains all currently buffered samples without waiting. Used at
    /// shutdown to flush any events still pending after the last poll.
    fn consume(&self) -> i32 {
        // SAFETY: same invariant as poll() — pointer is live until free().
        unsafe { libbpf_sys::ring_buffer__consume(self.0) }
    }

    /// Releases the underlying ring buffer. Takes `self` by value so the
    /// wrapper cannot be used (or freed twice) afterwards.
    fn free(self) {
        // SAFETY: consuming self guarantees this is the last use of the
        // pointer, so handing it to ring_buffer__free cannot double-free.
        unsafe { libbpf_sys::ring_buffer__free(self.0) }
    }
}
83
84fn get_action(app: &App, keymap: &KeyMap, event: Event) -> Action {
85    match event {
86        Event::Error => Action::None,
87        Event::Tick => Action::Tick,
88        Event::TickRateChange(tick_rate_ms) => {
89            Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms))
90        }
91        Event::Key(key) => handle_key_event(app, keymap, key),
92        Event::Paste(paste) => handle_input_entry(app, paste).unwrap_or(Action::None),
93        _ => Action::None,
94    }
95}
96
97fn handle_key_event(app: &App, keymap: &KeyMap, key: KeyEvent) -> Action {
98    match key.code {
99        Char(c) => {
100            // Check if we should handle this character as input for filtering
101            if let Some(action) = handle_input_entry(app, c.to_string()) {
102                action
103            } else {
104                // Check for state-specific key bindings before falling back to global keymap
105                match (app.state(), c) {
106                    // In BPF program detail view, 'p' toggles perf sampling
107                    (AppState::BpfProgramDetail, 'p') => Action::ToggleBpfPerfSampling,
108                    // Fall back to global keymap for all other cases
109                    _ => keymap.action(&Key::Char(c)),
110                }
111            }
112        }
113        _ => keymap.action(&Key::Code(key.code)),
114    }
115}
116
117fn handle_input_entry(app: &App, s: String) -> Option<Action> {
118    match app.state() {
119        AppState::PerfEvent | AppState::KprobeEvent => Some(Action::InputEntry(s)),
120        AppState::Default
121        | AppState::Llc
122        | AppState::Node
123        | AppState::Process
124        | AppState::Memory
125        | AppState::PerfTop
126        | AppState::BpfPrograms
127        | AppState::Scheduler
128            if app.filtering() =>
129        {
130            Some(Action::InputEntry(s))
131        }
132        _ => None,
133    }
134}
135
136/// Attaches BPF programs to the skel, handling non-root scenarios gracefully
137fn attach_progs(skel: &mut BpfSkel) -> Result<(Vec<Link>, Vec<String>)> {
138    attach_progs_selective(skel, &[])
139}
140
/// Attaches specified BPF programs to the skel
/// If program_names is empty, attaches all programs
///
/// Returns the successfully created attach [`Link`]s together with a list of
/// human-readable warnings for programs that could not be attached. Attach
/// failures are fatal when running as root, and downgraded to warnings
/// otherwise so the tool can still run with reduced functionality.
fn attach_progs_selective(
    skel: &mut BpfSkel,
    program_names: &[&str],
) -> Result<(Vec<Link>, Vec<String>)> {
    let mut links = Vec::new();
    let mut warnings = Vec::new();

    // Check capabilities before attempting to attach
    let has_bpf_cap = check_bpf_capability();

    if !has_bpf_cap {
        // Without BPF capability every attach would fail, so bail out early
        // with guidance instead of emitting one warning per program.
        warnings
            .push("BPF programs cannot be attached - scheduler monitoring disabled".to_string());
        warnings.push("Try running as root or configure BPF permissions".to_string());
        return Ok((links, warnings));
    }

    // An empty filter list selects every program.
    let attach_all = program_names.is_empty();

    // Helper function to check if a program should be attached
    let should_attach = |name: &str| -> bool { attach_all || program_names.contains(&name) };

    // Helper macro to safely attach programs and collect warnings
    macro_rules! safe_attach {
        ($prog:expr, $name:literal) => {
            if should_attach($name) {
                match $prog.attach() {
                    Ok(link) => {
                        links.push(link);
                    }
                    Err(e) => {
                        if is_root() {
                            // If running as root and still failing, it's a real error
                            return Err(anyhow!(
                                "Failed to attach {} (running as root): {}",
                                $name,
                                e
                            ));
                        } else {
                            warnings.push(format!("Failed to attach {}: {}", $name, e));
                        }
                    }
                }
            }
        };
    }

    // Try to attach core scheduler probes
    // NOTE(review): the selection names are not uniformly derived from the
    // program identifiers (e.g. on_sched_cpu_perf -> "sched_cpu_perf" but
    // on_sched_wakeup -> "on_sched_wakeup", on_sched_wakeup_new ->
    // "sched_wakeup_new"). Confirm callers pass these exact strings before
    // "normalizing" them — they are part of the filter contract.
    safe_attach!(skel.progs.on_sched_cpu_perf, "sched_cpu_perf");
    safe_attach!(skel.progs.scx_sched_reg, "scx_sched_reg");
    safe_attach!(skel.progs.scx_sched_unreg, "scx_sched_unreg");
    safe_attach!(skel.progs.on_sched_switch, "on_sched_switch");
    safe_attach!(skel.progs.on_sched_wakeup, "on_sched_wakeup");
    safe_attach!(skel.progs.on_sched_wakeup_new, "sched_wakeup_new");
    safe_attach!(skel.progs.on_sched_waking, "on_sched_waking");
    safe_attach!(skel.progs.on_sched_migrate_task, "on_sched_migrate_task");
    safe_attach!(skel.progs.on_sched_fork, "sched_fork");
    safe_attach!(skel.progs.on_sched_exec, "sched_exec");
    safe_attach!(skel.progs.on_sched_exit, "sched_exit");

    // 6.13 compatibility probes
    // Kernels with scx_bpf_dsq_insert_vtime use the insert/move API; older
    // kernels fall back to the dispatch-family probes in the else branch.
    if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
        safe_attach!(skel.progs.scx_insert_vtime, "scx_insert_vtime");
        safe_attach!(skel.progs.scx_insert, "scx_insert");
        safe_attach!(skel.progs.scx_dsq_move, "scx_dsq_move");
        safe_attach!(skel.progs.scx_dsq_move_vtime, "scx_dsq_move_vtime");
        safe_attach!(skel.progs.scx_dsq_move_set_vtime, "scx_dsq_move_set_vtime");
        safe_attach!(skel.progs.scx_dsq_move_set_slice, "scx_dsq_move_set_slice");
        // v2 API variants (6.19+) - schedulers call these directly via compat macros
        if compat::ksym_exists("scx_bpf_dsq_insert___v2")? {
            safe_attach!(skel.progs.scx_insert_v2, "scx_insert_v2");
        }
        if compat::ksym_exists("__scx_bpf_dsq_insert_vtime")? {
            safe_attach!(skel.progs.scx_insert_vtime_args, "scx_insert_vtime_args");
        }
    } else {
        safe_attach!(skel.progs.scx_dispatch, "scx_dispatch");
        safe_attach!(skel.progs.scx_dispatch_vtime, "scx_dispatch_vtime");
        safe_attach!(
            skel.progs.scx_dispatch_from_dsq_set_vtime,
            "scx_dispatch_from_dsq_set_vtime"
        );
        safe_attach!(
            skel.progs.scx_dispatch_from_dsq_set_slice,
            "scx_dispatch_from_dsq_set_slice"
        );
        safe_attach!(skel.progs.scx_dispatch_from_dsq, "scx_dispatch_from_dsq");
        safe_attach!(
            skel.progs.scx_dispatch_vtime_from_dsq,
            "scx_dispatch_vtime_from_dsq"
        );
    }

    // Optional probes
    safe_attach!(skel.progs.on_cpuhp_enter, "cpuhp_enter");
    safe_attach!(skel.progs.on_cpuhp_exit, "cpuhp_exit");
    safe_attach!(skel.progs.on_softirq_entry, "on_softirq_entry");
    safe_attach!(skel.progs.on_softirq_exit, "on_softirq_exit");

    // If no links were successfully attached and we're not root, provide helpful guidance
    if links.is_empty() && !is_root() {
        warnings.extend(get_capability_warning_message());
    }

    Ok((links, warnings))
}
249
250fn run_trace(trace_args: &TraceArgs) -> Result<()> {
251    // Trace function always requires root privileges
252    if !is_root() {
253        return Err(anyhow!(
254            "Trace functionality requires root privileges. Please run as root"
255        ));
256    }
257
258    TermLogger::init(
259        match trace_args.verbose {
260            0 => simplelog::LevelFilter::Info,
261            1 => simplelog::LevelFilter::Debug,
262            _ => simplelog::LevelFilter::Trace,
263        },
264        SimplelogConfig::default(),
265        TerminalMode::Mixed,
266        ColorChoice::Auto,
267    )?;
268
269    let mut kprobe_events = available_kprobe_events()?;
270    kprobe_events.sort();
271    search::sorted_contains_all(&kprobe_events, &trace_args.kprobes)
272        .then_some(())
273        .ok_or_else(|| anyhow!("Invalid kprobe events"))?;
274
275    let config = Config::default_config();
276    let worker_threads = config.worker_threads() as usize;
277
278    // Calculate how many ringbuffers we'll need to ensure enough worker threads
279    let num_cpus = num_possible_cpus()?;
280    let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
281
282    // Ensure we have at least rb_cnt + 4 worker threads
283    // (+4 for trace generation, stats, and other async tasks)
284    let required_threads = std::cmp::max(rb_cnt + 4, worker_threads);
285
286    info!(
287        "Creating tokio runtime with {} worker threads for {} ringbuffers",
288        required_threads, rb_cnt
289    );
290
291    tokio::runtime::Builder::new_multi_thread()
292        .enable_all()
293        .worker_threads(required_threads)
294        .build()
295        .unwrap()
296        .block_on(async {
297            let (action_tx, mut action_rx) = mpsc::unbounded_channel();
298
299            // Set up the BPF skel and publisher
300            let mut open_object = MaybeUninit::uninit();
301            let mut builder = BpfSkelBuilder::default();
302            if trace_args.verbose > 2 {
303                builder.obj_builder.debug(true);
304            }
305
306            let mut skel = builder.open(&mut open_object)?;
307            compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total)?;
308            compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update)?;
309            compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait)?;
310            compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang)?;
311
312            // Set up multiple ringbuffers for scalability
313            let num_cpus = num_possible_cpus()?;
314            let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
315            let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;
316
317            log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);
318
319            // Set up CPU-to-ringbuffer mapping in BPF
320            let cpu_cnt_pow2 = num_cpus.next_power_of_two();
321            skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;
322
323            // Set max entries for the CPU-to-ringbuf map array
324            skel.maps
325                .data_rb_cpu_map
326                .set_max_entries(cpu_cnt_pow2 as u32)?;
327
328            // Set max entries for events hash-of-maps
329            skel.maps.events.set_max_entries(rb_cnt as u32)?;
330
331            // Load the BPF skeleton (no graceful handling for trace mode - requires root)
332            let mut skel = skel.load()?;
333
334            // Populate the CPU-to-ringbuffer mapping after loading
335            for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
336                if cpu_id < cpu_cnt_pow2 {
337                    skel.maps.data_rb_cpu_map.update(
338                        &(cpu_id as u32).to_ne_bytes(),
339                        &rb_id.to_ne_bytes(),
340                        libbpf_rs::MapFlags::ANY,
341                    )?;
342                }
343            }
344
345            skel.maps.data_data.as_mut().unwrap().enable_bpf_events = false;
346
347            // Attach programs (no graceful handling for trace mode - requires root)
348            let mut links = vec![
349                skel.progs.on_sched_cpu_perf.attach()?,
350                skel.progs.scx_sched_reg.attach()?,
351                skel.progs.scx_sched_unreg.attach()?,
352                skel.progs.on_sched_switch.attach()?,
353                skel.progs.on_sched_wakeup.attach()?,
354                skel.progs.on_sched_wakeup_new.attach()?,
355                skel.progs.on_sched_waking.attach()?,
356                skel.progs.on_sched_migrate_task.attach()?,
357                skel.progs.on_sched_fork.attach()?,
358                skel.progs.on_sched_exec.attach()?,
359                skel.progs.on_sched_exit.attach()?,
360            ];
361
362            // 6.13 compatibility
363            if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
364                if let Ok(link) = skel.progs.scx_insert_vtime.attach() {
365                    links.push(link);
366                }
367                if let Ok(link) = skel.progs.scx_insert.attach() {
368                    links.push(link);
369                }
370                if let Ok(link) = skel.progs.scx_dsq_move.attach() {
371                    links.push(link);
372                }
373                if let Ok(link) = skel.progs.scx_dsq_move_vtime.attach() {
374                    links.push(link);
375                }
376                if let Ok(link) = skel.progs.scx_dsq_move_set_vtime.attach() {
377                    links.push(link);
378                }
379                if let Ok(link) = skel.progs.scx_dsq_move_set_slice.attach() {
380                    links.push(link);
381                }
382                // v2 API variants (6.19+)
383                if compat::ksym_exists("scx_bpf_dsq_insert___v2")? {
384                    if let Ok(link) = skel.progs.scx_insert_v2.attach() {
385                        links.push(link);
386                    }
387                }
388                if compat::ksym_exists("__scx_bpf_dsq_insert_vtime")? {
389                    if let Ok(link) = skel.progs.scx_insert_vtime_args.attach() {
390                        links.push(link);
391                    }
392                }
393            } else {
394                if let Ok(link) = skel.progs.scx_dispatch.attach() {
395                    links.push(link);
396                }
397                if let Ok(link) = skel.progs.scx_dispatch_vtime.attach() {
398                    links.push(link);
399                }
400                if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_vtime.attach() {
401                    links.push(link);
402                }
403                if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_slice.attach() {
404                    links.push(link);
405                }
406                if let Ok(link) = skel.progs.scx_dispatch_from_dsq.attach() {
407                    links.push(link);
408                }
409                if let Ok(link) = skel.progs.scx_dispatch_vtime_from_dsq.attach() {
410                    links.push(link);
411                }
412            }
413            if let Ok(link) = skel.progs.on_cpuhp_enter.attach() {
414                links.push(link);
415            }
416            if let Ok(link) = skel.progs.on_cpuhp_exit.attach() {
417                links.push(link);
418            }
419            if let Ok(link) = skel.progs.on_softirq_entry.attach() {
420                links.push(link);
421            }
422            if let Ok(link) = skel.progs.on_softirq_exit.attach() {
423                links.push(link);
424            }
425
426            // Counter for events dropped due to invalid timestamps (userspace filtering)
427            let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));
428
429            // Create shutdown flag early so it can be used in ringbuffer callbacks
430            let shutdown = Arc::new(AtomicBool::new(false));
431
432            // Create multiple ringbuffers and add them to the hash-of-maps
433            let events_map_fd = skel.maps.events.as_fd().as_raw_fd();
434            let mut rb_fds = Vec::new();
435            let mut rb_managers: Vec<SendRingBuffer> = Vec::new();
436
437            for rb_id in 0..rb_cnt {
438                // Create individual ringbuffer (size must be power of 2)
439                let rb_fd = unsafe {
440                    libbpf_sys::bpf_map_create(
441                        libbpf_sys::BPF_MAP_TYPE_RINGBUF,
442                        std::ptr::null(),
443                        0,
444                        0,
445                        (32 * 1024 * 1024) as u32, // 32MB per ringbuffer (must be power of 2)
446                        std::ptr::null(),
447                    )
448                };
449
450                if rb_fd < 0 {
451                    bail!(
452                        "Failed to create ringbuffer #{}: {}",
453                        rb_id,
454                        std::io::Error::last_os_error()
455                    );
456                }
457
458                // Add ringbuffer to hash-of-maps
459                let rb_id_u32 = rb_id as u32;
460                let ret = unsafe {
461                    libbpf_sys::bpf_map_update_elem(
462                        events_map_fd,
463                        &rb_id_u32 as *const u32 as *const std::ffi::c_void,
464                        &rb_fd as *const i32 as *const std::ffi::c_void,
465                        libbpf_sys::BPF_NOEXIST.into(),
466                    )
467                };
468
469                if ret < 0 {
470                    bail!(
471                        "Failed to add ringbuffer #{} to hash-of-maps: {}",
472                        rb_id,
473                        std::io::Error::last_os_error()
474                    );
475                }
476
477                rb_fds.push(rb_fd);
478            }
479
480            // Set up ring buffer managers using raw libbpf C API
481            // We use the C API because we're creating ringbuffers dynamically
            // Per-ringbuffer callback context, heap-allocated and handed to
            // libbpf as an opaque pointer. NOTE(review): the Box is leaked via
            // Box::into_raw and only reclaimed on the creation-failure path
            // below — for a process-lifetime trace run that appears intentional,
            // but confirm it is not expected to be freed alongside the ring
            // buffers at shutdown.
            struct RingBufContext {
                // Counter of events dropped because ts == 0 (shared with main).
                dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                // Channel used to publish decoded events as Actions.
                action_tx: mpsc::UnboundedSender<Action>,
                // Set by the shutdown sequence; makes the callback a no-op.
                shutdown: Arc<AtomicBool>,
            }

            // C-ABI sample callback invoked by libbpf for every ringbuffer
            // record during poll()/consume(). Returns 0 in all cases (libbpf
            // treats non-zero as "stop processing").
            extern "C" fn ring_buffer_sample_callback(
                ctx: *mut std::ffi::c_void,
                data: *mut std::ffi::c_void,
                size: u64,
            ) -> std::ffi::c_int {
                // SAFETY: ctx is the RingBufContext pointer registered via
                // ring_buffer__new below, and data/size describe a record that
                // libbpf guarantees valid for the duration of this call.
                unsafe {
                    let ctx = &*(ctx as *const RingBufContext);

                    // Stop processing if shutdown requested
                    if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                        return 0;
                    }

                    // Copy at most size_of::<bpf_event>() bytes so a short (or
                    // oversized) record cannot overrun the destination struct.
                    let mut event = bpf_event::default();
                    let copy_size = std::cmp::min(size as usize, std::mem::size_of::<bpf_event>());
                    std::ptr::copy_nonoverlapping(
                        data as *const u8,
                        &mut event as *mut bpf_event as *mut u8,
                        copy_size,
                    );

                    // Drop events with invalid timestamps
                    if event.ts == 0 {
                        ctx.dropped_invalid_ts
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        return 0;
                    }

                    // NOTE(review): a fresh EventDispatchManager (plus a boxed
                    // publisher) is constructed per event; if this callback is
                    // hot, hoisting it into RingBufContext may be worthwhile —
                    // verify EventDispatchManager is stateless across events
                    // before changing this.
                    let mut edm = EventDispatchManager::new(None, None);
                    edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(
                        ctx.action_tx.clone(),
                    )));
                    // Dispatch errors are intentionally ignored here; the
                    // callback cannot propagate them through the C ABI.
                    let _ = edm.on_event(&event);
                }
                0
            }
524
525            for rb_fd in &rb_fds {
526                let ctx = Box::new(RingBufContext {
527                    dropped_invalid_ts: dropped_invalid_ts.clone(),
528                    action_tx: action_tx.clone(),
529                    shutdown: shutdown.clone(),
530                });
531                let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;
532
533                let rb_ptr = unsafe {
534                    libbpf_sys::ring_buffer__new(
535                        *rb_fd,
536                        Some(ring_buffer_sample_callback),
537                        ctx_ptr,
538                        std::ptr::null(),
539                    )
540                };
541
542                if rb_ptr.is_null() {
543                    unsafe {
544                        let _ = Box::from_raw(ctx_ptr as *mut RingBufContext);
545                    }
546                    bail!("Failed to create ring buffer manager");
547                }
548
549                rb_managers.push(SendRingBuffer(rb_ptr));
550            }
551
552            // Set up the background threads to poll all ringbuffers
553            let stop_poll = shutdown.clone();
554            let stop_stats = shutdown.clone();
555
556            let mut ringbuffer_handles = Vec::new();
557            let mut producer_handles = Vec::new();
558
559            // Spawn a separate blocking task for each ringbuffer
560            // Use spawn_blocking because rb.poll() is a blocking C FFI call
561            for (rb_id, rb) in rb_managers.into_iter().enumerate() {
562                let stop_poll_clone = stop_poll.clone();
563                ringbuffer_handles.push(tokio::task::spawn_blocking(move || {
564                    info!("ringbuffer #{} task started", rb_id);
565                    let mut poll_count = 0;
566                    loop {
567                        // Poll with 1ms timeout (blocking call)
568                        rb.poll(1);
569                        poll_count += 1;
570                        if stop_poll_clone.load(Ordering::Relaxed) {
571                            info!(
572                                "ringbuffer #{} received shutdown after {} polls",
573                                rb_id, poll_count
574                            );
575                            // Consume remaining events
576                            let consumed = rb.consume();
577                            info!("ringbuffer #{} consumed {} events", rb_id, consumed);
578                            // Free the ring buffer
579                            rb.free();
580                            info!("ringbuffer #{} freed", rb_id);
581                            break;
582                        }
583                    }
584                    info!("ringbuffer #{} exiting", rb_id);
585                }));
586            }
587            info!(
588                "spawned {} ringbuffer polling tasks",
589                ringbuffer_handles.len()
590            );
591
592            if trace_args.system_stats {
593                let mut cpu_stat_tracker = CpuStatTracker::default();
594                let mut mem_stats = MemStatSnapshot::default();
595                let mut system = System::new_all();
596                let action_tx_clone = action_tx.clone();
597
598                producer_handles.push(tokio::spawn(async move {
599                    info!("stats task started");
600                    let mut stats_count = 0;
601                    loop {
602                        if stop_stats.load(Ordering::Relaxed) {
603                            info!("stats task received shutdown after {} samples", stats_count);
604                            break;
605                        }
606                        let ts = get_clock_value(libc::CLOCK_BOOTTIME);
607
608                        cpu_stat_tracker
609                            .update(&mut system)
610                            .expect("Failed to update cpu stats");
611
612                        mem_stats.update().expect("Failed to update mem stats");
613
614                        let sys_stat_action = Action::SystemStat(SystemStatAction {
615                            ts,
616                            cpu_data_prev: cpu_stat_tracker.prev.clone(),
617                            cpu_data_current: cpu_stat_tracker.current.clone(),
618                            mem_info: mem_stats.clone(),
619                        });
620                        action_tx_clone
621                            .send(sys_stat_action)
622                            .expect("Failed to send CpuStat action");
623
624                        stats_count += 1;
625                        tokio::time::sleep(Duration::from_millis(100)).await;
626                    }
627                    info!("stats task exiting");
628                }));
629            }
630
631            let trace_file_prefix = config.trace_file_prefix().to_string();
632            let trace_file = trace_args.output_file.clone();
633            let mut trace_manager = PerfettoTraceManager::new(trace_file_prefix, None);
634
635            // Embed topology metadata in traces for cross-machine analysis
636            {
637                use scx_utils::Topology;
638                if let Ok(topo) = Topology::new() {
639                    let mut cpu_to_llc = std::collections::HashMap::new();
640                    let mut cpu_to_numa = std::collections::HashMap::new();
641                    let mut cpu_to_core = std::collections::HashMap::new();
642                    for cpu in topo.all_cpus.values() {
643                        cpu_to_llc.insert(cpu.id as u32, cpu.llc_id as u32);
644                        cpu_to_numa.insert(cpu.id as u32, cpu.node_id as u32);
645                        cpu_to_core.insert(cpu.id as u32, cpu.core_id as u32);
646                    }
647                    trace_manager.set_topology(cpu_to_llc, cpu_to_numa, cpu_to_core);
648                }
649            }
650
651            info!("starting trace for {}ms", trace_args.trace_ms);
652            trace_manager.start()?;
653            let mut tracer = Tracer::new(skel);
654            tracer.trace(&trace_args.kprobes)?;
655
656            let shutdown_trace = shutdown.clone();
657            let trace_handle = tokio::spawn(async move {
658                debug!("trace generation task started");
659                let mut count = 0;
660                let mut last_log = std::time::Instant::now();
661                loop {
662                    tokio::select! {
663                        // Check shutdown flag to stop early if requested
664                        _ = tokio::time::sleep(Duration::from_millis(100)) => {
665                            if shutdown_trace.load(Ordering::Relaxed) {
666                                break;
667                            }
668                        }
669                        action = action_rx.recv() => {
670                            if let Some(a) = action {
671                                count += 1;
672                                if last_log.elapsed() > std::time::Duration::from_secs(1) {
673                                    debug!("trace task: {} events processed", count);
674                                    last_log = std::time::Instant::now();
675                                }
676                                trace_manager
677                                    .on_action(&a)
678                                    .expect("Action should have been resolved");
679                                // After processing, check shutdown to avoid
680                                // draining the entire buffered channel through
681                                // select! one event at a time.
682                                if shutdown_trace.load(Ordering::Relaxed) {
683                                    break;
684                                }
685                            } else {
686                                break;
687                            }
688                        }
689                    }
690                }
691                // Drain remaining events in a tight loop without select!
692                // overhead. This is much faster for large buffered channels.
693                info!("trace task: draining remaining events");
694                while let Ok(a) = action_rx.try_recv() {
695                    count += 1;
696                    trace_manager
697                        .on_action(&a)
698                        .expect("Action should have been resolved");
699                }
700                info!("trace task: stopping trace manager");
701                trace_manager.stop(trace_file, None).unwrap();
702                info!("trace file compiled, collected {count} events");
703            });
704
705            info!("waiting for trace duration ({}ms)", trace_args.trace_ms);
706            tokio::time::sleep(Duration::from_millis(trace_args.trace_ms)).await;
707            info!("trace duration complete, beginning shutdown");
708
709            // Proper shutdown sequence to avoid hanging:
710            // 1) Stop new BPF events by detaching programs
711            // 2) Set shutdown flag to stop polling tasks
712            // 3) Wait for all ringbuffer tasks to consume remaining events and exit
713            // 4) Wait for stats task to exit
714            // 5) Drop action_tx to close the channel (all producers are done)
715            // 6) Wait for trace generation to complete
716            info!("shutdown: clearing BPF links");
717            tracer.clear_links()?;
718            info!("shutdown: BPF links cleared");
719            drop(links);
720            info!("shutdown: links dropped");
721
722            info!("shutdown: setting shutdown flag");
723            shutdown.store(true, Ordering::Relaxed);
724            info!(
725                "shutdown: flag set, waiting for {} ringbuffer tasks",
726                ringbuffer_handles.len()
727            );
728
729            // Wait for all ringbuffer polling tasks to finish consuming
730            let results = join_all(ringbuffer_handles).await;
731            info!("shutdown: all {} ringbuffer tasks joined", results.len());
732            for (idx, result) in results.iter().enumerate() {
733                if let Err(e) = result {
734                    eprintln!("Ringbuffer task {} panicked: {e}", idx);
735                } else {
736                    debug!("ringbuffer task {} exited successfully", idx);
737                }
738            }
739            info!("shutdown: ringbuffer tasks complete");
740
741            // Wait for producer tasks (stats) to complete
742            info!(
743                "shutdown: waiting for {} producer tasks",
744                producer_handles.len()
745            );
746            let results = join_all(producer_handles).await;
747            info!("shutdown: all {} producer tasks joined", results.len());
748            for (idx, result) in results.iter().enumerate() {
749                if let Err(e) = result {
750                    eprintln!("Producer task {} panicked: {e}", idx);
751                } else {
752                    debug!("producer task {} exited successfully", idx);
753                }
754            }
755            info!("shutdown: producer tasks complete");
756
757            // Now safe to drop action_tx - all producers are done
758            info!("shutdown: dropping action_tx");
759            drop(action_tx);
760            info!("shutdown: action_tx dropped, waiting for trace generation");
761
762            // Wait for trace generation to complete
763            if let Err(e) = trace_handle.await {
764                eprintln!("Trace generation task panicked: {e}");
765            }
766            info!("shutdown: trace generation complete");
767
768            info!("shutdown: collecting final stats");
769            let stats = tracer.stats()?;
770            info!("shutdown: {stats:?}");
771
772            info!("shutdown: complete");
773            Ok(())
774        })
775}
776
/// Runs the scxtop terminal UI.
///
/// Flow:
/// 1. Optionally enables file logging (when `RUST_LOG_PATH` is set, level from `RUST_LOG`).
/// 2. Merges CLI args over the on-disk config.
/// 3. Builds a multi-threaded tokio runtime sized so every per-ringbuffer
///    blocking poll task gets a worker thread.
/// 4. Inside the runtime: best-effort BPF setup (open/load/attach skeleton,
///    create one BPF ringbuffer per CPU group and wire it into the `events`
///    hash-of-maps), then drives the TUI event/action loop until quit.
///
/// BPF setup failures are fatal only when running as root; otherwise they are
/// collected into `capability_warnings` and the TUI runs without BPF data.
fn run_tui(tui_args: &TuiArgs) -> Result<()> {
    // Optional file logging: only active when RUST_LOG_PATH points at a writable path.
    if let Ok(log_path) = std::env::var("RUST_LOG_PATH") {
        // Log level comes from RUST_LOG; defaults to Info when unset/unparsable is an error.
        let log_level = match std::env::var("RUST_LOG") {
            Ok(v) => LevelFilter::from_str(&v)?,
            Err(_) => LevelFilter::Info,
        };

        WriteLogger::init(
            log_level,
            simplelog::Config::default(),
            File::create(log_path)?,
        )?;

        // Route panics (with resolved backtraces) through the logger so they
        // land in the log file instead of corrupting the TUI's terminal.
        log_panics::Config::new()
            .backtrace_mode(log_panics::BacktraceMode::Resolved)
            .install_panic_hook();
    };

    // CLI arguments take precedence over the persisted/default config.
    let config = Config::merge([
        Config::from(tui_args.clone()),
        Config::load_or_default().expect("Failed to load config or load default config"),
    ]);
    let keymap = config.active_keymap.clone();

    // Calculate how many ringbuffers we'll need to ensure enough worker threads
    let worker_threads = config.worker_threads() as usize;
    let num_cpus = num_possible_cpus()?;
    let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);

    // Ensure we have at least rb_cnt + 4 worker threads
    // (+4 for UI rendering, event handling, and other async tasks)
    let required_threads = std::cmp::max(rb_cnt + 4, worker_threads);

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(required_threads)
        .build()
        .unwrap()
        .block_on(async {
            // Declare open_object at the very beginning so it lives for the entire async block
            let mut open_object = MaybeUninit::uninit();

            // Single unbounded channel: BPF callbacks, producers, and the TUI
            // all publish Actions here; the select loop below consumes them.
            let (action_tx, mut action_rx) = mpsc::unbounded_channel();

            // Check capabilities early to determine if we can run with BPF functionality
            let has_bpf_cap = check_bpf_capability();
            let mut capability_warnings = Vec::new();
            // Set when at least one ringbuffer is live; underscore-prefixed as
            // it is written but not read again in this function.
            let mut _bpf_enabled = false;
            let mut links = Vec::new();
            // Deferred ringbuffer wiring: filled during BPF setup, consumed
            // after the App exists (the callbacks need app.should_quit).
            let mut event_rb_data_opt: Option<(
                Vec<i32>, // rb_fds
                Arc<std::sync::atomic::AtomicU64>, // dropped_invalid_ts
                mpsc::UnboundedSender<Action>, // action_tx for ringbuffer contexts
            )> = None;
            let mut skel_opt = None;

            if has_bpf_cap {
                // Try to initialize BPF components
                let mut builder = BpfSkelBuilder::default();
                if config.debug() {
                    builder.obj_builder.debug(true);
                }
                // NOTE(review): this `edm` is registered but not used again in
                // this function — per-ringbuffer EDMs are created inside the
                // sample callback below; confirm whether this one is dead code.
                let bpf_publisher = BpfEventActionPublisher::new(action_tx.clone());
                let mut edm = EventDispatchManager::new(None, None);
                edm.register_bpf_handler(Box::new(bpf_publisher));

                // Try to open the BPF skeleton with graceful error handling
                match builder.open(&mut open_object) {
                    Ok(mut skel) => {
                        // rodata must be written before load(); it is read-only afterwards.
                        skel.maps.rodata_data.as_mut().unwrap().long_tail_tracing_min_latency_ns =
                            tui_args.experimental_long_tail_tracing_min_latency_ns;

                        // Layered mode: surface Layer ID columns and attach to
                        // the scheduler's existing task_ctxs map (best effort).
                        let _map_handle = if tui_args.layered {
                            skel.maps.rodata_data.as_mut().unwrap().layered = true;
                            action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
                                table: "Process".to_string(),
                                col: "Layer ID".to_string(),
                                visible: true,
                            }))?;
                            action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
                                table: "Thread".to_string(),
                                col: "Layer ID".to_string(),
                                visible: true,
                            }))?;
                            match layered_util::attach_to_existing_map("task_ctxs", &mut skel.maps.task_ctxs) {
                                Ok(handle) => Some(handle),
                                Err(e) => {
                                    capability_warnings.push(format!("Failed to attach to layered map: {e}"));
                                    None
                                }
                            }
                        } else {
                            None
                        };

                        // Set up multiple ringbuffers for scalability
                        let num_cpus = num_possible_cpus()?;
                        let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
                        let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;

                        log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);

                        // Set up CPU-to-ringbuffer mapping in BPF
                        // Power-of-2 mask lets the BPF side index with a cheap AND.
                        let cpu_cnt_pow2 = num_cpus.next_power_of_two();
                        skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;

                        // Set max entries for the CPU-to-ringbuf map array
                        if let Err(e) = skel.maps.data_rb_cpu_map.set_max_entries(cpu_cnt_pow2 as u32) {
                            capability_warnings.push(format!("Failed to set CPU-to-ringbuf map size: {e}"));
                        }

                        // Set max entries for events hash-of-maps
                        if let Err(e) = skel.maps.events.set_max_entries(rb_cnt as u32) {
                            capability_warnings.push(format!("Failed to set ringbuf count: {e}"));
                        }

                        // Conditionally enable probes that may not exist on this kernel;
                        // failures degrade to warnings rather than aborting.
                        if let Err(e) = compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total) {
                            capability_warnings.push(format!("Failed to enable gpu_memory_total kprobe: {e}"));
                        }
                        if let Err(e) = compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update) {
                            capability_warnings.push(format!("Failed to enable hw_pressure_update kprobe: {e}"));
                        }
                        if let Err(e) = compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait) {
                            capability_warnings.push(format!("Failed to enable sched_process_wait tracepoint: {e}"));
                        }
                        if let Err(e) = compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang) {
                            capability_warnings.push(format!("Failed to enable sched_process_hang tracepoint: {e}"));
                        }

                        // Try to load the BPF skeleton
                        match skel.load() {
                            Ok(mut loaded_skel) => {
                                // Populate the CPU-to-ringbuffer mapping after loading
                                for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
                                    if cpu_id < cpu_cnt_pow2 {
                                        if let Err(e) = loaded_skel.maps.data_rb_cpu_map.update(
                                            &(cpu_id as u32).to_ne_bytes(),
                                            &rb_id.to_ne_bytes(),
                                            libbpf_rs::MapFlags::ANY,
                                        ) {
                                            capability_warnings.push(format!("Failed to set CPU {} -> ringbuf {}: {}", cpu_id, rb_id, e));
                                        }
                                    }
                                }

                                // attach_progs returns whatever links it managed
                                // to attach plus warnings for the rest.
                                let (skel_links, attach_warnings) = attach_progs(&mut loaded_skel)?;
                                links = skel_links;
                                capability_warnings.extend(attach_warnings);

                                if !links.is_empty() || is_root() {
                                    // Only run scxtop_init if we have some BPF functionality
                                    if let Err(e) = loaded_skel.progs.scxtop_init.test_run(ProgramInput::default()) {
                                        capability_warnings.push(format!("Failed to initialize scxtop BPF program: {e}"));
                                    }
                                }

                                // Set up event ring buffer if we have any attached programs
                                if !links.is_empty() {
                                    // Counter for events dropped due to invalid timestamps (userspace filtering)
                                    let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));

                                    // Create multiple ringbuffers and add them to the hash-of-maps
                                    let events_map_fd = loaded_skel.maps.events.as_fd().as_raw_fd();
                                    let mut rb_fds = Vec::new();

                                    for rb_id in 0..rb_cnt {
                                        // Create individual ringbuffer (size must be power of 2)
                                        // SAFETY: bpf_map_create takes optional (nullable) name/opts
                                        // pointers; all arguments here are plain values.
                                        let rb_fd = unsafe {
                                            libbpf_sys::bpf_map_create(
                                                libbpf_sys::BPF_MAP_TYPE_RINGBUF,
                                                std::ptr::null(),
                                                0,
                                                0,
                                                (32 * 1024 * 1024) as u32, // 32MB per ringbuffer (must be power of 2)
                                                std::ptr::null(),
                                            )
                                        };

                                        if rb_fd < 0 {
                                            capability_warnings.push(format!("Failed to create ringbuffer #{}: {}", rb_id, std::io::Error::last_os_error()));
                                            continue;
                                        }

                                        // Add ringbuffer to hash-of-maps
                                        let rb_id_u32 = rb_id as u32;
                                        // SAFETY: key/value pointers reference live stack locals
                                        // (u32 key, i32 fd value) for the duration of the call.
                                        let ret = unsafe {
                                            libbpf_sys::bpf_map_update_elem(
                                                events_map_fd,
                                                &rb_id_u32 as *const u32 as *const std::ffi::c_void,
                                                &rb_fd as *const i32 as *const std::ffi::c_void,
                                                libbpf_sys::BPF_NOEXIST.into(),
                                            )
                                        };

                                        if ret < 0 {
                                            capability_warnings.push(format!("Failed to add ringbuffer #{} to hash-of-maps: {}", rb_id, std::io::Error::last_os_error()));
                                            continue;
                                        }

                                        rb_fds.push(rb_fd);
                                    }

                                    if !rb_fds.is_empty() {
                                        // Save data for later ringbuffer manager creation (after app is created)
                                        event_rb_data_opt = Some((rb_fds, dropped_invalid_ts, action_tx.clone()));
                                        _bpf_enabled = true;
                                    }
                                }

                                skel_opt = Some(loaded_skel);
                            }
                            Err(e) => {
                                // As root a load failure is unexpected and fatal;
                                // otherwise fall back to the non-BPF TUI.
                                if is_root() {
                                    return Err(anyhow!("Failed to load BPF skeleton (running as root): {e}"));
                                } else {
                                    capability_warnings.push(format!("Failed to load BPF skeleton: {e}"));
                                    capability_warnings.extend(get_capability_warning_message());
                                }
                            }
                        }
                    }
                    Err(e) => {
                        // Same policy as load failure: fatal only as root.
                        if is_root() {
                            return Err(anyhow!("Failed to open BPF skeleton (running as root): {e}"));
                        } else {
                            capability_warnings.push(format!("Failed to open BPF skeleton: {e}"));
                            capability_warnings.extend(get_capability_warning_message());
                        }
                    }
                }
            } else {
                // No BPF capabilities detected
                capability_warnings.extend(get_capability_warning_message());
            }

            // Handle experimental long tail tracing if enabled and we have a skeleton
            if tui_args.experimental_long_tail_tracing {
                if let Some(ref mut skel) = skel_opt {
                    skel.maps.data_data.as_mut().unwrap().trace_duration_ns = config.trace_duration_ns();
                    skel.maps.data_data.as_mut().unwrap().trace_warmup_ns = config.trace_warmup_ns();

                    // NOTE(review): these unwrap()s assume the CLI layer enforces
                    // that binary/symbol are present whenever the feature flag is
                    // set — confirm clap marks them as required together.
                    let binary = tui_args
                        .experimental_long_tail_tracing_binary
                        .clone()
                        .unwrap();
                    let symbol = tui_args
                        .experimental_long_tail_tracing_symbol
                        .clone()
                        .unwrap();

                    // Return probe (function exit) on the target symbol.
                    match skel.progs.long_tail_tracker_exit.attach_uprobe_with_opts(
                        -1, /* pid, -1 == all */
                        binary.clone(),
                        0,
                        UprobeOpts {
                            retprobe: true,
                            func_name: Some(symbol.clone()),
                            ..Default::default()
                        },
                    ) {
                        Ok(link) => links.push(link),
                        Err(e) => capability_warnings.push(format!("Failed to attach long tail tracker exit: {e}"))
                    }

                    // Entry probe on the same symbol.
                    match skel.progs.long_tail_tracker_entry.attach_uprobe_with_opts(
                        -1, /* pid, -1 == all */
                        binary.clone(),
                        0,
                        UprobeOpts {
                            retprobe: false,
                            func_name: Some(symbol.clone()),
                            ..Default::default()
                        },
                    ) {
                        Ok(link) => links.push(link),
                        Err(e) => capability_warnings.push(format!("Failed to attach long tail tracker entry: {e}"))
                    }
                } else {
                    capability_warnings.push("Long tail tracing requested but BPF skeleton not available".to_string());
                }
            }

            let mut tui = Tui::new(keymap.clone(), config.tick_rate_ms(), config.frame_rate_ms())?;
            // Currently-loaded sched_ext scheduler name, empty when none.
            let scheduler = read_file_string(SCHED_NAME_PATH).unwrap_or("".to_string());

            // Create app with or without BPF skeleton
            let mut app = if let Some(skel) = skel_opt {
                App::new(
                    config,
                    scheduler,
                    100,
                    tui_args.process_id,
                    tui_args.layered,
                    action_tx.clone(),
                    skel,
                )?
            } else {
                // Create app without BPF functionality
                App::new_without_bpf(
                    config,
                    scheduler,
                    100,
                    tui_args.process_id,
                    tui_args.layered,
                    action_tx.clone(),
                )?
            };

            // Pass warnings to the app if any exist
            if !capability_warnings.is_empty() {
                app.set_capability_warnings(capability_warnings);
            }

            tui.enter()?;

            // Start BPF event polling only if we have ringbuffer data
            let shutdown = app.should_quit.clone();
            let mut ringbuffer_handles = Vec::new();
            if let Some((rb_fds, dropped_invalid_ts, rb_action_tx)) = event_rb_data_opt {
                // Set up ring buffer managers using raw libbpf C API
                // Now that app is created, we can use app.should_quit for the callbacks
                // Per-ringbuffer state handed to the C callback as an opaque pointer.
                struct RingBufContext {
                    dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                    action_tx: mpsc::UnboundedSender<Action>,
                    shutdown: Arc<AtomicBool>,
                }

                // Invoked by libbpf for every sample; copies the raw bytes into a
                // bpf_event and forwards it to the action channel. Returning 0
                // tells libbpf to continue consuming.
                extern "C" fn ring_buffer_sample_callback(
                    ctx: *mut std::ffi::c_void,
                    data: *mut std::ffi::c_void,
                    size: u64,
                ) -> std::ffi::c_int {
                    // SAFETY: ctx is the Box<RingBufContext> leaked below and stays
                    // valid for the ring buffer's lifetime; data points at `size`
                    // readable bytes, and the copy is clamped to the struct size.
                    unsafe {
                        let ctx = &*(ctx as *const RingBufContext);

                        // Stop processing if shutdown requested
                        if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                            return 0;
                        }

                        let mut event = bpf_event::default();
                        let copy_size = std::cmp::min(size as usize, std::mem::size_of::<bpf_event>());
                        std::ptr::copy_nonoverlapping(
                            data as *const u8,
                            &mut event as *mut bpf_event as *mut u8,
                            copy_size,
                        );

                        // Drop events with invalid timestamps
                        if event.ts == 0 {
                            ctx.dropped_invalid_ts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            return 0;
                        }

                        let mut edm = EventDispatchManager::new(None, None);
                        edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(ctx.action_tx.clone())));
                        let _ = edm.on_event(&event);
                    }
                    0
                }

                // Spawn a separate task for each ringbuffer
                for rb_fd in rb_fds {
                    // Leak the context so the C callback can reference it for the
                    // ring buffer's lifetime.
                    // NOTE(review): the context is only reclaimed on the failure
                    // path below; after rb.free() it is leaked — confirm whether
                    // that is intentional (one small leak per ringbuffer at exit).
                    let ctx = Box::new(RingBufContext {
                        dropped_invalid_ts: dropped_invalid_ts.clone(),
                        action_tx: rb_action_tx.clone(),
                        shutdown: shutdown.clone(),
                    });
                    let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;

                    // SAFETY: rb_fd is a valid ringbuf map fd; callback and ctx_ptr
                    // outlive the ring buffer; opts may be null.
                    let rb_ptr = unsafe {
                        libbpf_sys::ring_buffer__new(
                            rb_fd,
                            Some(ring_buffer_sample_callback),
                            ctx_ptr,
                            std::ptr::null(),
                        )
                    };

                    if rb_ptr.is_null() {
                        // SAFETY: ctx_ptr came from Box::into_raw above and was not
                        // handed to libbpf (creation failed), so reclaiming is sound.
                        unsafe { let _ = Box::from_raw(ctx_ptr as *mut RingBufContext); }
                        log::warn!("Failed to create ring buffer manager");
                        continue;
                    }

                    let rb = SendRingBuffer(rb_ptr);
                    let shutdown_clone = shutdown.clone();
                    let rb_id = ringbuffer_handles.len();
                    // Use spawn_blocking because rb.poll() is a blocking C FFI call
                    ringbuffer_handles.push(tokio::task::spawn_blocking(move || {
                        loop {
                            // Poll with 1ms timeout (blocking call)
                            rb.poll(1);
                            if shutdown_clone.load(Ordering::Relaxed) {
                                // Consume remaining events
                                rb.consume();
                                // Free the ring buffer
                                rb.free();
                                log::debug!("ringbuffer #{} polling stopped", rb_id);
                                break;
                            }
                        }
                    }));
                }
            }

            // Optional mangoapp (gamescope) telemetry polling task.
            if tui_args.mangoapp_tracing {
                let stop_mangoapp = app.should_quit.clone();
                let mangoapp_path = CString::new(tui_args.mangoapp_path.clone()).unwrap();
                let poll_intvl_ms = tui_args.mangoapp_poll_intvl_ms;
                let tx = action_tx.clone();
                tokio::spawn(async move {
                    poll_mangoapp(
                        mangoapp_path,
                        poll_intvl_ms,
                        tx,
                        stop_mangoapp,
                    )
                    .await
                });
            }

            // Main event loop: multiplex terminal events and the action channel.
            loop {
                tokio::select! {
                    ev = tui.next() => {
                        let ev = ev?;
                        match ev {
                            Event::Quit => { action_tx.send(Action::Quit)?; },
                            Event::Tick => action_tx.send(Action::Tick)?,
                            Event::TickRateChange(tick_rate_ms) => action_tx.send(
                                Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms)),
                            )?,
                            Event::Render => {
                                // Quit is detected on render ticks so the loop
                                // exits promptly after Action::Quit is handled.
                                if app.should_quit.load(Ordering::Relaxed) {
                                    break;
                                }
                                if app.state() != AppState::Pause {
                                    tui.draw(|f| app.render(f).expect("Failed to render application"))?;
                                }
                            }
                            Event::Key(_) => {
                                let action = get_action(&app, &keymap, ev);
                                action_tx.send(action)?;
                            }
                            _ => {}
                    }}

                    ac = action_rx.recv() => {
                        let ac = ac.ok_or(anyhow!("actions channel closed"))?;
                        app.handle_action(&ac)?;
                    }
                }
            }
            // Restore the terminal before tearing down BPF state.
            tui.exit()?;

            // Wait for all ringbuffer tasks to finish consuming remaining events
            log::debug!("waiting for {} ringbuffer tasks to complete", ringbuffer_handles.len());
            for handle in ringbuffer_handles {
                if let Err(e) = handle.await {
                    log::error!("Ringbuffer task panicked: {e}");
                }
            }

            // Detach all BPF programs last, after consumers have drained.
            drop(links);

            Ok(())
        })
}
1245
1246fn run_mcp(mcp_args: &scxtop::cli::McpArgs) -> Result<()> {
1247    use scx_utils::Topology;
1248    use scxtop::mcp::{events::action_to_mcp_event, McpServer, McpServerConfig};
1249    use std::sync::Arc;
1250
1251    // Set up logging to stderr (important: not stdout, which is used for MCP protocol)
1252    TermLogger::init(
1253        match mcp_args.verbose {
1254            0 => LevelFilter::Warn,
1255            1 => LevelFilter::Info,
1256            2 => LevelFilter::Debug,
1257            _ => LevelFilter::Trace,
1258        },
1259        SimplelogConfig::default(),
1260        TerminalMode::Stderr,
1261        ColorChoice::Auto,
1262    )?;
1263
1264    // Initialize topology
1265    let topo = Topology::new().expect("Failed to create topology");
1266    let topo_arc = Arc::new(topo);
1267
1268    let mcp_config = McpServerConfig {
1269        daemon_mode: mcp_args.daemon,
1270        enable_logging: mcp_args.enable_logging,
1271    };
1272
1273    if mcp_args.daemon {
1274        // Daemon mode: Full BPF event processing
1275        tokio::runtime::Builder::new_multi_thread()
1276            .enable_all()
1277            .worker_threads(4)
1278            .build()
1279            .unwrap()
1280            .block_on(async {
1281                let mut open_object = MaybeUninit::uninit();
1282                let (action_tx, mut action_rx) = mpsc::unbounded_channel();
1283
1284                // Create shared stats for MCP server
1285                use scxtop::mcp::create_shared_stats;
1286                let shared_stats = create_shared_stats();
1287                let shared_stats_for_event_handler = shared_stats.clone();
1288
1289                // Set up BPF
1290                let builder = BpfSkelBuilder::default();
1291                let mut skel = builder.open(&mut open_object)?;
1292
1293                // Set up multiple ringbuffers for scalability
1294                let num_cpus = num_possible_cpus()?;
1295                let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
1296                let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;
1297
1298                log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);
1299
1300                // Set up CPU-to-ringbuffer mapping in BPF
1301                let cpu_cnt_pow2 = num_cpus.next_power_of_two();
1302                skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;
1303
1304                // Set max entries for the CPU-to-ringbuf map array
1305                skel.maps.data_rb_cpu_map.set_max_entries(cpu_cnt_pow2 as u32)?;
1306
1307                // Set max entries for events hash-of-maps
1308                skel.maps.events.set_max_entries(rb_cnt as u32)?;
1309
1310                let mut skel = skel.load()?;
1311
1312                // Populate the CPU-to-ringbuffer mapping after loading
1313                for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
1314                    if cpu_id < cpu_cnt_pow2 {
1315                        skel.maps.data_rb_cpu_map.update(
1316                            &(cpu_id as u32).to_ne_bytes(),
1317                            &rb_id.to_ne_bytes(),
1318                            libbpf_rs::MapFlags::ANY,
1319                        )?;
1320                    }
1321                }
1322
1323                // Create ALL analyzers BEFORE setting up event handlers
1324                use scxtop::mcp::{
1325                    WakerWakeeAnalyzer, LatencyTracker, CpuHotspotAnalyzer, MigrationAnalyzer,
1326                    ProcessEventHistory, DsqMonitor, EventRateMonitor, WakeupChainTracker, EventBuffer,
1327                    SoftirqAnalyzer,
1328                };
1329                use std::sync::Mutex;
1330
1331                // 1. Waker/Wakee Analyzer
1332                let mut waker_wakee = WakerWakeeAnalyzer::new();
1333                waker_wakee.set_topology(topo_arc.clone());
1334                let waker_wakee_arc = Arc::new(Mutex::new(waker_wakee));
1335
1336                // 2. Latency Tracker
1337                let latency_tracker = LatencyTracker::new(1000); // 1 second window
1338                let latency_tracker_arc = Arc::new(Mutex::new(latency_tracker));
1339
1340                // 3. CPU Hotspot Analyzer
1341                let cpu_hotspot = CpuHotspotAnalyzer::new(100); // 100ms window
1342                let cpu_hotspot_arc = Arc::new(Mutex::new(cpu_hotspot));
1343
1344                // 4. Migration Analyzer
1345                let migration_analyzer = MigrationAnalyzer::new(1000); // 1 second window
1346                let migration_analyzer_arc = Arc::new(Mutex::new(migration_analyzer));
1347
1348                // 5. Process Event History
1349                let process_history = ProcessEventHistory::new(100); // 100 events per process
1350                let process_history_arc = Arc::new(Mutex::new(process_history));
1351
1352                // 6. DSQ Monitor
1353                let dsq_monitor = DsqMonitor::new();
1354                let dsq_monitor_arc = Arc::new(Mutex::new(dsq_monitor));
1355
1356                // 7. Event Rate Monitor
1357                let rate_monitor = EventRateMonitor::new(1000, 10); // 1s window, 10 baselines
1358                let rate_monitor_arc = Arc::new(Mutex::new(rate_monitor));
1359
1360                // 8. Wakeup Chain Tracker
1361                let wakeup_tracker = WakeupChainTracker::new(10); // max 10 chain length
1362                let wakeup_tracker_arc = Arc::new(Mutex::new(wakeup_tracker));
1363
1364                // 9. Event Buffer
1365                let event_buffer = EventBuffer::new();
1366                let event_buffer_arc = Arc::new(Mutex::new(event_buffer));
1367
1368                // 10. Softirq Analyzer
1369                let softirq_analyzer = SoftirqAnalyzer::new(10000); // 10 second window
1370                let softirq_analyzer_arc = Arc::new(Mutex::new(softirq_analyzer));
1371
1372                // Set up event dispatch manager
1373                let mut edm = EventDispatchManager::new(None, None);
1374                edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(action_tx.clone())));
1375
1376                // Counter for events dropped due to invalid timestamps (userspace filtering)
1377                let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));
1378
1379                // Create shutdown flag early so it can be used in ringbuffer callbacks
1380                let shutdown = Arc::new(AtomicBool::new(false));
1381
1382                // Create multiple ringbuffers and add them to the hash-of-maps
1383                let events_map_fd = skel.maps.events.as_fd().as_raw_fd();
1384                let mut rb_fds = Vec::new();
1385                let mut rb_managers: Vec<SendRingBuffer> = Vec::new();
1386
1387                for rb_id in 0..rb_cnt {
1388                    // Create individual ringbuffer (size must be power of 2)
1389                    let rb_fd = unsafe {
1390                        libbpf_sys::bpf_map_create(
1391                            libbpf_sys::BPF_MAP_TYPE_RINGBUF,
1392                            std::ptr::null(),
1393                            0,
1394                            0,
1395                            (32 * 1024 * 1024) as u32, // 32MB per ringbuffer (must be power of 2)
1396                            std::ptr::null(),
1397                        )
1398                    };
1399
1400                    if rb_fd < 0 {
1401                        bail!("Failed to create ringbuffer #{}: {}", rb_id, std::io::Error::last_os_error());
1402                    }
1403
1404                    // Add ringbuffer to hash-of-maps
1405                    let rb_id_u32 = rb_id as u32;
1406                    let ret = unsafe {
1407                        libbpf_sys::bpf_map_update_elem(
1408                            events_map_fd,
1409                            &rb_id_u32 as *const u32 as *const std::ffi::c_void,
1410                            &rb_fd as *const i32 as *const std::ffi::c_void,
1411                            libbpf_sys::BPF_NOEXIST.into(),
1412                        )
1413                    };
1414
1415                    if ret < 0 {
1416                        bail!("Failed to add ringbuffer #{} to hash-of-maps: {}", rb_id, std::io::Error::last_os_error());
1417                    }
1418
1419                    rb_fds.push(rb_fd);
1420                }
1421
1422                // Set up ring buffer managers using raw libbpf C API
1423                // We use the C API because we're creating ringbuffers dynamically
1424
                // Context struct holding all the data needed by the ring-buffer
                // callback. It is handed to libbpf as a raw `*mut c_void` via
                // `Box::into_raw` (see the setup loop below), so every shared piece
                // of state is an `Arc` the daemon also owns, and the box is only
                // reclaimed if ring buffer creation fails.
                struct McpRingBufContext {
                    // Events dropped in userspace because their timestamp was 0.
                    dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                    // Aggregate stats updated from every valid event.
                    shared_stats: Arc<std::sync::RwLock<scxtop::mcp::SharedStats>>,
                    // Channel used to publish each event as an `Action` to the main loop.
                    action_tx: mpsc::UnboundedSender<Action>,
                    // Analyzers fed from the callback. All are taken with `try_lock`
                    // there, so a busy analyzer is skipped rather than blocking polling.
                    waker_wakee: Arc<std::sync::Mutex<scxtop::mcp::WakerWakeeAnalyzer>>,
                    cpu_hotspot: Arc<std::sync::Mutex<scxtop::mcp::CpuHotspotAnalyzer>>,
                    migration_analyzer: Arc<std::sync::Mutex<scxtop::mcp::MigrationAnalyzer>>,
                    process_history: Arc<std::sync::Mutex<scxtop::mcp::ProcessEventHistory>>,
                    rate_monitor: Arc<std::sync::Mutex<scxtop::mcp::EventRateMonitor>>,
                    wakeup_tracker: Arc<std::sync::Mutex<scxtop::mcp::WakeupChainTracker>>,
                    softirq_analyzer: Arc<std::sync::Mutex<scxtop::mcp::SoftirqAnalyzer>>,
                    // Set on daemon shutdown; turns the callback into a no-op.
                    shutdown: Arc<AtomicBool>,
                }
1439
                /// C-ABI callback invoked by libbpf's `ring_buffer__poll`/`__consume`
                /// once per record in an MCP ringbuffer.
                ///
                /// `ctx` is a leaked `Box<McpRingBufContext>`, `data`/`size` describe
                /// the raw BPF event bytes. Always returns 0 so libbpf keeps
                /// delivering records; "dropped" events are simply not forwarded.
                extern "C" fn mcp_ring_buffer_callback(
                    ctx: *mut std::ffi::c_void,
                    data: *mut std::ffi::c_void,
                    size: u64,
                ) -> std::ffi::c_int {
                    unsafe {
                        // SAFETY: `ctx` came from `Box::into_raw` on a
                        // `McpRingBufContext` that is kept alive for as long as the
                        // ring buffer manager exists, so this shared reborrow is valid.
                        let ctx = &*(ctx as *const McpRingBufContext);

                        // Stop processing if shutdown requested
                        if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                            return 0;
                        }

                        // Copy the record into a default-initialized `bpf_event`,
                        // clamping to the struct size in case the record is larger;
                        // a short record leaves the tail of `event` at its defaults.
                        let mut event = bpf_event::default();
                        let copy_size = std::cmp::min(size as usize, std::mem::size_of::<bpf_event>());
                        // SAFETY: `data` points at `size` readable bytes and
                        // `copy_size` never exceeds the destination struct.
                        std::ptr::copy_nonoverlapping(
                            data as *const u8,
                            &mut event as *mut bpf_event as *mut u8,
                            copy_size,
                        );

                        // Drop events with invalid timestamps
                        if event.ts == 0 {
                            ctx.dropped_invalid_ts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            return 0;
                        }

                        // Update shared stats from BPF event
                        if let Ok(mut stats) = ctx.shared_stats.write() {
                            stats.update_from_event(&event);
                        }

                        // Feed events to all analyzers. Every analyzer is taken with
                        // `try_lock`, so contention skips this event for that analyzer
                        // instead of blocking the polling task. Accesses into
                        // `event.event.*` below read union fields; each read is gated
                        // by the matching `event_type`, which selects the live variant.
                        use scxtop::bpf_intf;
                        let event_type = event.r#type as u32;

                        // 1. Waker/Wakee Analyzer
                        if let Ok(mut analyzer) = ctx.waker_wakee.try_lock() {
                            match event_type {
                                bpf_intf::event_type_SCHED_WAKEUP => {
                                    let wakeup = &event.event.wakeup;
                                    analyzer.record_wakeup(
                                        wakeup.pid, wakeup.waker_pid,
                                        &String::from_utf8_lossy(&wakeup.waker_comm),
                                        event.cpu, event.ts,
                                    );
                                }
                                bpf_intf::event_type_SCHED_WAKING => {
                                    let waking = &event.event.waking;
                                    analyzer.record_wakeup(
                                        waking.pid, waking.waker_pid,
                                        &String::from_utf8_lossy(&waking.waker_comm),
                                        event.cpu, event.ts,
                                    );
                                }
                                bpf_intf::event_type_SCHED_SWITCH => {
                                    let switch = &event.event.sched_switch;
                                    analyzer.record_wakee_run(
                                        switch.next_pid,
                                        &String::from_utf8_lossy(&switch.next_comm),
                                        event.cpu, event.ts,
                                    );
                                }
                                _ => {}
                            }
                        }

                        // 2. CPU Hotspot Analyzer — sees every event type.
                        if let Ok(mut analyzer) = ctx.cpu_hotspot.try_lock() {
                            let json = serde_json::json!({
                                "cpu": event.cpu, "ts": event.ts, "event_type": event_type
                            });
                            analyzer.record_event(&json);
                        }

                        // 3. Migration Analyzer
                        if let Ok(mut analyzer) = ctx.migration_analyzer.try_lock() {
                            if event_type == bpf_intf::event_type_SCHED_MIGRATE {
                                let migrate = &event.event.migrate;
                                let json = serde_json::json!({
                                    "pid": migrate.pid, "from_cpu": event.cpu,
                                    "to_cpu": migrate.dest_cpu, "ts": event.ts
                                });
                                analyzer.record_migration(&json, event.ts);
                            }
                        }

                        // 4. Process Event History — per-pid event log; events that
                        // don't map to a known type/pid (pid 0) are not recorded.
                        if let Ok(mut history) = ctx.process_history.try_lock() {
                            let (event_type_str, pid) = match event_type {
                                bpf_intf::event_type_SCHED_SWITCH => ("sched_switch", event.event.sched_switch.next_pid),
                                bpf_intf::event_type_SCHED_WAKEUP => ("sched_wakeup", event.event.wakeup.pid),
                                bpf_intf::event_type_SCHED_WAKING => ("sched_waking", event.event.waking.pid),
                                bpf_intf::event_type_SCHED_MIGRATE => ("sched_migrate", event.event.migrate.pid),
                                bpf_intf::event_type_EXIT => ("exit", event.event.exit.pid),
                                bpf_intf::event_type_EXEC => ("exec", event.event.exec.pid),
                                _ => ("other", 0),
                            };
                            if pid > 0 {
                                history.record_event(
                                    pid, event_type_str.to_string(), Some(event.cpu),
                                    serde_json::json!({"ts": event.ts}), event.ts,
                                );
                            }
                        }

                        // 6. Event Rate Monitor
                        if let Ok(mut monitor) = ctx.rate_monitor.try_lock() {
                            let event_type_str = match event_type {
                                bpf_intf::event_type_SCHED_SWITCH => "sched_switch",
                                bpf_intf::event_type_SCHED_WAKEUP => "sched_wakeup",
                                bpf_intf::event_type_SCHED_WAKING => "sched_waking",
                                bpf_intf::event_type_SCHED_MIGRATE => "sched_migrate",
                                _ => "other",
                            };
                            monitor.record_event(event_type_str.to_string(), event.ts);
                        }

                        // 7. Wakeup Chain Tracker — wakeup and waking events both count.
                        if let Ok(mut tracker) = ctx.wakeup_tracker.try_lock() {
                            if event_type == bpf_intf::event_type_SCHED_WAKEUP || event_type == bpf_intf::event_type_SCHED_WAKING {
                                let (pid, waker_pid) = if event_type == bpf_intf::event_type_SCHED_WAKEUP {
                                    (event.event.wakeup.pid, event.event.wakeup.waker_pid)
                                } else {
                                    (event.event.waking.pid, event.event.waking.waker_pid)
                                };
                                let json = serde_json::json!({
                                    "pid": pid, "waker_pid": waker_pid, "ts": event.ts, "cpu": event.cpu
                                });
                                tracker.record_wakeup(&json, event.ts);
                            }
                        }

                        // 8. Softirq Analyzer
                        if let Ok(mut analyzer) = ctx.softirq_analyzer.try_lock() {
                            if event_type == bpf_intf::event_type_SOFTIRQ {
                                let softirq = &event.event.softirq;
                                let json = serde_json::json!({
                                    "type": "softirq", "pid": softirq.pid, "softirq_nr": softirq.softirq_nr,
                                    "entry_ts": softirq.entry_ts, "exit_ts": softirq.exit_ts, "cpu": event.cpu,
                                });
                                analyzer.record_event(&json);
                            }
                        }

                        // Dispatch to action channel
                        // NOTE(review): a fresh EventDispatchManager plus a boxed
                        // publisher is allocated here for EVERY event. Consider caching
                        // one in `McpRingBufContext` to keep this hot path allocation-free
                        // — confirm EventDispatchManager is reusable across events.
                        let mut edm = EventDispatchManager::new(None, None);
                        edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(ctx.action_tx.clone())));
                        let _ = edm.on_event(&event);
                    }
                    0
                }
1592
1593                for rb_fd in &rb_fds {
1594                    let ctx = Box::new(McpRingBufContext {
1595                        dropped_invalid_ts: dropped_invalid_ts.clone(),
1596                        shared_stats: shared_stats_for_event_handler.clone(),
1597                        action_tx: action_tx.clone(),
1598                        waker_wakee: waker_wakee_arc.clone(),
1599                        cpu_hotspot: cpu_hotspot_arc.clone(),
1600                        migration_analyzer: migration_analyzer_arc.clone(),
1601                        process_history: process_history_arc.clone(),
1602                        rate_monitor: rate_monitor_arc.clone(),
1603                        wakeup_tracker: wakeup_tracker_arc.clone(),
1604                        softirq_analyzer: softirq_analyzer_arc.clone(),
1605                        shutdown: shutdown.clone(),
1606                    });
1607                    let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;
1608
1609                    let rb_ptr = unsafe {
1610                        libbpf_sys::ring_buffer__new(
1611                            *rb_fd,
1612                            Some(mcp_ring_buffer_callback),
1613                            ctx_ptr,
1614                            std::ptr::null(),
1615                        )
1616                    };
1617
1618                    if rb_ptr.is_null() {
1619                        unsafe { let _ = Box::from_raw(ctx_ptr as *mut McpRingBufContext); }
1620                        bail!("Failed to create ring buffer manager");
1621                    }
1622
1623                    rb_managers.push(SendRingBuffer(rb_ptr));
1624                }
1625
1626                // Attach BPF programs initially
1627                let (initial_links, _warnings) = attach_progs(&mut skel)?;
1628
1629                // Initialize BPF program
1630                if !initial_links.is_empty() || is_root() {
1631                    skel.progs
1632                        .scxtop_init
1633                        .test_run(ProgramInput::default())
1634                        .ok();
1635                }
1636
1637                // Create BPF perf event attacher BEFORE passing skeleton to App
1638                // This gives us a handle to attach perf events for profiling
1639                // We create a closure that captures a raw pointer to the program
1640                use scxtop::mcp::BpfPerfEventAttacher;
1641                // Get raw pointer to the perf_sample_handler program as usize to make it Send
1642                let perf_program_addr = &skel.progs.perf_sample_handler as *const _ as usize;
1643
1644                let bpf_attacher = BpfPerfEventAttacher::new(move |perf_fd| {
1645                    // SAFETY: The skeleton is kept alive in the App and not dropped
1646                    // until the MCP server is done, so this pointer remains valid
1647                    unsafe {
1648                        // Cast back to ProgramImpl with Mut parameter
1649                        let prog =
1650                            &*(perf_program_addr as *const libbpf_rs::ProgramImpl<libbpf_rs::Mut>);
1651                        prog.attach_perf_event(perf_fd)
1652                            .map(|link| Box::new(link) as Box<dyn std::any::Any + Send>)
1653                            .map_err(|e| anyhow::anyhow!("Failed to attach perf event: {}", e))
1654                    }
1655                });
1656                let bpf_attacher_arc = Arc::new(bpf_attacher);
1657
1658                // Create event control for dynamic BPF program attachment/detachment
1659                use scxtop::mcp::{AttachCallback, EventControl, StatsControlCommand};
1660                let mut event_control_instance = EventControl::new();
1661
1662                // Create attach callback using skeleton pointer (similar to perf attacher)
1663                // SAFETY: Skeleton is kept alive in App until daemon shutdown
1664                let skel_ptr = &mut skel as *mut _ as usize;
1665                let attach_callback: AttachCallback = Box::new(move |program_names: &[&str]| {
1666                    unsafe {
1667                        let skel_ref = &mut *(skel_ptr as *mut BpfSkel);
1668                        attach_progs_selective(skel_ref, program_names).map(|(links, _)| links)
1669                    }
1670                });
1671
1672                // Give EventControl the initial links and callback, then immediately detach
1673                // to start with minimal overhead
1674                event_control_instance.set_bpf_links(initial_links, attach_callback);
1675                event_control_instance.disable_event_tracking()?;
1676                info!("BPF programs detached by default - use control_event_tracking to enable");
1677
1678                // Create stats control channel
1679                let (stats_tx, stats_rx) = mpsc::unbounded_channel::<StatsControlCommand>();
1680                event_control_instance.set_stats_control_channel(stats_tx);
1681
1682                // Wrap in Arc after configuration
1683                let event_control = Arc::new(event_control_instance);
1684
1685                // Create App (but don't use it in spawned tasks due to Send constraints)
1686                let config = Config::default_config();
1687                let scheduler =
1688                    read_file_string(SCHED_NAME_PATH).unwrap_or_else(|_| "".to_string());
1689                let mut app = App::new(
1690                    config,
1691                    scheduler,
1692                    100,
1693                    mcp_args.process_id,
1694                    mcp_args.layered,
1695                    action_tx.clone(),
1696                    skel,
1697                )?;
1698
1699                // Create analyzer control and register ALL analyzers
1700                use scxtop::mcp::AnalyzerControl;
1701
1702                let mut analyzer_control = AnalyzerControl::new();
1703                analyzer_control.set_event_control(event_control.clone());
1704
1705                // Register all analyzers
1706                analyzer_control.set_event_buffer(event_buffer_arc.clone());
1707                analyzer_control.set_latency_tracker(latency_tracker_arc.clone());
1708                analyzer_control.set_cpu_hotspot_analyzer(cpu_hotspot_arc.clone());
1709                analyzer_control.set_migration_analyzer(migration_analyzer_arc.clone());
1710                analyzer_control.set_process_history(process_history_arc.clone());
1711                analyzer_control.set_dsq_monitor(dsq_monitor_arc.clone());
1712                analyzer_control.set_rate_monitor(rate_monitor_arc.clone());
1713                analyzer_control.set_wakeup_tracker(wakeup_tracker_arc.clone());
1714                analyzer_control.set_waker_wakee_analyzer(waker_wakee_arc.clone());
1715                analyzer_control.set_softirq_analyzer(softirq_analyzer_arc.clone());
1716
1717                // Wrap analyzer control in Arc<Mutex<>>
1718                let analyzer_control = Arc::new(Mutex::new(analyzer_control));
1719
1720                // Create trace cache for perfetto analysis
1721                use std::collections::HashMap;
1722                let trace_cache = Arc::new(Mutex::new(HashMap::new()));
1723
1724                // Create MCP server
1725                let mut server = McpServer::new(mcp_config)
1726                    .with_topology(topo_arc)
1727                    .setup_scheduler_resource()
1728                    .setup_profiling_resources()
1729                    .with_bpf_perf_attacher(bpf_attacher_arc)
1730                    .with_shared_stats(shared_stats.clone())
1731                    .with_stats_client(None)
1732                    .with_event_control(event_control.clone())
1733                    .with_analyzer_control(analyzer_control.clone())
1734                    .with_trace_cache(trace_cache)
1735                    .setup_stats_resources();
1736
1737                // Enable event streaming
1738                let _event_stream_rx = server.enable_event_streaming();
1739                let resources = server.get_resources_handle();
1740
1741                // Get BPF stats collector for periodic sampling
1742                let bpf_stats = server.get_bpf_stats_collector();
1743
1744                // Get perf profiler for stack trace collection
1745                let perf_profiler = server.get_perf_profiler();
1746
1747                // Start BPF polling tasks - spawn a separate task for each ringbuffer
1748                let shutdown_poll = shutdown.clone();
1749
1750                let mut ringbuffer_handles = Vec::new();
1751                for (rb_id, rb) in rb_managers.into_iter().enumerate() {
1752                    let stop_poll_clone = shutdown_poll.clone();
1753                    ringbuffer_handles.push(tokio::spawn(async move {
1754                        loop {
1755                            // Poll with 1ms timeout
1756                            rb.poll(1);
1757                            if stop_poll_clone.load(Ordering::Relaxed) {
1758                                // Consume remaining events
1759                                rb.consume();
1760                                // Free the ring buffer
1761                                rb.free();
1762                                debug!("ringbuffer #{} polling stopped", rb_id);
1763                                break;
1764                            }
1765                        }
1766                    }));
1767                }
1768
1769                // Start controllable BPF stats collection task
1770                // Task responds to start/stop commands via channel, starts in stopped state
1771                if let Some(collector) = bpf_stats {
1772                    let shutdown_stats = shutdown.clone();
1773                    let mut stats_rx_task = stats_rx;
1774                    tokio::spawn(async move {
1775                        let mut running = false;
1776                        let mut interval_ms = 100u64;
1777                        let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));
1778
1779                        loop {
1780                            tokio::select! {
1781                                // Handle control commands
1782                                Some(cmd) = stats_rx_task.recv() => {
1783                                    match cmd {
1784                                        StatsControlCommand::Start(new_interval_ms) => {
1785                                            running = true;
1786                                            interval_ms = new_interval_ms;
1787                                            interval = tokio::time::interval(Duration::from_millis(interval_ms));
1788                                            info!("Stats collection started with {}ms interval", interval_ms);
1789                                        }
1790                                        StatsControlCommand::Stop => {
1791                                            running = false;
1792                                            info!("Stats collection stopped");
1793                                        }
1794                                    }
1795                                }
1796
1797                                // Collect stats if running
1798                                _ = interval.tick(), if running => {
1799                                    if shutdown_stats.load(Ordering::Relaxed) {
1800                                        break;
1801                                    }
1802                                    let _ = collector.collect_sample();
1803                                }
1804
1805                                // Check shutdown even when not running
1806                                _ = tokio::time::sleep(Duration::from_millis(100)), if !running => {
1807                                    if shutdown_stats.load(Ordering::Relaxed) {
1808                                        break;
1809                                    }
1810                                }
1811                            }
1812                        }
1813                    });
1814                }
1815
1816                info!("MCP daemon started, processing BPF events");
1817
1818                // Main loop: handle both MCP server and action processing
1819                let mut mcp_server_task = Box::pin(server.run_async());
1820                let mcp_result;
1821                loop {
1822                    tokio::select! {
1823                        // Handle MCP server
1824                        result = &mut mcp_server_task => {
1825                            info!("MCP server exited");
1826                            shutdown.store(true, Ordering::Relaxed);
1827                            mcp_result = result;
1828                            break;
1829                        }
1830
1831                        // Handle actions from BPF
1832                        Some(action) = action_rx.recv() => {
1833                            // Check for shutdown
1834                            if matches!(action, Action::Quit) {
1835                                info!("Received quit action");
1836                                shutdown.store(true, Ordering::Relaxed);
1837                                mcp_result = Ok(());
1838                                break;
1839                            }
1840
1841                            // Feed perf samples to profiler if it's collecting
1842                            if let Some(ref profiler) = perf_profiler {
1843                                if let Action::PerfSample(ref perf_sample) = action {
1844                                    use scxtop::mcp::RawSample;
1845                                    profiler.add_sample(RawSample {
1846                                        address: perf_sample.instruction_pointer,
1847                                        pid: perf_sample.pid,
1848                                        cpu_id: perf_sample.cpu_id,
1849                                        is_kernel: perf_sample.is_kernel,
1850                                        kernel_stack: perf_sample.kernel_stack.clone(),
1851                                        user_stack: perf_sample.user_stack.clone(),
1852                                        layer_id: if perf_sample.layer_id >= 0 {
1853                                            Some(perf_sample.layer_id)
1854                                        } else {
1855                                            None
1856                                        },
1857                                    });
1858                                }
1859                            }
1860
1861                            // Update app state
1862                            let _ = app.handle_action(&action);
1863
1864                            // Convert action to MCP event and push to stream
1865                            if let Some(event) = action_to_mcp_event(&action) {
1866                                let _ = resources.push_event(event);
1867                            }
1868                        }
1869                    }
1870                }
1871
1872                // Wait for all ringbuffer tasks to finish consuming remaining events
1873                debug!("waiting for {} ringbuffer tasks to complete", ringbuffer_handles.len());
1874                for handle in ringbuffer_handles {
1875                    if let Err(e) = handle.await {
1876                        log::error!("Ringbuffer task panicked: {e}");
1877                    }
1878                }
1879
1880                // Links are managed by EventControl and will be dropped on shutdown
1881                mcp_result
1882            })
1883    } else {
1884        // One-shot mode: No BPF, just serve static data
1885        use std::collections::HashMap;
1886        use std::sync::Mutex;
1887        let trace_cache = Arc::new(Mutex::new(HashMap::new()));
1888        let mut server = McpServer::new(mcp_config)
1889            .with_topology(topo_arc)
1890            .setup_scheduler_resource()
1891            .setup_profiling_resources()
1892            .with_stats_client(None)
1893            .with_trace_cache(trace_cache)
1894            .setup_stats_resources();
1895        server.run_blocking()
1896    }
1897}
1898
1899fn main() -> Result<()> {
1900    let args = Cli::parse();
1901
1902    match &args.command.unwrap_or(Commands::Tui(args.tui)) {
1903        Commands::Tui(tui_args) => {
1904            run_tui(tui_args)?;
1905        }
1906        Commands::Trace(trace_args) => {
1907            run_trace(trace_args)?;
1908        }
1909        Commands::Mcp(mcp_args) => {
1910            run_mcp(mcp_args)?;
1911        }
1912        Commands::GenerateCompletions { shell, output } => {
1913            generate_completions(Cli::command(), *shell, output.clone())
1914                .unwrap_or_else(|_| panic!("Failed to generate completions for {shell}"));
1915        }
1916    }
1917    Ok(())
1918}