scxtop/main.rs

// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

use scx_utils::compat;
use scxtop::bpf_skel::types::bpf_event;
use scxtop::cli::{generate_completions, Cli, Commands, TraceArgs, TuiArgs};
use scxtop::config::Config;
use scxtop::edm::{ActionHandler, BpfEventActionPublisher, BpfEventHandler, EventDispatchManager};
use scxtop::layered_util;
use scxtop::mangoapp::poll_mangoapp;
use scxtop::search;
use scxtop::tracer::Tracer;
use scxtop::util::{
    check_bpf_capability, get_capability_warning_message, get_clock_value, is_root,
    read_file_string,
};
use scxtop::Action;
use scxtop::App;
use scxtop::CpuStatTracker;
use scxtop::Event;
use scxtop::Key;
use scxtop::KeyMap;
use scxtop::MemStatSnapshot;
use scxtop::PerfettoTraceManager;
use scxtop::SystemStatAction;
use scxtop::Tui;
use scxtop::SCHED_NAME_PATH;
use scxtop::{available_kprobe_events, UpdateColVisibilityAction};
use scxtop::{bpf_skel::*, AppState};

use anyhow::anyhow;
use anyhow::bail;
use anyhow::Result;
use clap::{CommandFactory, Parser};
use futures::future::join_all;
use libbpf_rs::libbpf_sys;
use libbpf_rs::num_possible_cpus;
use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::Link;
use libbpf_rs::MapCore;
use libbpf_rs::ProgramInput;
use libbpf_rs::UprobeOpts;
use log::debug;
use log::info;
use ratatui::crossterm::event::{KeyCode::Char, KeyEvent};
use simplelog::{
    ColorChoice, Config as SimplelogConfig, LevelFilter, TermLogger, TerminalMode, WriteLogger,
};
use std::ffi::CString;
use std::fs::File;
use std::mem::MaybeUninit;
use std::os::fd::AsFd;
use std::os::fd::AsRawFd;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::System;
use tokio::sync::mpsc;

// Wrapper to make ring buffer pointer Send-safe for tokio spawn
// SAFETY: We ensure the pointer remains valid for the task lifetime
struct SendRingBuffer(*mut libbpf_sys::ring_buffer);
unsafe impl Send for SendRingBuffer {}

impl SendRingBuffer {
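    /// Blocks for up to `timeout` milliseconds waiting for samples
    /// (libbpf `ring_buffer__poll` semantics); returns the number of
    /// records consumed, or a negative error code.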
    fn poll(&self, timeout: i32) -> i32 {
        unsafe { libbpf_sys::ring_buffer__poll(self.0, timeout) }
    }

    fn consume(&self) -> i32 {
        unsafe { libbpf_sys::ring_buffer__consume(self.0) }
    }

    fn free(self) {
        unsafe { libbpf_sys::ring_buffer__free(self.0) }
    }
}

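/// Maps a terminal `Event` to an `Action`, consulting the current app state
/// and the active keymap.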
fn get_action(app: &App, keymap: &KeyMap, event: Event) -> Action {
    match event {
        Event::Error => Action::None,
        Event::Tick => Action::Tick,
        Event::TickRateChange(tick_rate_ms) => {
            Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms))
        }
        Event::Key(key) => handle_key_event(app, keymap, key),
        Event::Paste(paste) => handle_input_entry(app, paste).unwrap_or(Action::None),
        _ => Action::None,
    }
}

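/// Resolves a key press: free-form input entry takes priority, then
/// state-specific bindings, then the global keymap.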
fn handle_key_event(app: &App, keymap: &KeyMap, key: KeyEvent) -> Action {
    match key.code {
        Char(c) => {
            // Check if we should handle this character as input for filtering
            if let Some(action) = handle_input_entry(app, c.to_string()) {
                action
            } else {
                // Check for state-specific key bindings before falling back to global keymap
                match (app.state(), c) {
                    // In BPF program detail view, 'p' toggles perf sampling
                    (AppState::BpfProgramDetail, 'p') => Action::ToggleBpfPerfSampling,
                    // Fall back to global keymap for all other cases
                    _ => keymap.action(&Key::Char(c)),
                }
            }
        }
        _ => keymap.action(&Key::Code(key.code)),
    }
}

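/// Returns an `InputEntry` action when the current view accepts text input
/// (event search views, or any filterable view while filtering is active).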
fn handle_input_entry(app: &App, s: String) -> Option<Action> {
    match app.state() {
        AppState::PerfEvent | AppState::KprobeEvent => Some(Action::InputEntry(s)),
        AppState::Default
        | AppState::Llc
        | AppState::Node
        | AppState::Process
        | AppState::Memory
        | AppState::PerfTop
        | AppState::BpfPrograms
            if app.filtering() =>
        {
            Some(Action::InputEntry(s))
        }
        _ => None,
    }
}

/// Attaches BPF programs to the skel, handling non-root scenarios gracefully.
fn attach_progs(skel: &mut BpfSkel) -> Result<(Vec<Link>, Vec<String>)> {
    attach_progs_selective(skel, &[])
}

/// Attaches the specified BPF programs to the skel.
/// If `program_names` is empty, attaches all programs.
fn attach_progs_selective(
    skel: &mut BpfSkel,
    program_names: &[&str],
) -> Result<(Vec<Link>, Vec<String>)> {
    let mut links = Vec::new();
    let mut warnings = Vec::new();

    // Check capabilities before attempting to attach
    let has_bpf_cap = check_bpf_capability();

    if !has_bpf_cap {
        warnings
            .push("BPF programs cannot be attached - scheduler monitoring disabled".to_string());
        warnings.push("Try running as root or configure BPF permissions".to_string());
        return Ok((links, warnings));
    }

    let attach_all = program_names.is_empty();

    // Helper closure to check if a program should be attached
    let should_attach = |name: &str| -> bool { attach_all || program_names.contains(&name) };

    // Helper macro to safely attach programs and collect warnings
    macro_rules! safe_attach {
        ($prog:expr, $name:literal) => {
            if should_attach($name) {
                match $prog.attach() {
                    Ok(link) => {
                        links.push(link);
                    }
                    Err(e) => {
                        if is_root() {
                            // If running as root and still failing, it's a real error
                            return Err(anyhow!(
                                "Failed to attach {} (running as root): {}",
                                $name,
                                e
                            ));
                        } else {
                            warnings.push(format!("Failed to attach {}: {}", $name, e));
                        }
                    }
                }
            }
        };
    }

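    // For example, safe_attach!(skel.progs.on_sched_switch, "on_sched_switch")
    // pushes the link on success; on failure it records a warning when running
    // unprivileged and bails out when running as root.
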
    // Try to attach core scheduler probes
    safe_attach!(skel.progs.on_sched_cpu_perf, "sched_cpu_perf");
    safe_attach!(skel.progs.scx_sched_reg, "scx_sched_reg");
    safe_attach!(skel.progs.scx_sched_unreg, "scx_sched_unreg");
    safe_attach!(skel.progs.on_sched_switch, "on_sched_switch");
    safe_attach!(skel.progs.on_sched_wakeup, "on_sched_wakeup");
    safe_attach!(skel.progs.on_sched_wakeup_new, "sched_wakeup_new");
    safe_attach!(skel.progs.on_sched_waking, "on_sched_waking");
    safe_attach!(skel.progs.on_sched_migrate_task, "on_sched_migrate_task");
    safe_attach!(skel.progs.on_sched_fork, "sched_fork");
    safe_attach!(skel.progs.on_sched_exec, "sched_exec");
    safe_attach!(skel.progs.on_sched_exit, "sched_exit");

    // 6.13 compatibility probes: newer kernels expose the scx_bpf_dsq_insert*
    // kfuncs, older ones the scx_bpf_dispatch* variants
    if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
        safe_attach!(skel.progs.scx_insert_vtime, "scx_insert_vtime");
        safe_attach!(skel.progs.scx_insert, "scx_insert");
        safe_attach!(skel.progs.scx_dsq_move, "scx_dsq_move");
        safe_attach!(skel.progs.scx_dsq_move_set_vtime, "scx_dsq_move_set_vtime");
        safe_attach!(skel.progs.scx_dsq_move_set_slice, "scx_dsq_move_set_slice");
    } else {
        safe_attach!(skel.progs.scx_dispatch, "scx_dispatch");
        safe_attach!(skel.progs.scx_dispatch_vtime, "scx_dispatch_vtime");
        safe_attach!(
            skel.progs.scx_dispatch_from_dsq_set_vtime,
            "scx_dispatch_from_dsq_set_vtime"
        );
        safe_attach!(
            skel.progs.scx_dispatch_from_dsq_set_slice,
            "scx_dispatch_from_dsq_set_slice"
        );
        safe_attach!(skel.progs.scx_dispatch_from_dsq, "scx_dispatch_from_dsq");
    }

    // Optional probes
    safe_attach!(skel.progs.on_cpuhp_enter, "cpuhp_enter");
    safe_attach!(skel.progs.on_cpuhp_exit, "cpuhp_exit");
    safe_attach!(skel.progs.on_softirq_entry, "on_softirq_entry");
    safe_attach!(skel.progs.on_softirq_exit, "on_softirq_exit");

    // If no links were successfully attached and we're not root, provide helpful guidance
    if links.is_empty() && !is_root() {
        warnings.extend(get_capability_warning_message());
    }

    Ok((links, warnings))
}

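/// Runs a one-shot trace session: attaches scheduler probes, fans BPF events
/// out across multiple ring buffers, and compiles a Perfetto trace file.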
fn run_trace(trace_args: &TraceArgs) -> Result<()> {
    // Trace functionality always requires root privileges
    if !is_root() {
        return Err(anyhow!(
            "Trace functionality requires root privileges. Please run as root"
        ));
    }

    TermLogger::init(
        match trace_args.verbose {
            0 => simplelog::LevelFilter::Info,
            1 => simplelog::LevelFilter::Debug,
            _ => simplelog::LevelFilter::Trace,
        },
        SimplelogConfig::default(),
        TerminalMode::Mixed,
        ColorChoice::Auto,
    )?;

    let mut kprobe_events = available_kprobe_events()?;
    kprobe_events.sort();
    search::sorted_contains_all(&kprobe_events, &trace_args.kprobes)
        .then_some(())
        .ok_or_else(|| anyhow!("Invalid kprobe events"))?;

    let config = Config::default_config();
    let worker_threads = config.worker_threads() as usize;

    // Calculate how many ringbuffers we'll need to ensure enough worker threads
    let num_cpus = num_possible_cpus()?;
    let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);

    // Ensure we have at least rb_cnt + 4 worker threads
    // (+4 for trace generation, stats, and other async tasks)
    let required_threads = std::cmp::max(rb_cnt + 4, worker_threads);

    info!(
        "Creating tokio runtime with {} worker threads for {} ringbuffers",
        required_threads, rb_cnt
    );

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(required_threads)
        .build()
        .unwrap()
        .block_on(async {
            let (action_tx, mut action_rx) = mpsc::unbounded_channel();

            // Set up the BPF skel and publisher
            let mut open_object = MaybeUninit::uninit();
            let mut builder = BpfSkelBuilder::default();
            if trace_args.verbose > 2 {
                builder.obj_builder.debug(true);
            }

            let mut skel = builder.open(&mut open_object)?;
            compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total)?;
            compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update)?;
            compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait)?;
            compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang)?;

            // Set up multiple ringbuffers for scalability
            let num_cpus = num_possible_cpus()?;
            let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
            let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;

            log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);

            // Set up CPU-to-ringbuffer mapping in BPF
            let cpu_cnt_pow2 = num_cpus.next_power_of_two();
            skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;
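            // Example: with 24 possible CPUs, cpu_cnt_pow2 is 32 and the mask
            // is 0x1f, so the BPF side can pick a slot with
            // cpu_id & rb_cpu_map_mask (assuming it applies the mask this way).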

            // Set max entries for the CPU-to-ringbuf map array
            skel.maps
                .data_rb_cpu_map
                .set_max_entries(cpu_cnt_pow2 as u32)?;

            // Set max entries for events hash-of-maps
            skel.maps.events.set_max_entries(rb_cnt as u32)?;

            // Load the BPF skeleton (no graceful handling for trace mode - requires root)
            let mut skel = skel.load()?;

            // Populate the CPU-to-ringbuffer mapping after loading
            for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
                if cpu_id < cpu_cnt_pow2 {
                    skel.maps.data_rb_cpu_map.update(
                        &(cpu_id as u32).to_ne_bytes(),
                        &rb_id.to_ne_bytes(),
                        libbpf_rs::MapFlags::ANY,
                    )?;
                }
            }

            skel.maps.data_data.as_mut().unwrap().enable_bpf_events = false;

            // Attach programs (no graceful handling for trace mode - requires root)
            let mut links = vec![
                skel.progs.on_sched_cpu_perf.attach()?,
                skel.progs.scx_sched_reg.attach()?,
                skel.progs.scx_sched_unreg.attach()?,
                skel.progs.on_sched_switch.attach()?,
                skel.progs.on_sched_wakeup.attach()?,
                skel.progs.on_sched_wakeup_new.attach()?,
                skel.progs.on_sched_waking.attach()?,
                skel.progs.on_sched_migrate_task.attach()?,
                skel.progs.on_sched_fork.attach()?,
                skel.progs.on_sched_exec.attach()?,
                skel.progs.on_sched_exit.attach()?,
            ];

            // 6.13 compatibility
            if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
                if let Ok(link) = skel.progs.scx_insert_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_insert.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move_set_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dsq_move_set_slice.attach() {
                    links.push(link);
                }
            } else {
                if let Ok(link) = skel.progs.scx_dispatch.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_vtime.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_slice.attach() {
                    links.push(link);
                }
                if let Ok(link) = skel.progs.scx_dispatch_from_dsq.attach() {
                    links.push(link);
                }
            }
            if let Ok(link) = skel.progs.on_cpuhp_enter.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_cpuhp_exit.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_softirq_entry.attach() {
                links.push(link);
            }
            if let Ok(link) = skel.progs.on_softirq_exit.attach() {
                links.push(link);
            }

            // Counter for events dropped due to invalid timestamps (userspace filtering)
            let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));

            // Create shutdown flag early so it can be used in ringbuffer callbacks
            let shutdown = Arc::new(AtomicBool::new(false));

            // Create multiple ringbuffers and add them to the hash-of-maps
            let events_map_fd = skel.maps.events.as_fd().as_raw_fd();
            let mut rb_fds = Vec::new();
            let mut rb_managers: Vec<SendRingBuffer> = Vec::new();

            for rb_id in 0..rb_cnt {
                // Create individual ringbuffer (size must be power of 2)
                let rb_fd = unsafe {
                    libbpf_sys::bpf_map_create(
                        libbpf_sys::BPF_MAP_TYPE_RINGBUF,
                        std::ptr::null(),
                        0,
                        0,
                        (32 * 1024 * 1024) as u32, // 32MB per ringbuffer (must be power of 2)
                        std::ptr::null(),
                    )
                };

                if rb_fd < 0 {
                    bail!(
                        "Failed to create ringbuffer #{}: {}",
                        rb_id,
                        std::io::Error::last_os_error()
                    );
                }

                // Add ringbuffer to hash-of-maps
                let rb_id_u32 = rb_id as u32;
                let ret = unsafe {
                    libbpf_sys::bpf_map_update_elem(
                        events_map_fd,
                        &rb_id_u32 as *const u32 as *const std::ffi::c_void,
                        &rb_fd as *const i32 as *const std::ffi::c_void,
                        libbpf_sys::BPF_NOEXIST.into(),
                    )
                };

                if ret < 0 {
                    bail!(
                        "Failed to add ringbuffer #{} to hash-of-maps: {}",
                        rb_id,
                        std::io::Error::last_os_error()
                    );
                }

                rb_fds.push(rb_fd);
            }

            // Set up ring buffer managers using raw libbpf C API
            // We use the C API because we're creating ringbuffers dynamically
            struct RingBufContext {
                dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                action_tx: mpsc::UnboundedSender<Action>,
                shutdown: Arc<AtomicBool>,
            }

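            // libbpf invokes this callback once per sample; returning 0 keeps
            // ring_buffer__poll consuming, while a negative value would abort
            // the poll with an error.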
            extern "C" fn ring_buffer_sample_callback(
                ctx: *mut std::ffi::c_void,
                data: *mut std::ffi::c_void,
                size: u64,
            ) -> std::ffi::c_int {
                unsafe {
                    let ctx = &*(ctx as *const RingBufContext);

                    // Stop processing if shutdown requested
                    if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                        return 0;
                    }

                    let data_slice = std::slice::from_raw_parts(data as *const u8, size as usize);

                    let mut event = bpf_event::default();
                    if plain::copy_from_bytes(&mut event, data_slice).is_err() {
                        return 0;
                    }

                    // Drop events with invalid timestamps
                    if event.ts == 0 {
                        ctx.dropped_invalid_ts
                            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                        return 0;
                    }

                    let mut edm = EventDispatchManager::new(None, None);
                    edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(
                        ctx.action_tx.clone(),
                    )));
                    let _ = edm.on_event(&event);
                }
                0
            }

            for rb_fd in &rb_fds {
                let ctx = Box::new(RingBufContext {
                    dropped_invalid_ts: dropped_invalid_ts.clone(),
                    action_tx: action_tx.clone(),
                    shutdown: shutdown.clone(),
                });
                let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;

                let rb_ptr = unsafe {
                    libbpf_sys::ring_buffer__new(
                        *rb_fd,
                        Some(ring_buffer_sample_callback),
                        ctx_ptr,
                        std::ptr::null(),
                    )
                };

                if rb_ptr.is_null() {
                    unsafe {
                        let _ = Box::from_raw(ctx_ptr as *mut RingBufContext);
                    }
                    bail!("Failed to create ring buffer manager");
                }

                rb_managers.push(SendRingBuffer(rb_ptr));
            }

            // Set up the background threads to poll all ringbuffers
            let stop_poll = shutdown.clone();
            let stop_stats = shutdown.clone();

            let mut ringbuffer_handles = Vec::new();
            let mut producer_handles = Vec::new();

            // Spawn a separate blocking task for each ringbuffer
            // Use spawn_blocking because rb.poll() is a blocking C FFI call
            for (rb_id, rb) in rb_managers.into_iter().enumerate() {
                let stop_poll_clone = stop_poll.clone();
                ringbuffer_handles.push(tokio::task::spawn_blocking(move || {
                    info!("ringbuffer #{} task started", rb_id);
                    let mut poll_count = 0;
                    loop {
                        // Poll with 1ms timeout (blocking call)
                        rb.poll(1);
                        poll_count += 1;
                        if stop_poll_clone.load(Ordering::Relaxed) {
                            info!(
                                "ringbuffer #{} received shutdown after {} polls",
                                rb_id, poll_count
                            );
                            // Consume remaining events
                            let consumed = rb.consume();
                            info!("ringbuffer #{} consumed {} events", rb_id, consumed);
                            // Free the ring buffer
                            rb.free();
                            info!("ringbuffer #{} freed", rb_id);
                            break;
                        }
                    }
                    info!("ringbuffer #{} exiting", rb_id);
                }));
            }
            info!(
                "spawned {} ringbuffer polling tasks",
                ringbuffer_handles.len()
            );

            if trace_args.system_stats {
                let mut cpu_stat_tracker = CpuStatTracker::default();
                let mut mem_stats = MemStatSnapshot::default();
                let mut system = System::new_all();
                let action_tx_clone = action_tx.clone();

                producer_handles.push(tokio::spawn(async move {
                    info!("stats task started");
                    let mut stats_count = 0;
                    loop {
                        if stop_stats.load(Ordering::Relaxed) {
                            info!("stats task received shutdown after {} samples", stats_count);
                            break;
                        }
                        let ts = get_clock_value(libc::CLOCK_BOOTTIME);

                        cpu_stat_tracker
                            .update(&mut system)
                            .expect("Failed to update cpu stats");

                        mem_stats.update().expect("Failed to update mem stats");

                        let sys_stat_action = Action::SystemStat(SystemStatAction {
                            ts,
                            cpu_data_prev: cpu_stat_tracker.prev.clone(),
                            cpu_data_current: cpu_stat_tracker.current.clone(),
                            mem_info: mem_stats.clone(),
                        });
                        action_tx_clone
                            .send(sys_stat_action)
                            .expect("Failed to send CpuStat action");

                        stats_count += 1;
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                    info!("stats task exiting");
                }));
            }

            let trace_file_prefix = config.trace_file_prefix().to_string();
            let trace_file = trace_args.output_file.clone();
            let mut trace_manager = PerfettoTraceManager::new(trace_file_prefix, None);

            info!("starting trace for {}ms", trace_args.trace_ms);
            trace_manager.start()?;
            let mut tracer = Tracer::new(skel);
            tracer.trace(&trace_args.kprobes)?;

            let shutdown_trace = shutdown.clone();
            let trace_handle = tokio::spawn(async move {
                debug!("trace generation task started");
                let mut count = 0;
                let mut last_log = std::time::Instant::now();
                loop {
                    tokio::select! {
                        // Check shutdown flag to stop early if requested
                        _ = tokio::time::sleep(Duration::from_millis(100)) => {
                            if shutdown_trace.load(Ordering::Relaxed) {
                                info!("trace task: shutdown requested, draining remaining events");
                                // Drain remaining events in the channel
                                while let Ok(a) = action_rx.try_recv() {
                                    count += 1;
                                    trace_manager
                                        .on_action(&a)
                                        .expect("Action should have been resolved");
                                }
                                info!("trace task: stopping trace manager");
                                trace_manager.stop(trace_file, None).unwrap();
                                info!("trace file compiled, collected {count} events");
                                break;
                            }
                        }
                        action = action_rx.recv() => {
                            if let Some(a) = action {
                                count += 1;
                                if last_log.elapsed() > std::time::Duration::from_secs(1) {
                                    debug!("trace task: {} events processed", count);
                                    last_log = std::time::Instant::now();
                                }
                                trace_manager
                                    .on_action(&a)
                                    .expect("Action should have been resolved");
                            } else {
                                info!("trace task: channel closed, stopping trace manager");
                                trace_manager.stop(trace_file, None).unwrap();
                                info!("trace file compiled, collected {count} events");
                                break;
                            }
                        }
                    }
                }
                info!("trace task: exiting");
            });

            info!("waiting for trace duration ({}ms)", trace_args.trace_ms);
            tokio::time::sleep(Duration::from_millis(trace_args.trace_ms)).await;
            info!("trace duration complete, beginning shutdown");

            // Proper shutdown sequence to avoid hanging:
            // 1) Stop new BPF events by detaching programs
            // 2) Set shutdown flag to stop polling tasks
            // 3) Wait for all ringbuffer tasks to consume remaining events and exit
            // 4) Wait for stats task to exit
            // 5) Drop action_tx to close the channel (all producers are done)
            // 6) Wait for trace generation to complete
            info!("shutdown: clearing BPF links");
            tracer.clear_links()?;
            info!("shutdown: BPF links cleared");
            drop(links);
            info!("shutdown: links dropped");

            info!("shutdown: setting shutdown flag");
            shutdown.store(true, Ordering::Relaxed);
            info!(
                "shutdown: flag set, waiting for {} ringbuffer tasks",
                ringbuffer_handles.len()
            );

            // Wait for all ringbuffer polling tasks to finish consuming
            let results = join_all(ringbuffer_handles).await;
            info!("shutdown: all {} ringbuffer tasks joined", results.len());
            for (idx, result) in results.iter().enumerate() {
                if let Err(e) = result {
                    eprintln!("Ringbuffer task {} panicked: {e}", idx);
                } else {
                    debug!("ringbuffer task {} exited successfully", idx);
                }
            }
            info!("shutdown: ringbuffer tasks complete");

            // Wait for producer tasks (stats) to complete
            info!(
                "shutdown: waiting for {} producer tasks",
                producer_handles.len()
            );
            let results = join_all(producer_handles).await;
            info!("shutdown: all {} producer tasks joined", results.len());
            for (idx, result) in results.iter().enumerate() {
                if let Err(e) = result {
                    eprintln!("Producer task {} panicked: {e}", idx);
                } else {
                    debug!("producer task {} exited successfully", idx);
                }
            }
            info!("shutdown: producer tasks complete");

            // Now safe to drop action_tx - all producers are done
            info!("shutdown: dropping action_tx");
            drop(action_tx);
            info!("shutdown: action_tx dropped, waiting for trace generation");

            // Wait for trace generation to complete
            if let Err(e) = trace_handle.await {
                eprintln!("Trace generation task panicked: {e}");
            }
            info!("shutdown: trace generation complete");

            info!("shutdown: collecting final stats");
            let stats = tracer.stats()?;
            info!("shutdown: {stats:?}");

            info!("shutdown: complete");
            Ok(())
        })
}

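/// Runs the interactive TUI, falling back to a BPF-less mode with capability
/// warnings when the process cannot open, load, or attach the skeleton.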
fn run_tui(tui_args: &TuiArgs) -> Result<()> {
    if let Ok(log_path) = std::env::var("RUST_LOG_PATH") {
        let log_level = match std::env::var("RUST_LOG") {
            Ok(v) => LevelFilter::from_str(&v)?,
            Err(_) => LevelFilter::Info,
        };

        WriteLogger::init(
            log_level,
            simplelog::Config::default(),
            File::create(log_path)?,
        )?;

        log_panics::Config::new()
            .backtrace_mode(log_panics::BacktraceMode::Resolved)
            .install_panic_hook();
    };

    let config = Config::merge([
        Config::from(tui_args.clone()),
        Config::load_or_default().expect("Failed to load config or load default config"),
    ]);
    let keymap = config.active_keymap.clone();

    // Calculate how many ringbuffers we'll need to ensure enough worker threads
    let worker_threads = config.worker_threads() as usize;
    let num_cpus = num_possible_cpus()?;
    let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);

    // Ensure we have at least rb_cnt + 4 worker threads
    // (+4 for UI rendering, event handling, and other async tasks)
    let required_threads = std::cmp::max(rb_cnt + 4, worker_threads);

    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .worker_threads(required_threads)
        .build()
        .unwrap()
        .block_on(async {
            // Declare open_object at the very beginning so it lives for the entire async block
            let mut open_object = MaybeUninit::uninit();

            let (action_tx, mut action_rx) = mpsc::unbounded_channel();

            // Check capabilities early to determine if we can run with BPF functionality
            let has_bpf_cap = check_bpf_capability();
            let mut capability_warnings = Vec::new();
            let mut _bpf_enabled = false;
            let mut links = Vec::new();
            let mut event_rb_data_opt: Option<(
                Vec<i32>, // rb_fds
                Arc<std::sync::atomic::AtomicU64>, // dropped_invalid_ts
                mpsc::UnboundedSender<Action>, // action_tx for ringbuffer contexts
            )> = None;
            let mut skel_opt = None;

            if has_bpf_cap {
                // Try to initialize BPF components
                let mut builder = BpfSkelBuilder::default();
                if config.debug() {
                    builder.obj_builder.debug(true);
                }
                let bpf_publisher = BpfEventActionPublisher::new(action_tx.clone());
                let mut edm = EventDispatchManager::new(None, None);
                edm.register_bpf_handler(Box::new(bpf_publisher));

                // Try to open the BPF skeleton with graceful error handling
                match builder.open(&mut open_object) {
                    Ok(mut skel) => {
                        skel.maps.rodata_data.as_mut().unwrap().long_tail_tracing_min_latency_ns =
                            tui_args.experimental_long_tail_tracing_min_latency_ns;

                        let _map_handle = if tui_args.layered {
                            skel.maps.rodata_data.as_mut().unwrap().layered = true;
                            action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
                                table: "Process".to_string(),
                                col: "Layer ID".to_string(),
                                visible: true,
                            }))?;
                            action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
                                table: "Thread".to_string(),
                                col: "Layer ID".to_string(),
                                visible: true,
                            }))?;
                            match layered_util::attach_to_existing_map("task_ctxs", &mut skel.maps.task_ctxs) {
                                Ok(handle) => Some(handle),
                                Err(e) => {
                                    capability_warnings.push(format!("Failed to attach to layered map: {e}"));
                                    None
                                }
                            }
                        } else {
                            None
                        };

                        // Set up multiple ringbuffers for scalability
                        let num_cpus = num_possible_cpus()?;
                        let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
                        let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;

                        log::info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);

                        // Set up CPU-to-ringbuffer mapping in BPF
                        let cpu_cnt_pow2 = num_cpus.next_power_of_two();
                        skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;

                        // Set max entries for the CPU-to-ringbuf map array
                        if let Err(e) = skel.maps.data_rb_cpu_map.set_max_entries(cpu_cnt_pow2 as u32) {
                            capability_warnings.push(format!("Failed to set CPU-to-ringbuf map size: {e}"));
                        }

                        // Set max entries for events hash-of-maps
                        if let Err(e) = skel.maps.events.set_max_entries(rb_cnt as u32) {
                            capability_warnings.push(format!("Failed to set ringbuf count: {e}"));
                        }

                        if let Err(e) = compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total) {
                            capability_warnings.push(format!("Failed to enable gpu_memory_total kprobe: {e}"));
                        }
                        if let Err(e) = compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update) {
                            capability_warnings.push(format!("Failed to enable hw_pressure_update kprobe: {e}"));
                        }
                        if let Err(e) = compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait) {
                            capability_warnings.push(format!("Failed to enable sched_process_wait tracepoint: {e}"));
                        }
                        if let Err(e) = compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang) {
                            capability_warnings.push(format!("Failed to enable sched_process_hang tracepoint: {e}"));
                        }

                        // Try to load the BPF skeleton
                        match skel.load() {
                            Ok(mut loaded_skel) => {
                                // Populate the CPU-to-ringbuffer mapping after loading
                                for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
                                    if cpu_id < cpu_cnt_pow2 {
                                        if let Err(e) = loaded_skel.maps.data_rb_cpu_map.update(
                                            &(cpu_id as u32).to_ne_bytes(),
                                            &rb_id.to_ne_bytes(),
                                            libbpf_rs::MapFlags::ANY,
                                        ) {
                                            capability_warnings.push(format!("Failed to set CPU {} -> ringbuf {}: {}", cpu_id, rb_id, e));
                                        }
                                    }
                                }

                                let (skel_links, attach_warnings) = attach_progs(&mut loaded_skel)?;
                                links = skel_links;
                                capability_warnings.extend(attach_warnings);

                                if !links.is_empty() || is_root() {
                                    // Only run scxtop_init if we have some BPF functionality
                                    if let Err(e) = loaded_skel.progs.scxtop_init.test_run(ProgramInput::default()) {
                                        capability_warnings.push(format!("Failed to initialize scxtop BPF program: {e}"));
                                    }
                                }

                                // Set up event ring buffer if we have any attached programs
                                if !links.is_empty() {
                                    // Counter for events dropped due to invalid timestamps (userspace filtering)
                                    let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));

                                    // Create multiple ringbuffers and add them to the hash-of-maps
                                    let events_map_fd = loaded_skel.maps.events.as_fd().as_raw_fd();
                                    let mut rb_fds = Vec::new();

                                    for rb_id in 0..rb_cnt {
                                        // Create individual ringbuffer (size must be power of 2)
                                        let rb_fd = unsafe {
                                            libbpf_sys::bpf_map_create(
                                                libbpf_sys::BPF_MAP_TYPE_RINGBUF,
                                                std::ptr::null(),
                                                0,
                                                0,
                                                (32 * 1024 * 1024) as u32, // 32MB per ringbuffer (must be power of 2)
                                                std::ptr::null(),
                                            )
                                        };

                                        if rb_fd < 0 {
                                            capability_warnings.push(format!("Failed to create ringbuffer #{}: {}", rb_id, std::io::Error::last_os_error()));
                                            continue;
                                        }

                                        // Add ringbuffer to hash-of-maps
                                        let rb_id_u32 = rb_id as u32;
                                        let ret = unsafe {
                                            libbpf_sys::bpf_map_update_elem(
                                                events_map_fd,
                                                &rb_id_u32 as *const u32 as *const std::ffi::c_void,
                                                &rb_fd as *const i32 as *const std::ffi::c_void,
                                                libbpf_sys::BPF_NOEXIST.into(),
                                            )
                                        };

                                        if ret < 0 {
                                            capability_warnings.push(format!("Failed to add ringbuffer #{} to hash-of-maps: {}", rb_id, std::io::Error::last_os_error()));
                                            continue;
                                        }

                                        rb_fds.push(rb_fd);
                                    }

                                    if !rb_fds.is_empty() {
                                        // Save data for later ringbuffer manager creation (after app is created)
                                        event_rb_data_opt = Some((rb_fds, dropped_invalid_ts, action_tx.clone()));
                                        _bpf_enabled = true;
                                    }
                                }

                                skel_opt = Some(loaded_skel);
                            }
                            Err(e) => {
                                if is_root() {
                                    return Err(anyhow!("Failed to load BPF skeleton (running as root): {e}"));
                                } else {
                                    capability_warnings.push(format!("Failed to load BPF skeleton: {e}"));
                                    capability_warnings.extend(get_capability_warning_message());
                                }
                            }
                        }
                    }
                    Err(e) => {
                        if is_root() {
                            return Err(anyhow!("Failed to open BPF skeleton (running as root): {e}"));
                        } else {
                            capability_warnings.push(format!("Failed to open BPF skeleton: {e}"));
                            capability_warnings.extend(get_capability_warning_message());
                        }
                    }
                }
            } else {
                // No BPF capabilities detected
                capability_warnings.extend(get_capability_warning_message());
            }

            // Handle experimental long tail tracing if enabled and we have a skeleton
            if tui_args.experimental_long_tail_tracing {
                if let Some(ref mut skel) = skel_opt {
                    skel.maps.data_data.as_mut().unwrap().trace_duration_ns = config.trace_duration_ns();
                    skel.maps.data_data.as_mut().unwrap().trace_warmup_ns = config.trace_warmup_ns();

                    let binary = tui_args
                        .experimental_long_tail_tracing_binary
                        .clone()
                        .unwrap();
                    let symbol = tui_args
                        .experimental_long_tail_tracing_symbol
                        .clone()
                        .unwrap();

                    match skel.progs.long_tail_tracker_exit.attach_uprobe_with_opts(
                        -1, /* pid, -1 == all */
                        binary.clone(),
                        0,
                        UprobeOpts {
                            retprobe: true,
                            func_name: Some(symbol.clone()),
                            ..Default::default()
                        },
                    ) {
                        Ok(link) => links.push(link),
                        Err(e) => capability_warnings.push(format!("Failed to attach long tail tracker exit: {e}"))
                    }

                    match skel.progs.long_tail_tracker_entry.attach_uprobe_with_opts(
                        -1, /* pid, -1 == all */
                        binary.clone(),
                        0,
                        UprobeOpts {
                            retprobe: false,
                            func_name: Some(symbol.clone()),
                            ..Default::default()
                        },
                    ) {
                        Ok(link) => links.push(link),
                        Err(e) => capability_warnings.push(format!("Failed to attach long tail tracker entry: {e}"))
                    }
                } else {
                    capability_warnings.push("Long tail tracing requested but BPF skeleton not available".to_string());
                }
            }

            let mut tui = Tui::new(keymap.clone(), config.tick_rate_ms(), config.frame_rate_ms())?;
            let scheduler = read_file_string(SCHED_NAME_PATH).unwrap_or("".to_string());

            // Create app with or without BPF skeleton
            let mut app = if let Some(skel) = skel_opt {
                App::new(
                    config,
                    scheduler,
                    100,
                    tui_args.process_id,
                    tui_args.layered,
                    action_tx.clone(),
                    skel,
                )?
            } else {
                // Create app without BPF functionality
                App::new_without_bpf(
                    config,
                    scheduler,
                    100,
                    tui_args.process_id,
                    tui_args.layered,
                    action_tx.clone(),
                )?
            };

            // Pass warnings to the app if any exist
            if !capability_warnings.is_empty() {
                app.set_capability_warnings(capability_warnings);
            }

            tui.enter()?;

            // Start BPF event polling only if we have ringbuffer data
            let shutdown = app.should_quit.clone();
            let mut ringbuffer_handles = Vec::new();
            if let Some((rb_fds, dropped_invalid_ts, rb_action_tx)) = event_rb_data_opt {
                // Set up ring buffer managers using raw libbpf C API
                // Now that app is created, we can use app.should_quit for the callbacks
                struct RingBufContext {
                    dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                    action_tx: mpsc::UnboundedSender<Action>,
                    shutdown: Arc<AtomicBool>,
                }

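                // Same libbpf sample callback shape as in run_trace: return 0
                // to keep polling; shutdown is signalled via app.should_quit.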
                extern "C" fn ring_buffer_sample_callback(
                    ctx: *mut std::ffi::c_void,
                    data: *mut std::ffi::c_void,
                    size: u64,
                ) -> std::ffi::c_int {
                    unsafe {
                        let ctx = &*(ctx as *const RingBufContext);

                        // Stop processing if shutdown requested
                        if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                            return 0;
                        }

                        let data_slice = std::slice::from_raw_parts(data as *const u8, size as usize);

                        let mut event = bpf_event::default();
                        if plain::copy_from_bytes(&mut event, data_slice).is_err() {
                            return 0;
                        }

                        // Drop events with invalid timestamps
                        if event.ts == 0 {
                            ctx.dropped_invalid_ts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            return 0;
                        }

                        let mut edm = EventDispatchManager::new(None, None);
                        edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(ctx.action_tx.clone())));
                        let _ = edm.on_event(&event);
                    }
                    0
                }

                // Spawn a separate task for each ringbuffer
                for rb_fd in rb_fds {
                    let ctx = Box::new(RingBufContext {
                        dropped_invalid_ts: dropped_invalid_ts.clone(),
                        action_tx: rb_action_tx.clone(),
                        shutdown: shutdown.clone(),
                    });
                    let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;

                    let rb_ptr = unsafe {
                        libbpf_sys::ring_buffer__new(
                            rb_fd,
                            Some(ring_buffer_sample_callback),
                            ctx_ptr,
                            std::ptr::null(),
                        )
                    };

                    if rb_ptr.is_null() {
                        unsafe { let _ = Box::from_raw(ctx_ptr as *mut RingBufContext); }
                        log::warn!("Failed to create ring buffer manager");
                        continue;
                    }

                    let rb = SendRingBuffer(rb_ptr);
                    let shutdown_clone = shutdown.clone();
                    let rb_id = ringbuffer_handles.len();
                    // Use spawn_blocking because rb.poll() is a blocking C FFI call
                    ringbuffer_handles.push(tokio::task::spawn_blocking(move || {
                        loop {
                            // Poll with 1ms timeout (blocking call)
                            rb.poll(1);
                            if shutdown_clone.load(Ordering::Relaxed) {
                                // Consume remaining events
                                rb.consume();
                                // Free the ring buffer
                                rb.free();
                                log::debug!("ringbuffer #{} polling stopped", rb_id);
                                break;
                            }
                        }
                    }));
                }
            }

            if tui_args.mangoapp_tracing {
                let stop_mangoapp = app.should_quit.clone();
                let mangoapp_path = CString::new(tui_args.mangoapp_path.clone()).unwrap();
                let poll_intvl_ms = tui_args.mangoapp_poll_intvl_ms;
                let tx = action_tx.clone();
                tokio::spawn(async move {
                    poll_mangoapp(
                        mangoapp_path,
                        poll_intvl_ms,
                        tx,
                        stop_mangoapp,
                    )
                    .await
                });
            }

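            // Main event loop: multiplex terminal events and dispatched
            // actions until the app requests shutdown.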
            loop {
                tokio::select! {
                    ev = tui.next() => {
                        let ev = ev?;
                        match ev {
                            Event::Quit => { action_tx.send(Action::Quit)?; },
                            Event::Tick => action_tx.send(Action::Tick)?,
                            Event::TickRateChange(tick_rate_ms) => action_tx.send(
                                Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms)),
                            )?,
                            Event::Render => {
                                if app.should_quit.load(Ordering::Relaxed) {
                                    break;
                                }
                                if app.state() != AppState::Pause {
                                    tui.draw(|f| app.render(f).expect("Failed to render application"))?;
                                }
                            }
                            Event::Key(_) => {
                                let action = get_action(&app, &keymap, ev);
                                action_tx.send(action)?;
                            }
                            _ => {}
                        }
                    }

                    ac = action_rx.recv() => {
                        let ac = ac.ok_or(anyhow!("actions channel closed"))?;
                        app.handle_action(&ac)?;
                    }
                }
            }
            tui.exit()?;

            // Wait for all ringbuffer tasks to finish consuming remaining events
            log::debug!("waiting for {} ringbuffer tasks to complete", ringbuffer_handles.len());
            for handle in ringbuffer_handles {
                if let Err(e) = handle.await {
                    log::error!("Ringbuffer task panicked: {e}");
                }
            }

            drop(links);

            Ok(())
        })
}

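/// Runs the MCP server; daemon mode additionally wires up the full BPF event
/// pipeline and the per-event analyzers.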
fn run_mcp(mcp_args: &scxtop::cli::McpArgs) -> Result<()> {
    use scx_utils::Topology;
    use scxtop::mcp::{events::action_to_mcp_event, McpServer, McpServerConfig};
    use std::sync::Arc;

    // Set up logging to stderr (important: not stdout, which is used for the MCP protocol)
    TermLogger::init(
        match mcp_args.verbose {
            0 => LevelFilter::Warn,
            1 => LevelFilter::Info,
            2 => LevelFilter::Debug,
            _ => LevelFilter::Trace,
        },
        SimplelogConfig::default(),
        TerminalMode::Stderr,
        ColorChoice::Auto,
    )?;

    // Initialize topology
    let topo = Topology::new().expect("Failed to create topology");
    let topo_arc = Arc::new(topo);

    let mcp_config = McpServerConfig {
        daemon_mode: mcp_args.daemon,
        enable_logging: mcp_args.enable_logging,
    };

    if mcp_args.daemon {
        // Daemon mode: full BPF event processing
        tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .worker_threads(4)
            .build()
            .unwrap()
            .block_on(async {
                let mut open_object = MaybeUninit::uninit();
                let (action_tx, mut action_rx) = mpsc::unbounded_channel();

                // Create shared stats for the MCP server
                use scxtop::mcp::create_shared_stats;
                let shared_stats = create_shared_stats();
                let shared_stats_for_event_handler = shared_stats.clone();

                // Set up BPF
                let builder = BpfSkelBuilder::default();
                let mut skel = builder.open(&mut open_object)?;

                // Set up multiple ringbuffers for scalability
                let num_cpus = num_possible_cpus()?;
                let rb_cnt = scxtop::topology::calculate_default_ringbuf_count(num_cpus);
                let rb_cpu_mapping = scxtop::topology::setup_cpu_to_ringbuf_mapping(rb_cnt, num_cpus)?;

                info!("Using {} ringbuffers for {} CPUs", rb_cnt, num_cpus);

                // Set up CPU-to-ringbuffer mapping in BPF
                let cpu_cnt_pow2 = num_cpus.next_power_of_two();
                skel.maps.rodata_data.as_mut().unwrap().rb_cpu_map_mask = (cpu_cnt_pow2 - 1) as u64;
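                // Sizing to the next power of two lets the BPF side select a
                // ringbuffer with a mask (cpu & rb_cpu_map_mask) instead of a
                // modulo; this is an assumption based on the mask naming, as
                // the lookup itself lives in the BPF code.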

                // Set max entries for the CPU-to-ringbuf map array
                skel.maps.data_rb_cpu_map.set_max_entries(cpu_cnt_pow2 as u32)?;

                // Set max entries for the events hash-of-maps
                skel.maps.events.set_max_entries(rb_cnt as u32)?;

                let mut skel = skel.load()?;

                // Populate the CPU-to-ringbuffer mapping after loading
                for (cpu_id, &rb_id) in rb_cpu_mapping.iter().enumerate() {
                    if cpu_id < cpu_cnt_pow2 {
                        skel.maps.data_rb_cpu_map.update(
                            &(cpu_id as u32).to_ne_bytes(),
                            &rb_id.to_ne_bytes(),
                            libbpf_rs::MapFlags::ANY,
                        )?;
                    }
                }
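                // Keys and values cross the map-update boundary as raw byte
                // slices, hence the native-endian to_ne_bytes() encoding.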

                // Create ALL analyzers BEFORE setting up event handlers
                use scxtop::mcp::{
                    WakerWakeeAnalyzer, LatencyTracker, CpuHotspotAnalyzer, MigrationAnalyzer,
                    ProcessEventHistory, DsqMonitor, EventRateMonitor, WakeupChainTracker, EventBuffer,
                    SoftirqAnalyzer,
                };
                use std::sync::Mutex;

                // 1. Waker/Wakee Analyzer
                let mut waker_wakee = WakerWakeeAnalyzer::new();
                waker_wakee.set_topology(topo_arc.clone());
                let waker_wakee_arc = Arc::new(Mutex::new(waker_wakee));

                // 2. Latency Tracker
                let latency_tracker = LatencyTracker::new(1000); // 1 second window
                let latency_tracker_arc = Arc::new(Mutex::new(latency_tracker));

                // 3. CPU Hotspot Analyzer
                let cpu_hotspot = CpuHotspotAnalyzer::new(100); // 100ms window
                let cpu_hotspot_arc = Arc::new(Mutex::new(cpu_hotspot));

                // 4. Migration Analyzer
                let migration_analyzer = MigrationAnalyzer::new(1000); // 1 second window
                let migration_analyzer_arc = Arc::new(Mutex::new(migration_analyzer));

                // 5. Process Event History
                let process_history = ProcessEventHistory::new(100); // 100 events per process
                let process_history_arc = Arc::new(Mutex::new(process_history));

                // 6. DSQ Monitor
                let dsq_monitor = DsqMonitor::new();
                let dsq_monitor_arc = Arc::new(Mutex::new(dsq_monitor));

                // 7. Event Rate Monitor
                let rate_monitor = EventRateMonitor::new(1000, 10); // 1s window, 10 baselines
                let rate_monitor_arc = Arc::new(Mutex::new(rate_monitor));

                // 8. Wakeup Chain Tracker
                let wakeup_tracker = WakeupChainTracker::new(10); // max 10 chain length
                let wakeup_tracker_arc = Arc::new(Mutex::new(wakeup_tracker));

                // 9. Event Buffer
                let event_buffer = EventBuffer::new();
                let event_buffer_arc = Arc::new(Mutex::new(event_buffer));

                // 10. Softirq Analyzer
                let softirq_analyzer = SoftirqAnalyzer::new(10000); // 10 second window
                let softirq_analyzer_arc = Arc::new(Mutex::new(softirq_analyzer));

                // Set up the event dispatch manager
                let mut edm = EventDispatchManager::new(None, None);
                edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(action_tx.clone())));

                // Counter for events dropped due to invalid timestamps (userspace filtering)
                let dropped_invalid_ts = Arc::new(std::sync::atomic::AtomicU64::new(0));

                // Create the shutdown flag early so it can be used in ringbuffer callbacks
                let shutdown = Arc::new(AtomicBool::new(false));

                // Create multiple ringbuffers and add them to the hash-of-maps
                let events_map_fd = skel.maps.events.as_fd().as_raw_fd();
                let mut rb_fds = Vec::new();
                let mut rb_managers: Vec<SendRingBuffer> = Vec::new();

                for rb_id in 0..rb_cnt {
                    // Create an individual ringbuffer (size must be a power of 2)
                    let rb_fd = unsafe {
                        libbpf_sys::bpf_map_create(
                            libbpf_sys::BPF_MAP_TYPE_RINGBUF,
                            std::ptr::null(),
                            0,
                            0,
                            (32 * 1024 * 1024) as u32, // 32MB per ringbuffer (must be a power of 2)
                            std::ptr::null(),
                        )
                    };

                    if rb_fd < 0 {
                        bail!("Failed to create ringbuffer #{}: {}", rb_id, std::io::Error::last_os_error());
                    }

                    // Add the ringbuffer to the hash-of-maps
                    let rb_id_u32 = rb_id as u32;
                    let ret = unsafe {
                        libbpf_sys::bpf_map_update_elem(
                            events_map_fd,
                            &rb_id_u32 as *const u32 as *const std::ffi::c_void,
                            &rb_fd as *const i32 as *const std::ffi::c_void,
                            libbpf_sys::BPF_NOEXIST.into(),
                        )
                    };

                    if ret < 0 {
                        bail!("Failed to add ringbuffer #{} to hash-of-maps: {}", rb_id, std::io::Error::last_os_error());
                    }

                    rb_fds.push(rb_fd);
                }
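                // Once inserted into the hash-of-maps the kernel holds its own
                // reference to each inner ringbuffer, but the fds in rb_fds
                // are still needed below to construct the userspace managers.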

                // Set up ring buffer managers using the raw libbpf C API.
                // We use the C API because the ringbuffers are created dynamically.

                // Context struct holding all the data needed by the callback
                struct McpRingBufContext {
                    dropped_invalid_ts: Arc<std::sync::atomic::AtomicU64>,
                    shared_stats: Arc<std::sync::RwLock<scxtop::mcp::SharedStats>>,
                    action_tx: mpsc::UnboundedSender<Action>,
                    waker_wakee: Arc<std::sync::Mutex<scxtop::mcp::WakerWakeeAnalyzer>>,
                    cpu_hotspot: Arc<std::sync::Mutex<scxtop::mcp::CpuHotspotAnalyzer>>,
                    migration_analyzer: Arc<std::sync::Mutex<scxtop::mcp::MigrationAnalyzer>>,
                    process_history: Arc<std::sync::Mutex<scxtop::mcp::ProcessEventHistory>>,
                    rate_monitor: Arc<std::sync::Mutex<scxtop::mcp::EventRateMonitor>>,
                    wakeup_tracker: Arc<std::sync::Mutex<scxtop::mcp::WakeupChainTracker>>,
                    softirq_analyzer: Arc<std::sync::Mutex<scxtop::mcp::SoftirqAnalyzer>>,
                    shutdown: Arc<AtomicBool>,
                }

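                // libbpf invokes this callback once per sample from
                // ring_buffer__poll()/consume(). Returning a negative value
                // would abort polling, so per-event problems are swallowed and
                // 0 is returned. Analyzers are fed via try_lock(): a contended
                // lock just skips the sample for that analyzer rather than
                // blocking the polling thread.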
                extern "C" fn mcp_ring_buffer_callback(
                    ctx: *mut std::ffi::c_void,
                    data: *mut std::ffi::c_void,
                    size: u64,
                ) -> std::ffi::c_int {
                    unsafe {
                        let ctx = &*(ctx as *const McpRingBufContext);

                        // Stop processing if shutdown was requested
                        if ctx.shutdown.load(std::sync::atomic::Ordering::Relaxed) {
                            return 0;
                        }

                        let data_slice = std::slice::from_raw_parts(data as *const u8, size as usize);

                        let mut event = bpf_event::default();
                        if plain::copy_from_bytes(&mut event, data_slice).is_err() {
                            return 0;
                        }

                        // Drop events with invalid timestamps
                        if event.ts == 0 {
                            ctx.dropped_invalid_ts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                            return 0;
                        }

                        // Update shared stats from the BPF event
                        if let Ok(mut stats) = ctx.shared_stats.write() {
                            stats.update_from_event(&event);
                        }

                        // Feed the event to all analyzers
                        use scxtop::bpf_intf;
                        let event_type = event.r#type as u32;

                        // 1. Waker/Wakee Analyzer
                        if let Ok(mut analyzer) = ctx.waker_wakee.try_lock() {
                            match event_type {
                                bpf_intf::event_type_SCHED_WAKEUP => {
                                    let wakeup = &event.event.wakeup;
                                    analyzer.record_wakeup(
                                        wakeup.pid, wakeup.waker_pid,
                                        &String::from_utf8_lossy(&wakeup.waker_comm),
                                        event.cpu, event.ts,
                                    );
                                }
                                bpf_intf::event_type_SCHED_WAKING => {
                                    let waking = &event.event.waking;
                                    analyzer.record_wakeup(
                                        waking.pid, waking.waker_pid,
                                        &String::from_utf8_lossy(&waking.waker_comm),
                                        event.cpu, event.ts,
                                    );
                                }
                                bpf_intf::event_type_SCHED_SWITCH => {
                                    let switch = &event.event.sched_switch;
                                    analyzer.record_wakee_run(
                                        switch.next_pid,
                                        &String::from_utf8_lossy(&switch.next_comm),
                                        event.cpu, event.ts,
                                    );
                                }
                                _ => {}
                            }
                        }

                        // 2. CPU Hotspot Analyzer
                        if let Ok(mut analyzer) = ctx.cpu_hotspot.try_lock() {
                            let json = serde_json::json!({
                                "cpu": event.cpu, "ts": event.ts, "event_type": event_type
                            });
                            analyzer.record_event(&json);
                        }

                        // 3. Migration Analyzer
                        if let Ok(mut analyzer) = ctx.migration_analyzer.try_lock() {
                            if event_type == bpf_intf::event_type_SCHED_MIGRATE {
                                let migrate = &event.event.migrate;
                                let json = serde_json::json!({
                                    "pid": migrate.pid, "from_cpu": event.cpu,
                                    "to_cpu": migrate.dest_cpu, "ts": event.ts
                                });
                                analyzer.record_migration(&json, event.ts);
                            }
                        }

                        // 4. Process Event History
                        if let Ok(mut history) = ctx.process_history.try_lock() {
                            let (event_type_str, pid) = match event_type {
                                bpf_intf::event_type_SCHED_SWITCH => ("sched_switch", event.event.sched_switch.next_pid),
                                bpf_intf::event_type_SCHED_WAKEUP => ("sched_wakeup", event.event.wakeup.pid),
                                bpf_intf::event_type_SCHED_WAKING => ("sched_waking", event.event.waking.pid),
                                bpf_intf::event_type_SCHED_MIGRATE => ("sched_migrate", event.event.migrate.pid),
                                bpf_intf::event_type_EXIT => ("exit", event.event.exit.pid),
                                bpf_intf::event_type_EXEC => ("exec", event.event.exec.pid),
                                _ => ("other", 0),
                            };
                            if pid > 0 {
                                history.record_event(
                                    pid, event_type_str.to_string(), Some(event.cpu),
                                    serde_json::json!({"ts": event.ts}), event.ts,
                                );
                            }
                        }

                        // 5. Event Rate Monitor
                        if let Ok(mut monitor) = ctx.rate_monitor.try_lock() {
                            let event_type_str = match event_type {
                                bpf_intf::event_type_SCHED_SWITCH => "sched_switch",
                                bpf_intf::event_type_SCHED_WAKEUP => "sched_wakeup",
                                bpf_intf::event_type_SCHED_WAKING => "sched_waking",
                                bpf_intf::event_type_SCHED_MIGRATE => "sched_migrate",
                                _ => "other",
                            };
                            monitor.record_event(event_type_str.to_string(), event.ts);
                        }

                        // 6. Wakeup Chain Tracker
                        if let Ok(mut tracker) = ctx.wakeup_tracker.try_lock() {
                            if event_type == bpf_intf::event_type_SCHED_WAKEUP || event_type == bpf_intf::event_type_SCHED_WAKING {
                                let (pid, waker_pid) = if event_type == bpf_intf::event_type_SCHED_WAKEUP {
                                    (event.event.wakeup.pid, event.event.wakeup.waker_pid)
                                } else {
                                    (event.event.waking.pid, event.event.waking.waker_pid)
                                };
                                let json = serde_json::json!({
                                    "pid": pid, "waker_pid": waker_pid, "ts": event.ts, "cpu": event.cpu
                                });
                                tracker.record_wakeup(&json, event.ts);
                            }
                        }

                        // 7. Softirq Analyzer
                        if let Ok(mut analyzer) = ctx.softirq_analyzer.try_lock() {
                            if event_type == bpf_intf::event_type_SOFTIRQ {
                                let softirq = &event.event.softirq;
                                let json = serde_json::json!({
                                    "type": "softirq", "pid": softirq.pid, "softirq_nr": softirq.softirq_nr,
                                    "entry_ts": softirq.entry_ts, "exit_ts": softirq.exit_ts, "cpu": event.cpu,
                                });
                                analyzer.record_event(&json);
                            }
                        }

                        // Dispatch to the action channel
                        let mut edm = EventDispatchManager::new(None, None);
                        edm.register_bpf_handler(Box::new(BpfEventActionPublisher::new(ctx.action_tx.clone())));
                        let _ = edm.on_event(&event);
                    }
                    0
                }

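                // Each ringbuffer gets its own heap-allocated context.
                // Box::into_raw() deliberately leaks it for the callback's
                // lifetime; ownership is reclaimed only on the failure path
                // below, since a live manager may invoke the callback until
                // the daemon shuts down.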
                for rb_fd in &rb_fds {
                    let ctx = Box::new(McpRingBufContext {
                        dropped_invalid_ts: dropped_invalid_ts.clone(),
                        shared_stats: shared_stats_for_event_handler.clone(),
                        action_tx: action_tx.clone(),
                        waker_wakee: waker_wakee_arc.clone(),
                        cpu_hotspot: cpu_hotspot_arc.clone(),
                        migration_analyzer: migration_analyzer_arc.clone(),
                        process_history: process_history_arc.clone(),
                        rate_monitor: rate_monitor_arc.clone(),
                        wakeup_tracker: wakeup_tracker_arc.clone(),
                        softirq_analyzer: softirq_analyzer_arc.clone(),
                        shutdown: shutdown.clone(),
                    });
                    let ctx_ptr = Box::into_raw(ctx) as *mut std::ffi::c_void;

                    let rb_ptr = unsafe {
                        libbpf_sys::ring_buffer__new(
                            *rb_fd,
                            Some(mcp_ring_buffer_callback),
                            ctx_ptr,
                            std::ptr::null(),
                        )
                    };

                    if rb_ptr.is_null() {
                        unsafe {
                            let _ = Box::from_raw(ctx_ptr as *mut McpRingBufContext);
                        }
                        bail!("Failed to create ring buffer manager");
                    }

                    rb_managers.push(SendRingBuffer(rb_ptr));
                }

                // Attach the BPF programs initially
                let (initial_links, _warnings) = attach_progs(&mut skel)?;

                // Initialize the BPF program
                if !initial_links.is_empty() || is_root() {
                    skel.progs
                        .scxtop_init
                        .test_run(ProgramInput::default())
                        .ok();
                }
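                // test_run() executes scxtop_init once from userspace. The
                // guard presumably skips the call when nothing was attached
                // and the process lacks the privileges for it to succeed.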

                // Create the BPF perf event attacher BEFORE passing the skeleton to App.
                // This gives us a handle to attach perf events for profiling.
                // We create a closure that captures a raw pointer to the program.
                use scxtop::mcp::BpfPerfEventAttacher;
                // Get a raw pointer to the perf_sample_handler program as usize to make it Send
                let perf_program_addr = &skel.progs.perf_sample_handler as *const _ as usize;

                let bpf_attacher = BpfPerfEventAttacher::new(move |perf_fd| {
                    // SAFETY: The skeleton is kept alive in the App and not dropped
                    // until the MCP server is done, so this pointer remains valid
                    unsafe {
                        // Cast back to ProgramImpl with the Mut parameter
                        let prog =
                            &*(perf_program_addr as *const libbpf_rs::ProgramImpl<libbpf_rs::Mut>);
                        prog.attach_perf_event(perf_fd)
                            .map(|link| Box::new(link) as Box<dyn std::any::Any + Send>)
                            .map_err(|e| anyhow::anyhow!("Failed to attach perf event: {}", e))
                    }
                });
                let bpf_attacher_arc = Arc::new(bpf_attacher);

                // Create event control for dynamic BPF program attachment/detachment
                use scxtop::mcp::{AttachCallback, EventControl, StatsControlCommand};
                let mut event_control_instance = EventControl::new();

                // Create the attach callback using a skeleton pointer (same trick as the perf attacher)
                // SAFETY: The skeleton is kept alive in the App until daemon shutdown
                let skel_ptr = &mut skel as *mut _ as usize;
                let attach_callback: AttachCallback = Box::new(move |program_names: &[&str]| {
                    unsafe {
                        let skel_ref = &mut *(skel_ptr as *mut BpfSkel);
                        attach_progs_selective(skel_ref, program_names).map(|(links, _)| links)
                    }
                });

                // Give EventControl the initial links and callback, then immediately detach
                // to start with minimal overhead
                event_control_instance.set_bpf_links(initial_links, attach_callback);
                event_control_instance.disable_event_tracking()?;
                info!("BPF programs detached by default - use control_event_tracking to enable");

                // Create the stats control channel
                let (stats_tx, stats_rx) = mpsc::unbounded_channel::<StatsControlCommand>();
                event_control_instance.set_stats_control_channel(stats_tx);

                // Wrap in Arc after configuration
                let event_control = Arc::new(event_control_instance);

                // Create the App (but don't use it in spawned tasks due to Send constraints)
                let config = Config::default_config();
                let scheduler = read_file_string(SCHED_NAME_PATH).unwrap_or_default();
                let mut app = App::new(
                    config,
                    scheduler,
                    100,
                    mcp_args.process_id,
                    mcp_args.layered,
                    action_tx.clone(),
                    skel,
                )?;

                // Create analyzer control and register ALL analyzers
                use scxtop::mcp::AnalyzerControl;

                let mut analyzer_control = AnalyzerControl::new();
                analyzer_control.set_event_control(event_control.clone());

                // Register all analyzers
                analyzer_control.set_event_buffer(event_buffer_arc.clone());
                analyzer_control.set_latency_tracker(latency_tracker_arc.clone());
                analyzer_control.set_cpu_hotspot_analyzer(cpu_hotspot_arc.clone());
                analyzer_control.set_migration_analyzer(migration_analyzer_arc.clone());
                analyzer_control.set_process_history(process_history_arc.clone());
                analyzer_control.set_dsq_monitor(dsq_monitor_arc.clone());
                analyzer_control.set_rate_monitor(rate_monitor_arc.clone());
                analyzer_control.set_wakeup_tracker(wakeup_tracker_arc.clone());
                analyzer_control.set_waker_wakee_analyzer(waker_wakee_arc.clone());
                analyzer_control.set_softirq_analyzer(softirq_analyzer_arc.clone());

                // Wrap analyzer control in Arc<Mutex<>>
                let analyzer_control = Arc::new(Mutex::new(analyzer_control));

                // Create the trace cache for perfetto analysis
                use std::collections::HashMap;
                let trace_cache = Arc::new(Mutex::new(HashMap::new()));

                // Create the MCP server
                let mut server = McpServer::new(mcp_config)
                    .with_topology(topo_arc)
                    .setup_scheduler_resource()
                    .setup_profiling_resources()
                    .with_bpf_perf_attacher(bpf_attacher_arc)
                    .with_shared_stats(shared_stats.clone())
                    .with_stats_client(None)
                    .with_event_control(event_control.clone())
                    .with_analyzer_control(analyzer_control.clone())
                    .with_trace_cache(trace_cache)
                    .setup_stats_resources();

                // Enable event streaming
                let _event_stream_rx = server.enable_event_streaming();
                let resources = server.get_resources_handle();

                // Get the BPF stats collector for periodic sampling
                let bpf_stats = server.get_bpf_stats_collector();

                // Get the perf profiler for stack trace collection
                let perf_profiler = server.get_perf_profiler();

                // Start BPF polling tasks - spawn a separate task for each ringbuffer
                let shutdown_poll = shutdown.clone();

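                // Each polling task loops on ring_buffer__poll() with a 1ms
                // timeout so the shutdown flag is noticed promptly; on
                // shutdown it drains remaining events with consume() before
                // freeing the ring buffer.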
                let mut ringbuffer_handles = Vec::new();
                for (rb_id, rb) in rb_managers.into_iter().enumerate() {
                    let stop_poll_clone = shutdown_poll.clone();
                    ringbuffer_handles.push(tokio::spawn(async move {
                        loop {
                            // Poll with a 1ms timeout
                            rb.poll(1);
                            if stop_poll_clone.load(Ordering::Relaxed) {
                                // Consume any remaining events
                                rb.consume();
                                // Free the ring buffer
                                rb.free();
                                debug!("ringbuffer #{} polling stopped", rb_id);
                                break;
                            }
                        }
                    }));
                }

                // Start the controllable BPF stats collection task.
                // The task responds to start/stop commands via a channel and starts in the stopped state.
                if let Some(collector) = bpf_stats {
                    let shutdown_stats = shutdown.clone();
                    let mut stats_rx_task = stats_rx;
                    tokio::spawn(async move {
                        let mut running = false;
                        let mut interval_ms = 100u64;
                        let mut interval = tokio::time::interval(Duration::from_millis(interval_ms));

                        loop {
                            tokio::select! {
                                // Handle control commands
                                Some(cmd) = stats_rx_task.recv() => {
                                    match cmd {
                                        StatsControlCommand::Start(new_interval_ms) => {
                                            running = true;
                                            interval_ms = new_interval_ms;
                                            interval = tokio::time::interval(Duration::from_millis(interval_ms));
                                            info!("Stats collection started with {}ms interval", interval_ms);
                                        }
                                        StatsControlCommand::Stop => {
                                            running = false;
                                            info!("Stats collection stopped");
                                        }
                                    }
                                }

                                // Collect stats if running
                                _ = interval.tick(), if running => {
                                    if shutdown_stats.load(Ordering::Relaxed) {
                                        break;
                                    }
                                    let _ = collector.collect_sample();
                                }

                                // Check for shutdown even when not running
                                _ = tokio::time::sleep(Duration::from_millis(100)), if !running => {
                                    if shutdown_stats.load(Ordering::Relaxed) {
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }

                info!("MCP daemon started, processing BPF events");

                // Main loop: handle both the MCP server and action processing
                let mut mcp_server_task = Box::pin(server.run_async());
                let mcp_result;
                loop {
                    tokio::select! {
                        // Handle the MCP server
                        result = &mut mcp_server_task => {
                            info!("MCP server exited");
                            shutdown.store(true, Ordering::Relaxed);
                            mcp_result = result;
                            break;
                        }

                        // Handle actions from BPF
                        Some(action) = action_rx.recv() => {
                            // Check for shutdown
                            if matches!(action, Action::Quit) {
                                info!("Received quit action");
                                shutdown.store(true, Ordering::Relaxed);
                                mcp_result = Ok(());
                                break;
                            }

                            // Feed perf samples to the profiler if it's collecting
                            if let Some(ref profiler) = perf_profiler {
                                if let Action::PerfSample(ref perf_sample) = action {
                                    use scxtop::mcp::RawSample;
                                    profiler.add_sample(RawSample {
                                        address: perf_sample.instruction_pointer,
                                        pid: perf_sample.pid,
                                        cpu_id: perf_sample.cpu_id,
                                        is_kernel: perf_sample.is_kernel,
                                        kernel_stack: perf_sample.kernel_stack.clone(),
                                        user_stack: perf_sample.user_stack.clone(),
                                        layer_id: if perf_sample.layer_id >= 0 {
                                            Some(perf_sample.layer_id)
                                        } else {
                                            None
                                        },
                                    });
                                }
                            }

                            // Update app state
                            let _ = app.handle_action(&action);

                            // Convert the action to an MCP event and push it to the stream
                            if let Some(event) = action_to_mcp_event(&action) {
                                let _ = resources.push_event(event);
                            }
                        }
                    }
                }

                // Wait for all ringbuffer tasks to finish consuming remaining events
                debug!("waiting for {} ringbuffer tasks to complete", ringbuffer_handles.len());
                for handle in ringbuffer_handles {
                    if let Err(e) = handle.await {
                        log::error!("Ringbuffer task panicked: {e}");
                    }
                }

                // Links are managed by EventControl and will be dropped on shutdown
                mcp_result
            })
    } else {
        // One-shot mode: no BPF, just serve static data
        let mut server = McpServer::new(mcp_config)
            .with_topology(topo_arc)
            .setup_scheduler_resource()
            .setup_profiling_resources()
            .with_stats_client(None)
            .setup_stats_resources();
        server.run_blocking()
    }
}

fn main() -> Result<()> {
    let args = Cli::parse();

    match &args.command.unwrap_or(Commands::Tui(args.tui)) {
        Commands::Tui(tui_args) => {
            run_tui(tui_args)?;
        }
        Commands::Trace(trace_args) => {
            run_trace(trace_args)?;
        }
        Commands::Mcp(mcp_args) => {
            run_mcp(mcp_args)?;
        }
        Commands::GenerateCompletions { shell, output } => {
            generate_completions(Cli::command(), *shell, output.clone())
                .unwrap_or_else(|_| panic!("Failed to generate completions for {shell}"));
        }
    }
    Ok(())
}