scx_stats/
client.rs

1use crate::{StatsErrno, StatsRequest, StatsResponse};
2use anyhow::{Result, anyhow, bail};
3use log::trace;
4use serde::Deserialize;
5use std::io::{BufRead, BufReader, Write};
6use std::os::unix::net::UnixStream;
7use std::path::{Path, PathBuf};
8
/// Client for a stats server reachable over a Unix domain socket.
///
/// Requests and responses are exchanged as single-line JSON documents. The
/// client is configured builder-style (`set_*` methods), then connected with
/// `connect()`, after which requests can be issued.
#[derive(Debug)]
pub struct StatsClient {
    /// Leading component of the default socket path.
    base_path: PathBuf,
    /// Middle component of the default socket path, joined onto `base_path`.
    sched_path: PathBuf,
    /// Final component of the default socket path, joined onto `sched_path`.
    stats_path: PathBuf,
    /// Full socket path. When `None`, `connect()` derives it from the three
    /// components above and stores the result here.
    path: Option<PathBuf>,

    /// Write half of the connection; `None` until `connect()` succeeds.
    stream: Option<UnixStream>,
    /// Buffered read half of the connection, backed by the same socket as
    /// `stream`; `None` until `connect()` succeeds.
    reader: Option<BufReader<UnixStream>>,
}
18
19impl StatsClient {
20    pub fn new() -> Self {
21        Self {
22            base_path: PathBuf::from("/var/run/scx"),
23            sched_path: PathBuf::from("root"),
24            stats_path: PathBuf::from("stats"),
25            path: None,
26
27            stream: None,
28            reader: None,
29        }
30    }
31
32    pub fn set_base_path<P: AsRef<Path>>(mut self, path: P) -> Self {
33        self.base_path = PathBuf::from(path.as_ref());
34        self
35    }
36
37    pub fn set_sched_path<P: AsRef<Path>>(mut self, path: P) -> Self {
38        self.sched_path = PathBuf::from(path.as_ref());
39        self
40    }
41
42    pub fn set_stats_path<P: AsRef<Path>>(mut self, path: P) -> Self {
43        self.stats_path = PathBuf::from(path.as_ref());
44        self
45    }
46
47    pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> Self {
48        self.path = Some(PathBuf::from(path.as_ref()));
49        self
50    }
51
52    pub fn connect(mut self) -> Result<Self> {
53        if self.path.is_none() {
54            self.path = Some(self.base_path.join(&self.sched_path).join(&self.stats_path));
55        }
56        let path = &self.path.as_ref().unwrap();
57
58        let stream = UnixStream::connect(path)?;
59        self.stream = Some(stream.try_clone()?);
60        self.reader = Some(BufReader::new(stream));
61        Ok(self)
62    }
63
64    pub fn send_request<T>(&mut self, req: &StatsRequest) -> Result<T>
65    where
66        T: for<'a> Deserialize<'a>,
67    {
68        if self.stream.is_none() {
69            bail!("not connected");
70        }
71
72        let req = serde_json::to_string(&req)? + "\n";
73        trace!("Sending: {}", req.trim());
74        self.stream.as_ref().unwrap().write_all(req.as_bytes())?;
75
76        let mut line = String::new();
77        self.reader.as_mut().unwrap().read_line(&mut line)?;
78        trace!("Received: {}", line.trim());
79        let mut resp: StatsResponse = serde_json::from_str(&line)?;
80
81        let (errno, resp) = (
82            resp.errno,
83            resp.args.remove("resp").unwrap_or(serde_json::Value::Null),
84        );
85
86        if errno != 0 {
87            Err(anyhow!("{}", &resp).context(StatsErrno(errno)))?;
88        }
89
90        Ok(serde_json::from_value(resp)?)
91    }
92
93    pub fn request<T>(&mut self, req: &str, args: Vec<(String, String)>) -> Result<T>
94    where
95        T: for<'a> Deserialize<'a>,
96    {
97        self.send_request(&StatsRequest::new(req, args))
98    }
99}