scxtop/
main.rs

1// Copyright (c) Meta Platforms, Inc. and affiliates.
2//
3// This software may be used and distributed according to the terms of the
4// GNU General Public License version 2.
5
6use scx_utils::compat;
7use scxtop::bpf_skel::types::bpf_event;
8use scxtop::cli::{generate_completions, Cli, Commands, TraceArgs, TuiArgs};
9use scxtop::config::Config;
10use scxtop::edm::{ActionHandler, BpfEventActionPublisher, BpfEventHandler, EventDispatchManager};
11use scxtop::layered_util;
12use scxtop::mangoapp::poll_mangoapp;
13use scxtop::search;
14use scxtop::tracer::Tracer;
15use scxtop::util::{get_clock_value, read_file_string};
16use scxtop::Action;
17use scxtop::App;
18use scxtop::CpuStatTracker;
19use scxtop::Event;
20use scxtop::Key;
21use scxtop::KeyMap;
22use scxtop::MemStatSnapshot;
23use scxtop::PerfettoTraceManager;
24use scxtop::SystemStatAction;
25use scxtop::Tui;
26use scxtop::SCHED_NAME_PATH;
27use scxtop::{available_kprobe_events, UpdateColVisibilityAction};
28use scxtop::{bpf_skel::*, AppState};
29
30use anyhow::anyhow;
31use anyhow::Result;
32use clap::{CommandFactory, Parser};
33use futures::future::join_all;
34use libbpf_rs::skel::OpenSkel;
35use libbpf_rs::skel::SkelBuilder;
36use libbpf_rs::Link;
37use libbpf_rs::ProgramInput;
38use libbpf_rs::RingBufferBuilder;
39use libbpf_rs::UprobeOpts;
40use log::debug;
41use log::info;
42use ratatui::crossterm::event::{KeyCode::Char, KeyEvent};
43use simplelog::{
44    ColorChoice, Config as SimplelogConfig, LevelFilter, TermLogger, TerminalMode, WriteLogger,
45};
46use std::ffi::CString;
47use std::fs::File;
48use std::mem::MaybeUninit;
49use std::str::FromStr;
50use std::sync::atomic::AtomicBool;
51use std::sync::atomic::Ordering;
52use std::sync::Arc;
53use std::time::Duration;
54use sysinfo::System;
55use tokio::sync::mpsc;
56
57fn get_action(app: &App, keymap: &KeyMap, event: Event) -> Action {
58    match event {
59        Event::Error => Action::None,
60        Event::Tick => Action::Tick,
61        Event::TickRateChange(tick_rate_ms) => {
62            Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms))
63        }
64        Event::Key(key) => handle_key_event(app, keymap, key),
65        Event::Paste(paste) => handle_input_entry(app, paste).unwrap_or(Action::None),
66        _ => Action::None,
67    }
68}
69
70fn handle_key_event(app: &App, keymap: &KeyMap, key: KeyEvent) -> Action {
71    match key.code {
72        Char(c) => handle_input_entry(app, c.to_string()).unwrap_or(keymap.action(&Key::Char(c))),
73        _ => keymap.action(&Key::Code(key.code)),
74    }
75}
76
77fn handle_input_entry(app: &App, s: String) -> Option<Action> {
78    match app.state() {
79        AppState::PerfEvent | AppState::KprobeEvent => Some(Action::InputEntry(s)),
80        AppState::Default | AppState::Llc | AppState::Node => {
81            if app.filtering() {
82                Some(Action::InputEntry(s))
83            } else {
84                None
85            }
86        }
87        _ => None,
88    }
89}
90
91/// Attaches BPF programs to the skel.
92fn attach_progs(skel: &mut BpfSkel) -> Result<Vec<Link>> {
93    // Attach probes
94    let mut links = vec![
95        skel.progs.on_sched_cpu_perf.attach()?,
96        skel.progs.scx_sched_reg.attach()?,
97        skel.progs.scx_sched_unreg.attach()?,
98        skel.progs.on_sched_switch.attach()?,
99        skel.progs.on_sched_wakeup.attach()?,
100        skel.progs.on_sched_wakeup_new.attach()?,
101        skel.progs.on_sched_waking.attach()?,
102        skel.progs.on_sched_migrate_task.attach()?,
103        skel.progs.on_sched_fork.attach()?,
104        skel.progs.on_sched_exec.attach()?,
105        skel.progs.on_sched_exit.attach()?,
106    ];
107
108    // 6.13 compatibility
109    if compat::ksym_exists("scx_bpf_dsq_insert_vtime")? {
110        if let Ok(link) = skel.progs.scx_insert_vtime.attach() {
111            links.push(link);
112        }
113        if let Ok(link) = skel.progs.scx_insert.attach() {
114            links.push(link);
115        }
116        if let Ok(link) = skel.progs.scx_dsq_move.attach() {
117            links.push(link);
118        }
119        if let Ok(link) = skel.progs.scx_dsq_move_set_vtime.attach() {
120            links.push(link);
121        }
122        if let Ok(link) = skel.progs.scx_dsq_move_set_slice.attach() {
123            links.push(link);
124        }
125    } else {
126        if let Ok(link) = skel.progs.scx_dispatch.attach() {
127            links.push(link);
128        }
129        if let Ok(link) = skel.progs.scx_dispatch_vtime.attach() {
130            links.push(link);
131        }
132        if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_vtime.attach() {
133            links.push(link);
134        }
135        if let Ok(link) = skel.progs.scx_dispatch_from_dsq_set_slice.attach() {
136            links.push(link);
137        }
138        if let Ok(link) = skel.progs.scx_dispatch_from_dsq.attach() {
139            links.push(link);
140        }
141    }
142    if let Ok(link) = skel.progs.on_cpuhp_enter.attach() {
143        links.push(link);
144    }
145    if let Ok(link) = skel.progs.on_cpuhp_exit.attach() {
146        links.push(link);
147    }
148
149    Ok(links)
150}
151
152fn run_trace(trace_args: &TraceArgs) -> Result<()> {
153    TermLogger::init(
154        match trace_args.verbose {
155            0 => simplelog::LevelFilter::Info,
156            1 => simplelog::LevelFilter::Debug,
157            _ => simplelog::LevelFilter::Trace,
158        },
159        SimplelogConfig::default(),
160        TerminalMode::Mixed,
161        ColorChoice::Auto,
162    )?;
163
164    let mut kprobe_events = available_kprobe_events()?;
165    kprobe_events.sort();
166    search::sorted_contains_all(&kprobe_events, &trace_args.kprobes)
167        .then_some(())
168        .ok_or_else(|| anyhow!("Invalid kprobe events"))?;
169
170    let config = Config::default_config();
171    let worker_threads = config.worker_threads() as usize;
172    tokio::runtime::Builder::new_multi_thread()
173        .enable_all()
174        .worker_threads(if worker_threads > 2 {
175            worker_threads
176        } else {
177            4
178        })
179        .build()
180        .unwrap()
181        .block_on(async {
182            let (action_tx, mut action_rx) = mpsc::unbounded_channel();
183
184            // Set up the BPF skel and publisher
185            let mut open_object = MaybeUninit::uninit();
186            let mut builder = BpfSkelBuilder::default();
187            if trace_args.verbose > 2 {
188                builder.obj_builder.debug(true);
189            }
190
191            let skel = builder.open(&mut open_object)?;
192            compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total)?;
193            compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update)?;
194            compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait)?;
195            compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang)?;
196
197            let mut skel = skel.load()?;
198            skel.maps.data_data.as_mut().unwrap().enable_bpf_events = false;
199            let links = attach_progs(&mut skel)?;
200
201            let bpf_publisher = BpfEventActionPublisher::new(action_tx.clone());
202
203            // Set up the event buffer
204            let mut event_rbb = RingBufferBuilder::new();
205            let mut edm = EventDispatchManager::new(None, None);
206            edm.register_bpf_handler(Box::new(bpf_publisher));
207            let event_handler = move |data: &[u8]| {
208                let mut event = bpf_event::default();
209                plain::copy_from_bytes(&mut event, data).expect("Event data buffer was too short");
210                let _ = edm.on_event(&event);
211                0
212            };
213            event_rbb.add(&skel.maps.events, event_handler)?;
214            let event_rb = event_rbb.build()?;
215
216            // Set up the background threads
217            let shutdown = Arc::new(AtomicBool::new(false));
218            let stop_poll = shutdown.clone();
219            let stop_stats = shutdown.clone();
220
221            let mut handles = Vec::new();
222            handles.push(tokio::spawn(async move {
223                loop {
224                    let _ = event_rb.poll(Duration::from_millis(1));
225                    if stop_poll.load(Ordering::Relaxed) {
226                        // Flush the ring buffer to ensure all events are processed
227                        let _ = event_rb.consume();
228                        debug!("polling stopped");
229                        break;
230                    }
231                }
232            }));
233
234            if trace_args.system_stats {
235                let mut cpu_stat_tracker = CpuStatTracker::default();
236                let mut mem_stats = MemStatSnapshot::default();
237                let mut system = System::new_all();
238                let action_tx_clone = action_tx.clone();
239
240                handles.push(tokio::spawn(async move {
241                    loop {
242                        if stop_stats.load(Ordering::Relaxed) {
243                            break;
244                        }
245                        let ts = get_clock_value(libc::CLOCK_BOOTTIME);
246
247                        cpu_stat_tracker
248                            .update(&mut system)
249                            .expect("Failed to update cpu stats");
250
251                        mem_stats.update().expect("Failed to update mem stats");
252
253                        let sys_stat_action = Action::SystemStat(SystemStatAction {
254                            ts,
255                            cpu_data_prev: cpu_stat_tracker.prev.clone(),
256                            cpu_data_current: cpu_stat_tracker.current.clone(),
257                            mem_info: mem_stats.clone(),
258                        });
259                        action_tx_clone
260                            .send(sys_stat_action)
261                            .expect("Failed to send CpuStat action");
262
263                        tokio::time::sleep(Duration::from_millis(100)).await;
264                    }
265                }));
266            }
267
268            let trace_file_prefix = config.trace_file_prefix().to_string();
269            let trace_file = trace_args.output_file.clone();
270            let mut trace_manager = PerfettoTraceManager::new(trace_file_prefix, None);
271
272            info!("starting trace for {}ms", trace_args.trace_ms);
273            trace_manager.start()?;
274            let mut tracer = Tracer::new(skel);
275            tracer.trace(&trace_args.kprobes)?;
276
277            handles.push(tokio::spawn(async move {
278                let mut count = 0;
279                loop {
280                    let action = action_rx.recv().await;
281                    if let Some(a) = action {
282                        count += 1;
283                        trace_manager
284                            .on_action(&a)
285                            .expect("Action should have been resolved");
286                    } else {
287                        trace_manager.stop(trace_file, None).unwrap();
288                        info!("trace file compiled, collected {count} events");
289                        break;
290                    }
291                }
292            }));
293            tokio::time::sleep(Duration::from_millis(trace_args.trace_ms)).await;
294
295            // 1) set the shutdown variable to stop background tokio threads
296            // 2) next, drop the links to detach the attached BPF programs
297            // 3) drop the action_tx to ensure action_rx closes
298            // 4) wait for the completion of the trace file generation to complete
299            shutdown.store(true, Ordering::Relaxed);
300            tracer.clear_links()?;
301            drop(links);
302            drop(action_tx);
303            info!("generating trace");
304            let results = join_all(handles).await;
305            for result in results {
306                if let Err(e) = result {
307                    eprintln!("Task panicked: {e}");
308                }
309            }
310
311            let stats = tracer.stats()?;
312            info!("{stats:?}");
313
314            Ok(())
315        })
316}
317
318fn run_tui(tui_args: &TuiArgs) -> Result<()> {
319    if let Ok(log_path) = std::env::var("RUST_LOG_PATH") {
320        let log_level = match std::env::var("RUST_LOG") {
321            Ok(v) => LevelFilter::from_str(&v)?,
322            Err(_) => LevelFilter::Info,
323        };
324
325        WriteLogger::init(
326            log_level,
327            simplelog::Config::default(),
328            File::create(log_path)?,
329        )?;
330
331        log_panics::Config::new()
332            .backtrace_mode(log_panics::BacktraceMode::Resolved)
333            .install_panic_hook();
334    };
335
336    let config = Config::merge([
337        Config::from(tui_args.clone()),
338        Config::load_or_default().expect("Failed to load config or load default config"),
339    ]);
340    let keymap = config.active_keymap.clone();
341
342    tokio::runtime::Builder::new_multi_thread()
343        .enable_all()
344        .worker_threads(config.worker_threads() as usize)
345        .build()
346        .unwrap()
347        .block_on(async {
348            let (action_tx, mut action_rx) = mpsc::unbounded_channel();
349
350            let mut open_object = MaybeUninit::uninit();
351            let mut builder = BpfSkelBuilder::default();
352            if config.debug() {
353                builder.obj_builder.debug(true);
354            }
355            let bpf_publisher = BpfEventActionPublisher::new(action_tx.clone());
356            let mut edm = EventDispatchManager::new(None, None);
357            edm.register_bpf_handler(Box::new(bpf_publisher));
358
359            let mut skel = builder.open(&mut open_object)?;
360            skel.maps.rodata_data.as_mut().unwrap().long_tail_tracing_min_latency_ns =
361                tui_args.experimental_long_tail_tracing_min_latency_ns;
362
363            let map_handle = if tui_args.layered {
364                skel.maps.rodata_data.as_mut().unwrap().layered = true;
365                action_tx.send(Action::UpdateColVisibility(UpdateColVisibilityAction {
366                    table: "Process".to_string(),
367                    col: "Layer ID".to_string(),
368                    visible: true,
369                }))?;
370                Some(layered_util::attach_to_existing_map("task_ctxs", &mut skel.maps.task_ctxs)?)
371            } else {
372                None
373            };
374
375            compat::cond_kprobe_enable("gpu_memory_total", &skel.progs.on_gpu_memory_total)?;
376            compat::cond_kprobe_enable("hw_pressure_update", &skel.progs.on_hw_pressure_update)?;
377            compat::cond_tracepoint_enable("sched:sched_process_wait", &skel.progs.on_sched_wait)?;
378            compat::cond_tracepoint_enable("sched:sched_process_hang", &skel.progs.on_sched_hang)?;
379            let mut skel = skel.load()?;
380            let mut links = attach_progs(&mut skel)?;
381            skel.progs.scxtop_init.test_run(ProgramInput::default())?;
382
383            if tui_args.experimental_long_tail_tracing {
384                skel.maps.data_data.as_mut().unwrap().trace_duration_ns = config.trace_duration_ns();
385                skel.maps.data_data.as_mut().unwrap().trace_warmup_ns = config.trace_warmup_ns();
386
387                let binary = tui_args
388                    .experimental_long_tail_tracing_binary
389                    .clone()
390                    .unwrap();
391                let symbol = tui_args
392                    .experimental_long_tail_tracing_symbol
393                    .clone()
394                    .unwrap();
395
396                links.extend([
397                    skel.progs.long_tail_tracker_exit.attach_uprobe_with_opts(
398                        -1, /* pid, -1 == all */
399                        binary.clone(),
400                        0,
401                        UprobeOpts {
402                            retprobe: true,
403                            func_name: Some(symbol.clone()),
404                            ..Default::default()
405                        },
406                    )?,
407                    skel.progs.long_tail_tracker_entry.attach_uprobe_with_opts(
408                        -1, /* pid, -1 == all */
409                        binary.clone(),
410                        0,
411                        UprobeOpts {
412                            retprobe: false,
413                            func_name: Some(symbol.clone()),
414                            ..Default::default()
415                        },
416                    )?,
417                ]);
418            };
419
420            let mut tui = Tui::new(keymap.clone(), config.tick_rate_ms(), config.frame_rate_ms())?;
421            let mut event_rbb = RingBufferBuilder::new();
422            let event_handler = move |data: &[u8]| {
423                let mut event = bpf_event::default();
424                plain::copy_from_bytes(&mut event, data).expect("Event data buffer was too short");
425                let _ = edm.on_event(&event);
426                0
427            };
428            event_rbb.add(&skel.maps.events, event_handler)?;
429            let event_rb = event_rbb.build()?;
430            let scheduler = read_file_string(SCHED_NAME_PATH).unwrap_or("".to_string());
431
432            let mut app = App::new(
433                config,
434                scheduler,
435                100,
436                tui_args.process_id,
437                action_tx.clone(),
438                skel,
439            )?;
440
441            tui.enter()?;
442
443            let shutdown = app.should_quit.clone();
444            tokio::spawn(async move {
445                loop {
446                    let _ = event_rb.poll(Duration::from_millis(1));
447                    if shutdown.load(Ordering::Relaxed) {
448                        break;
449                    }
450                }
451            });
452
453            if tui_args.mangoapp_tracing {
454                let stop_mangoapp = app.should_quit.clone();
455                let mangoapp_path = CString::new(tui_args.mangoapp_path.clone()).unwrap();
456                let poll_intvl_ms = tui_args.mangoapp_poll_intvl_ms;
457                let tx = action_tx.clone();
458                tokio::spawn(async move {
459                    poll_mangoapp(
460                        mangoapp_path,
461                        poll_intvl_ms,
462                        tx,
463                        stop_mangoapp,
464                    )
465                    .await
466                });
467            }
468
469            loop {
470                tokio::select! {
471                    ev = tui.next() => {
472                        let ev = ev?;
473                        match ev {
474                            Event::Quit => { action_tx.send(Action::Quit)?; },
475                            Event::Tick => action_tx.send(Action::Tick)?,
476                            Event::TickRateChange(tick_rate_ms) => action_tx.send(
477                                Action::TickRateChange(std::time::Duration::from_millis(tick_rate_ms)),
478                            )?,
479                            Event::Render => {
480                                if app.should_quit.load(Ordering::Relaxed) {
481                                    break;
482                                }
483                                if app.state() != AppState::Pause {
484                                    tui.draw(|f| app.render(f).expect("Failed to render application"))?;
485                                }
486                            }
487                            Event::Key(_) => {
488                                let action = get_action(&app, &keymap, ev);
489                                action_tx.send(action)?;
490                            }
491                            _ => {}
492                    }}
493
494                    ac = action_rx.recv() => {
495                        let ac = ac.ok_or(anyhow!("actions channel closed"))?;
496                        app.handle_action(&ac)?;
497                    }
498                }
499            }
500            tui.exit()?;
501            drop(links);
502            drop(map_handle);
503
504            Ok(())
505        })
506}
507
508fn main() -> Result<()> {
509    let args = Cli::parse();
510
511    match &args.command.unwrap_or(Commands::Tui(args.tui)) {
512        Commands::Tui(tui_args) => {
513            run_tui(tui_args)?;
514        }
515        Commands::Trace(trace_args) => {
516            run_trace(trace_args)?;
517        }
518        Commands::GenerateCompletions { shell, output } => {
519            generate_completions(Cli::command(), *shell, output.clone())
520                .unwrap_or_else(|_| panic!("Failed to generate completions for {shell}"));
521        }
522    }
523    Ok(())
524}