// scx_stats/server.rs

1use crate::StatsClient;
2use crate::{Meta, StatsData, StatsKind, StatsMeta};
3use anyhow::{anyhow, bail, Context, Result};
4use crossbeam::channel::{unbounded, Receiver, RecvError, Select, Sender};
5use log::{debug, error, warn};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::{BTreeMap, BTreeSet};
9use std::io::{BufRead, BufReader, Write};
10use std::os::unix::net::{UnixListener, UnixStream};
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Mutex};
14use std::thread::spawn;
15
16pub trait StatsReader<Req, Res>:
17    FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>
18{
19}
20impl<
21        Req,
22        Res,
23        T: FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>,
24    > StatsReader<Req, Res> for T
25{
26}
27
28pub trait StatsReaderSend<Req, Res>:
29    FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send
30{
31}
32impl<
33        Req,
34        Res,
35        T: FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send,
36    > StatsReaderSend<Req, Res> for T
37{
38}
39
40pub trait StatsReaderSync<Req, Res>:
41    Fn(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send + Sync
42{
43}
44impl<
45        Req,
46        Res,
47        T: Fn(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>
48            + Send
49            + Sync,
50    > StatsReaderSync<Req, Res> for T
51{
52}
53
54pub trait StatsOpener<Req, Res>:
55    FnMut((&Sender<Req>, &Receiver<Res>)) -> Result<Box<dyn StatsReader<Req, Res>>> + Send
56{
57}
58impl<
59        Req,
60        Res,
61        T: FnMut((&Sender<Req>, &Receiver<Res>)) -> Result<Box<dyn StatsReader<Req, Res>>> + Send,
62    > StatsOpener<Req, Res> for T
63{
64}
65
66pub trait StatsCloser<Req, Res>: FnOnce((&Sender<Req>, &Receiver<Res>)) + Send {}
67impl<Req, Res, T: FnOnce((&Sender<Req>, &Receiver<Res>)) + Send> StatsCloser<Req, Res> for T {}
68
/// Handlers registered for one stats target.
///
/// A single `StatsOps` is shared (behind `Arc<Mutex<..>>`) by every connection that
/// requests the target.
pub struct StatsOps<Req, Res> {
    /// Creates a reader for a connection; called with that connection's channel ends.
    pub open: Box<dyn StatsOpener<Req, Res>>,
    /// Optional teardown hook. It is `FnOnce` and is taken out of this shared slot when
    /// called, so it runs at most once across all connections.
    pub close: Option<Box<dyn StatsCloser<Req, Res>>>,
}
73
/// Readers opened on a single connection, keyed by stats target name.
///
/// Each entry holds:
/// - the shared `StatsOps`, so its `close` hook can be run when this set is dropped;
/// - the reader returned by `open`;
/// - the connection's channel pair that the hooks are invoked with.
struct StatsOpenOps<Req, Res> {
    map: BTreeMap<
        String,
        (
            Arc<Mutex<StatsOps<Req, Res>>>,
            Box<dyn StatsReader<Req, Res>>,
            ChannelPair<Req, Res>,
        ),
    >,
}
84
85impl<Req, Res> StatsOpenOps<Req, Res> {
86    fn new() -> Self {
87        Self {
88            map: BTreeMap::new(),
89        }
90    }
91}
92
93impl<Req, Res> std::ops::Drop for StatsOpenOps<Req, Res> {
94    fn drop(&mut self) {
95        for (_, (ops, _, ch)) in self.map.iter_mut() {
96            if let Some(close) = ops.lock().unwrap().close.take() {
97                close((&ch.req, &ch.res));
98            }
99        }
100    }
101}
102
/// A request sent by a stats client as one JSON object per line.
///
/// `req` selects the command; the server handles `"stats"` and `"stats_meta"`.
/// `args` holds string arguments and is empty when the JSON omits it.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StatsRequest {
    pub req: String,
    #[serde(default)]
    pub args: BTreeMap<String, String>,
}
109
110impl StatsRequest {
111    pub fn new(req: &str, args: Vec<(String, String)>) -> Self {
112        Self {
113            req: req.to_string(),
114            args: args.into_iter().collect(),
115        }
116    }
117}
118
/// A reply sent back to a stats client as one JSON line.
///
/// `errno` is 0 on success and non-zero on failure. The server places the payload
/// (stats, metadata, or an error description) in `args` under the `"resp"` key.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StatsResponse {
    pub errno: i32,
    pub args: BTreeMap<String, Value>,
}
124
/// An errno value attached as `anyhow` context to request errors.
///
/// When a request fails, the server downcasts the error to this type to choose the
/// `errno` it reports. A missing or zero value falls back to `EINVAL`.
pub struct StatsErrno(pub i32);
126
127impl std::fmt::Display for StatsErrno {
128    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
129        write!(f, "{}", std::io::Error::from_raw_os_error(self.0))
130    }
131}
132
133impl std::fmt::Debug for StatsErrno {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        write!(f, "{:?}", std::io::Error::from_raw_os_error(self.0))
136    }
137}
138
/// One end of a bidirectional channel: sends `Req` on `req`, receives `Res` on `res`.
///
/// `ChannelPair::bidi` creates two mirrored ends that talk to each other.
struct ChannelPair<Req, Res> {
    req: Sender<Req>,
    res: Receiver<Res>,
}
143
144impl<Req, Res> ChannelPair<Req, Res> {
145    fn bidi() -> (ChannelPair<Req, Res>, ChannelPair<Res, Req>) {
146        let (req, res) = (unbounded::<Req>(), unbounded::<Res>());
147        (
148            ChannelPair {
149                req: req.0,
150                res: res.1,
151            },
152            ChannelPair {
153                req: res.0,
154                res: req.1,
155            },
156        )
157    }
158}
159
160impl<Req, Res> Clone for ChannelPair<Req, Res> {
161    fn clone(&self) -> Self {
162        Self {
163            req: self.req.clone(),
164            res: self.res.clone(),
165        }
166    }
167}
168
/// Stats metadata and per-target handlers served by a `StatsServer`.
pub struct StatsServerData<Req, Res>
where
    Req: Send + 'static,
    Res: Send + 'static,
{
    /// Name of the first metadata registered with the `top` attribute, if any.
    top: Option<String>,
    /// Metadata by name.
    meta: BTreeMap<String, StatsMeta>,
    /// Open/close handlers by stats target name.
    ops: BTreeMap<String, Arc<Mutex<StatsOps<Req, Res>>>>,
}
178
179impl<Req, Res> StatsServerData<Req, Res>
180where
181    Req: Send + 'static,
182    Res: Send + 'static,
183{
184    pub fn new() -> Self {
185        Self {
186            top: None,
187            meta: BTreeMap::new(),
188            ops: BTreeMap::new(),
189        }
190    }
191
192    pub fn add_meta(mut self, meta: StatsMeta) -> Self {
193        if meta.attrs.top.is_some() && self.top.is_none() {
194            self.top = Some(meta.name.clone());
195        }
196        self.meta.insert(meta.name.clone(), meta);
197        self
198    }
199
200    pub fn add_ops(mut self, name: &str, ops: StatsOps<Req, Res>) -> Self {
201        self.ops.insert(name.to_string(), Arc::new(Mutex::new(ops)));
202        self
203    }
204
205    pub fn add_stats(self, name: &str, fetch: Box<dyn StatsReaderSend<Req, Res>>) -> Self {
206        let wrapped_fetch = Mutex::new(fetch);
207        let read: Box<dyn StatsReaderSync<Req, Res>> =
208            Box::new(move |args, chan| wrapped_fetch.lock().unwrap()(args, chan));
209        let wrapped_read = Arc::new(read);
210        let ops = StatsOps {
211            open: Box::new(move |_| {
212                let copy = wrapped_read.clone();
213                Ok(Box::new(move |args, chan| copy(args, chan)))
214            }),
215            close: None,
216        };
217
218        self.add_ops(name, ops)
219    }
220
221    fn visit_meta_inner(
222        &self,
223        name: &str,
224        visit: &mut impl FnMut(&StatsMeta) -> Result<()>,
225        nesting: &mut BTreeSet<String>,
226        visited: &mut BTreeSet<String>,
227    ) -> Result<()> {
228        let m = match self.meta.get(name) {
229            Some(v) => v,
230            None => bail!("unknown stats meta name {}", name),
231        };
232
233        if !nesting.insert(name.into()) {
234            bail!("loop in stats meta detected, {} already nested", name);
235        }
236        if !visited.insert(name.into()) {
237            return Ok(());
238        }
239
240        visit(m)?;
241
242        for (fname, field) in m.fields.iter() {
243            match &field.data {
244                StatsData::Array(StatsKind::Struct(inner)) => {
245                    self.visit_meta_inner(inner, visit, nesting, visited)?
246                }
247                StatsData::Dict {
248                    key: StatsKind::Struct(inner),
249                    datum: _,
250                } => bail!("{}.{} is a dict with struct {} as key", name, fname, inner),
251                StatsData::Dict {
252                    key: _,
253                    datum: StatsKind::Struct(inner),
254                } => self.visit_meta_inner(inner, visit, nesting, visited)?,
255                _ => {}
256            }
257        }
258
259        nesting.remove(name);
260        Ok(())
261    }
262
263    fn visit_meta(
264        &self,
265        from: &str,
266        visit: &mut impl FnMut(&StatsMeta) -> Result<()>,
267    ) -> Result<()> {
268        let mut nesting = BTreeSet::<String>::new();
269        let mut visited = BTreeSet::<String>::new();
270        self.visit_meta_inner(from, visit, &mut nesting, &mut visited)
271    }
272
273    fn verify_meta(&self) -> Result<()> {
274        if self.top.is_none() {
275            debug!("top-level stats metadata missing");
276            return Ok(());
277        }
278
279        // Null visit checks all nested stats are reacheable without loops.
280        self.visit_meta(self.top.as_ref().unwrap(), &mut |_| Ok(()))
281    }
282
283    pub fn describe_meta<W: Write>(&self, w: &mut W, from: Option<&[&str]>) -> Result<()> {
284        let meta_names = match from {
285            Some(v) if !v.is_empty() => v,
286            Some(_) | None => {
287                let top = self
288                    .top
289                    .as_ref()
290                    .ok_or_else(|| anyhow!("don't know where to start"))?;
291                return self.describe_meta_inner(w, top);
292            }
293        };
294
295        for meta_name in meta_names {
296            self.describe_meta_inner(w, meta_name)?;
297        }
298
299        Ok(())
300    }
301
302    pub fn describe_meta_inner<W: Write>(&self, w: &mut W, from: &str) -> Result<()> {
303        let (mut nwidth, mut fwidth, mut dwidth) = (0usize, 0usize, 0usize);
304
305        self.visit_meta(from, &mut |m| {
306            nwidth = nwidth.max(m.name.len());
307            (fwidth, dwidth) = m.fields.iter().fold((fwidth, dwidth), |acc, (n, f)| {
308                (acc.0.max(n.len()), acc.1.max(f.data.to_string().len() + 2))
309            });
310            Ok(())
311        })?;
312
313        let mut first = true;
314        self.visit_meta(from, &mut |m| {
315            if !first {
316                writeln!(w, "")?;
317            }
318            first = false;
319
320            write!(w, "[{:nw$}]", m.name, nw = nwidth)?;
321            if let Some(desc) = &m.attrs.desc {
322                write!(w, " {}", desc)?;
323            }
324            writeln!(w, "")?;
325
326            for (fname, f) in m.fields.iter() {
327                write!(
328                    w,
329                    "  {:fw$} {:dw$}",
330                    fname,
331                    format!("({})", f.data.to_string()),
332                    fw = fwidth,
333                    dw = dwidth
334                )?;
335                if let Some(desc) = &f.attrs.desc {
336                    write!(w, " : {}", desc)?;
337                }
338                writeln!(w, "")?;
339            }
340            Ok(())
341        })
342    }
343}
344
/// State moved into the listener thread by `StatsServer::launch`.
struct StatsServerInner<Req, Res>
where
    Req: Send + 'static,
    Res: Send + 'static,
{
    /// Socket accepting stats client connections.
    listener: UnixListener,
    /// Metadata and handlers, shared with the `StatsServer` and every connection thread.
    data: Arc<Mutex<StatsServerData<Req, Res>>>,
    /// Server-side end of the pair whose other end `StatsServer::channels` hands out.
    inner_ch: ChannelPair<Req, Res>,
    /// Set when the `StatsServer` is dropped; checked to stop accepting and serving.
    exit: Arc<AtomicBool>,
}
355
356impl<Req, Res> StatsServerInner<Req, Res>
357where
358    Req: Send + 'static,
359    Res: Send + 'static,
360{
361    fn new(
362        listener: UnixListener,
363        data: Arc<Mutex<StatsServerData<Req, Res>>>,
364        inner_ch: ChannelPair<Req, Res>,
365        exit: Arc<AtomicBool>,
366    ) -> Self {
367        Self {
368            listener,
369            data,
370            inner_ch,
371            exit,
372        }
373    }
374
375    fn build_resp<T>(errno: i32, resp: &T) -> Result<StatsResponse>
376    where
377        T: Serialize,
378    {
379        Ok(StatsResponse {
380            errno,
381            args: [("resp".into(), serde_json::to_value(resp)?)]
382                .into_iter()
383                .collect(),
384        })
385    }
386
387    fn handle_request(
388        line: String,
389        data: &Arc<Mutex<StatsServerData<Req, Res>>>,
390        ch: &ChannelPair<Req, Res>,
391        open_ops: &mut StatsOpenOps<Req, Res>,
392    ) -> Result<StatsResponse> {
393        let req: StatsRequest = serde_json::from_str(&line)?;
394
395        match req.req.as_str() {
396            "stats" => {
397                let target = match req.args.get("target") {
398                    Some(v) => v,
399                    None => "top",
400                };
401
402                let ops =
403                    match data.lock().unwrap().ops.get(target) {
404                        Some(v) => v.clone(),
405                        None => Err(anyhow!("unknown stat target {:?}", req)
406                            .context(StatsErrno(libc::EINVAL)))?,
407                    };
408
409                if !open_ops.map.contains_key(target) {
410                    let read = (ops.lock().unwrap().open)((&ch.req, &ch.res))?;
411                    open_ops
412                        .map
413                        .insert(target.into(), (ops.clone(), read, ch.clone()));
414                }
415
416                let read = &mut open_ops.map.get_mut(target).unwrap().1;
417
418                let resp = read(&req.args, (&ch.req, &ch.res))?;
419
420                Self::build_resp(0, &resp)
421            }
422            "stats_meta" => Ok(Self::build_resp(0, &data.lock().unwrap().meta)?),
423            req => Err(anyhow!("unknown command {:?}", req).context(StatsErrno(libc::EINVAL)))?,
424        }
425    }
426
427    fn serve(
428        mut stream: UnixStream,
429        data: Arc<Mutex<StatsServerData<Req, Res>>>,
430        inner_ch: ChannelPair<Req, Res>,
431        exit: Arc<AtomicBool>,
432    ) -> Result<()> {
433        let mut stream_reader = BufReader::new(stream.try_clone()?);
434        let mut open_ops = StatsOpenOps::new();
435
436        loop {
437            let mut line = String::new();
438            stream_reader.read_line(&mut line)?;
439            if line.is_empty() {
440                return Ok(());
441            }
442            if exit.load(Ordering::Relaxed) {
443                debug!("server exiting due to exit");
444                return Ok(());
445            }
446
447            let resp = match Self::handle_request(line, &data, &inner_ch, &mut open_ops) {
448                Ok(v) => v,
449                Err(e) => {
450                    let errno = match e.downcast_ref::<StatsErrno>() {
451                        Some(e) if e.0 != 0 => e.0,
452                        _ => libc::EINVAL,
453                    };
454                    Self::build_resp(errno, &format!("{:?}", &e))?
455                }
456            };
457
458            let output = serde_json::to_string(&resp)? + "\n";
459            stream.write_all(output.as_bytes())?;
460        }
461    }
462
463    fn proxy(inner_ch: ChannelPair<Req, Res>, add_res: Receiver<ChannelPair<Res, Req>>) {
464        let mut chs_cursor = 0;
465        let mut chs = BTreeMap::<u64, ChannelPair<Res, Req>>::new();
466        let mut ch_to_add: Option<ChannelPair<Res, Req>> = None;
467        let mut idx_to_drop: Option<u64> = None;
468
469        'outer: loop {
470            if let Some(new_ch) = ch_to_add.take() {
471                let idx = chs_cursor;
472                chs_cursor += 1;
473                chs.insert(idx, new_ch);
474                debug!("proxy: added new channel idx={}, total={}", idx, chs.len());
475            }
476
477            if let Some(idx) = idx_to_drop.take() {
478                debug!("proxy: dropping channel {}, total={}", idx, chs.len());
479                chs.remove(&idx).unwrap();
480            }
481
482            let mut sel = Select::new();
483            let inner_idx = sel.recv(&inner_ch.res);
484            let add_idx = sel.recv(&add_res);
485
486            let mut chs_sel_idx = BTreeMap::<usize, u64>::new();
487            for (idx, cp) in chs.iter() {
488                let sel_idx = sel.recv(&cp.res);
489                chs_sel_idx.insert(sel_idx, *idx);
490            }
491
492            'select: loop {
493                let oper = sel.select();
494                match oper.index() {
495                    sel_idx if sel_idx == add_idx => match oper.recv(&add_res) {
496                        Ok(ch) => {
497                            ch_to_add = Some(ch);
498                            debug!("proxy: received new channel from add_res");
499                            break 'select;
500                        }
501                        Err(RecvError) => {
502                            debug!("proxy: add_res disconnected, terminating");
503                            break 'outer;
504                        }
505                    },
506                    sel_idx if sel_idx == inner_idx => match oper.recv(&inner_ch.res) {
507                        Ok(_) => {
508                            error!("proxy: unexpected data in StatsServer.channels().0");
509                            panic!();
510                        }
511                        Err(RecvError) => break 'outer,
512                    },
513                    sel_idx => {
514                        let idx = chs_sel_idx.get(&sel_idx).unwrap();
515                        let pair = chs.get(idx).unwrap();
516
517                        let req = match oper.recv(&pair.res) {
518                            Ok(v) => v,
519                            Err(RecvError) => {
520                                idx_to_drop = Some(*idx);
521                                break 'select;
522                            }
523                        };
524
525                        if inner_ch.req.send(req).is_err() {
526                            break 'outer;
527                        }
528
529                        let resp = match inner_ch.res.recv() {
530                            Ok(v) => v,
531                            Err(RecvError) => break 'outer,
532                        };
533
534                        if pair.req.send(resp).is_err() {
535                            idx_to_drop = Some(*idx);
536                            break 'select;
537                        }
538                    }
539                }
540            }
541        }
542    }
543
544    fn listen(self) {
545        let inner_ch_copy = self.inner_ch.clone();
546        let (add_req, add_res) = unbounded::<ChannelPair<Res, Req>>();
547
548        spawn(move || Self::proxy(inner_ch_copy, add_res));
549
550        for stream in self.listener.incoming() {
551            if self.exit.load(Ordering::Relaxed) {
552                debug!("listener exiting");
553                break;
554            }
555            match stream {
556                Ok(stream) => {
557                    let data = self.data.clone();
558                    let exit = self.exit.clone();
559
560                    let (req_pair, res_pair) = ChannelPair::<Req, Res>::bidi();
561                    match add_req.send(res_pair) {
562                        Ok(()) => debug!("sent new channel to proxy"),
563                        Err(e) => warn!("StatsServer::proxy() failed ({})", e),
564                    }
565
566                    spawn(move || {
567                        if let Err(e) = Self::serve(stream, data, req_pair, exit) {
568                            warn!("stat communication errored ({})", e);
569                        }
570                    });
571                }
572                Err(e) => warn!("failed to accept stat connection ({})", e),
573            }
574        }
575    }
576}
577
/// A stats server answering newline-delimited JSON requests on a UNIX socket.
///
/// Configure it with the `set_*` builders and start it with `launch`. Unless `set_path`
/// is used, the socket is created at `base_path/sched_path/stats_path`, which defaults
/// to `/var/run/scx/root/stats`.
pub struct StatsServer<Req, Res>
where
    Req: Send + 'static,
    Res: Send + 'static,
{
    base_path: PathBuf,
    sched_path: PathBuf,
    stats_path: PathBuf,
    /// Socket path from `set_path`; `launch` fills in the composed path if unset.
    path: Option<PathBuf>,

    data: Arc<Mutex<StatsServerData<Req, Res>>>,

    /// Application-side channel ends, handed out by `channels()`.
    outer_ch: ChannelPair<Res, Req>,
    /// Server-side channel ends; moved into the listener thread by `launch`.
    inner_ch: Option<ChannelPair<Req, Res>>,
    /// Set on drop so the listener and connection threads stop.
    exit: Arc<AtomicBool>,
}
594
595impl<Req, Res> StatsServer<Req, Res>
596where
597    Req: Send + 'static,
598    Res: Send + 'static,
599{
600    pub fn new(data: StatsServerData<Req, Res>) -> Self {
601        let (ich, och) = ChannelPair::<Req, Res>::bidi();
602
603        Self {
604            base_path: PathBuf::from("/var/run/scx"),
605            sched_path: PathBuf::from("root"),
606            stats_path: PathBuf::from("stats"),
607            path: None,
608            data: Arc::new(Mutex::new(data)),
609            outer_ch: och,
610            inner_ch: Some(ich),
611            exit: Arc::new(AtomicBool::new(false)),
612        }
613    }
614
615    pub fn set_base_path<P: AsRef<Path>>(mut self, path: P) -> Self {
616        self.base_path = PathBuf::from(path.as_ref());
617        self
618    }
619
620    pub fn set_sched_path<P: AsRef<Path>>(mut self, path: P) -> Self {
621        self.sched_path = PathBuf::from(path.as_ref());
622        self
623    }
624
625    pub fn set_stats_path<P: AsRef<Path>>(mut self, path: P) -> Self {
626        self.stats_path = PathBuf::from(path.as_ref());
627        self
628    }
629
630    pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> Self {
631        self.path = Some(PathBuf::from(path.as_ref()));
632        self
633    }
634
635    pub fn launch(mut self) -> Result<Self> {
636        self.data.lock().unwrap().verify_meta()?;
637
638        if self.path.is_none() {
639            self.path = Some(self.base_path.join(&self.sched_path).join(&self.stats_path));
640        }
641        let path = &self.path.as_ref().unwrap();
642
643        if let Some(dir) = path.parent() {
644            std::fs::create_dir_all(dir).with_context(|| format!("creating {:?}", dir))?;
645        }
646
647        let res = std::fs::remove_file(path);
648        if let std::io::Result::Err(e) = &res {
649            if e.kind() != std::io::ErrorKind::NotFound {
650                res.with_context(|| format!("deleting {:?}", path))?;
651            }
652        }
653
654        let listener =
655            UnixListener::bind(path).with_context(|| format!("creating UNIX socket {:?}", path))?;
656
657        let inner = StatsServerInner::new(
658            listener,
659            self.data.clone(),
660            self.inner_ch.take().unwrap(),
661            self.exit.clone(),
662        );
663
664        spawn(move || inner.listen());
665        Ok(self)
666    }
667
668    pub fn channels(&self) -> (Sender<Res>, Receiver<Req>) {
669        (self.outer_ch.req.clone(), self.outer_ch.res.clone())
670    }
671}
672
673impl<Req, Res> std::ops::Drop for StatsServer<Req, Res>
674where
675    Req: Send + 'static,
676    Res: Send + 'static,
677{
678    fn drop(&mut self) {
679        self.exit.store(true, Ordering::Relaxed);
680        if let Some(path) = self.path.as_ref() {
681            let _ = StatsClient::new().set_path(path).connect();
682        }
683    }
684}
685
/// Conversion of a value into a JSON [`Value`].
pub trait ToJson {
    /// Serializes `self` to JSON.
    fn to_json(&self) -> Result<Value>;
}
689
690impl<T> ToJson for T
691where
692    T: Meta + Serialize,
693{
694    fn to_json(&self) -> Result<Value> {
695        Ok(serde_json::to_value(self)?)
696    }
697}