1use crate::StatsClient;
2use crate::{Meta, StatsData, StatsKind, StatsMeta};
3use anyhow::{anyhow, bail, Context, Result};
4use crossbeam::channel::{unbounded, Receiver, RecvError, Select, Sender};
5use log::{debug, error, warn};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::{BTreeMap, BTreeSet};
9use std::io::{BufRead, BufReader, Write};
10use std::os::unix::net::{UnixListener, UnixStream};
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, Ordering};
13use std::sync::{Arc, Mutex};
14use std::thread::spawn;
15
16pub trait StatsReader<Req, Res>:
17 FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>
18{
19}
20impl<
21 Req,
22 Res,
23 T: FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>,
24 > StatsReader<Req, Res> for T
25{
26}
27
28pub trait StatsReaderSend<Req, Res>:
29 FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send
30{
31}
32impl<
33 Req,
34 Res,
35 T: FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send,
36 > StatsReaderSend<Req, Res> for T
37{
38}
39
40pub trait StatsReaderSync<Req, Res>:
41 Fn(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send + Sync
42{
43}
44impl<
45 Req,
46 Res,
47 T: Fn(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>
48 + Send
49 + Sync,
50 > StatsReaderSync<Req, Res> for T
51{
52}
53
54pub trait StatsOpener<Req, Res>:
55 FnMut((&Sender<Req>, &Receiver<Res>)) -> Result<Box<dyn StatsReader<Req, Res>>> + Send
56{
57}
58impl<
59 Req,
60 Res,
61 T: FnMut((&Sender<Req>, &Receiver<Res>)) -> Result<Box<dyn StatsReader<Req, Res>>> + Send,
62 > StatsOpener<Req, Res> for T
63{
64}
65
66pub trait StatsCloser<Req, Res>: FnOnce((&Sender<Req>, &Receiver<Res>)) + Send {}
67impl<Req, Res, T: FnOnce((&Sender<Req>, &Receiver<Res>)) + Send> StatsCloser<Req, Res> for T {}
68
/// Open/close callbacks registered for one stats target.
pub struct StatsOps<Req, Res> {
    /// Called to obtain the reader the first time a connection requests this target.
    pub open: Box<dyn StatsOpener<Req, Res>>,
    /// Optional teardown callback. It is an `FnOnce` stored in an `Option`,
    /// so once it has been taken and called it is gone.
    pub close: Option<Box<dyn StatsCloser<Req, Res>>>,
}
73
/// Targets opened so far, keyed by target name.
///
/// Each entry holds the shared ops the reader came from, the reader returned
/// by `open`, and the channel pair that is handed to `close` on drop.
struct StatsOpenOps<Req, Res> {
    map: BTreeMap<
        String,
        (
            Arc<Mutex<StatsOps<Req, Res>>>,
            Box<dyn StatsReader<Req, Res>>,
            ChannelPair<Req, Res>,
        ),
    >,
}
84
85impl<Req, Res> StatsOpenOps<Req, Res> {
86 fn new() -> Self {
87 Self {
88 map: BTreeMap::new(),
89 }
90 }
91}
92
93impl<Req, Res> std::ops::Drop for StatsOpenOps<Req, Res> {
94 fn drop(&mut self) {
95 for (_, (ops, _, ch)) in self.map.iter_mut() {
96 if let Some(close) = ops.lock().unwrap().close.take() {
97 close((&ch.req, &ch.res));
98 }
99 }
100 }
101}
102
/// One request line as sent by a client, (de)serialized with serde.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StatsRequest {
    /// Command name.
    pub req: String,
    /// String arguments for the command; an absent field deserializes to an
    /// empty map.
    #[serde(default)]
    pub args: BTreeMap<String, String>,
}
109
110impl StatsRequest {
111 pub fn new(req: &str, args: Vec<(String, String)>) -> Self {
112 Self {
113 req: req.to_string(),
114 args: args.into_iter().collect(),
115 }
116 }
117}
118
/// One response line, (de)serialized with serde.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StatsResponse {
    /// Error number attached to the response.
    pub errno: i32,
    /// Named JSON values carried by the response.
    pub args: BTreeMap<String, Value>,
}
124
/// Newtype around a raw OS error number.
///
/// Both `Display` and `Debug` render it the way `std::io::Error` renders the
/// same raw OS error.
pub struct StatsErrno(pub i32);

impl std::fmt::Display for StatsErrno {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let os_err = std::io::Error::from_raw_os_error(self.0);
        write!(f, "{}", os_err)
    }
}

impl std::fmt::Debug for StatsErrno {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let os_err = std::io::Error::from_raw_os_error(self.0);
        write!(f, "{:?}", os_err)
    }
}
138
/// One endpoint of a two-way channel: it sends `Req` values and receives
/// `Res` values.
struct ChannelPair<Req, Res> {
    /// Sending half, carrying `Req`.
    req: Sender<Req>,
    /// Receiving half, yielding `Res`.
    res: Receiver<Res>,
}
143
144impl<Req, Res> ChannelPair<Req, Res> {
145 fn bidi() -> (ChannelPair<Req, Res>, ChannelPair<Res, Req>) {
146 let (req, res) = (unbounded::<Req>(), unbounded::<Res>());
147 (
148 ChannelPair {
149 req: req.0,
150 res: res.1,
151 },
152 ChannelPair {
153 req: res.0,
154 res: req.1,
155 },
156 )
157 }
158}
159
160impl<Req, Res> Clone for ChannelPair<Req, Res> {
161 fn clone(&self) -> Self {
162 Self {
163 req: self.req.clone(),
164 res: self.res.clone(),
165 }
166 }
167}
168
/// Metadata and per-target ops registered with a stats server.
pub struct StatsServerData<Req, Res>
where
    Req: Send + 'static,
    Res: Send + 'static,
{
    /// Name of the struct metadata to start from when no explicit starting
    /// point is given.
    top: Option<String>,
    /// Struct metadata keyed by struct name.
    meta: BTreeMap<String, StatsMeta>,
    /// Shared open/close callbacks keyed by target name.
    ops: BTreeMap<String, Arc<Mutex<StatsOps<Req, Res>>>>,
}
178
179impl<Req, Res> StatsServerData<Req, Res>
180where
181 Req: Send + 'static,
182 Res: Send + 'static,
183{
184 pub fn new() -> Self {
185 Self {
186 top: None,
187 meta: BTreeMap::new(),
188 ops: BTreeMap::new(),
189 }
190 }
191
192 pub fn add_meta(mut self, meta: StatsMeta) -> Self {
193 if meta.attrs.top.is_some() && self.top.is_none() {
194 self.top = Some(meta.name.clone());
195 }
196 self.meta.insert(meta.name.clone(), meta);
197 self
198 }
199
200 pub fn add_ops(mut self, name: &str, ops: StatsOps<Req, Res>) -> Self {
201 self.ops.insert(name.to_string(), Arc::new(Mutex::new(ops)));
202 self
203 }
204
205 pub fn add_stats(self, name: &str, fetch: Box<dyn StatsReaderSend<Req, Res>>) -> Self {
206 let wrapped_fetch = Mutex::new(fetch);
207 let read: Box<dyn StatsReaderSync<Req, Res>> =
208 Box::new(move |args, chan| wrapped_fetch.lock().unwrap()(args, chan));
209 let wrapped_read = Arc::new(read);
210 let ops = StatsOps {
211 open: Box::new(move |_| {
212 let copy = wrapped_read.clone();
213 Ok(Box::new(move |args, chan| copy(args, chan)))
214 }),
215 close: None,
216 };
217
218 self.add_ops(name, ops)
219 }
220
221 fn visit_meta_inner(
222 &self,
223 name: &str,
224 visit: &mut impl FnMut(&StatsMeta) -> Result<()>,
225 nesting: &mut BTreeSet<String>,
226 visited: &mut BTreeSet<String>,
227 ) -> Result<()> {
228 let m = match self.meta.get(name) {
229 Some(v) => v,
230 None => bail!("unknown stats meta name {}", name),
231 };
232
233 if !nesting.insert(name.into()) {
234 bail!("loop in stats meta detected, {} already nested", name);
235 }
236 if !visited.insert(name.into()) {
237 return Ok(());
238 }
239
240 visit(m)?;
241
242 for (fname, field) in m.fields.iter() {
243 match &field.data {
244 StatsData::Array(StatsKind::Struct(inner)) => {
245 self.visit_meta_inner(inner, visit, nesting, visited)?
246 }
247 StatsData::Dict {
248 key: StatsKind::Struct(inner),
249 datum: _,
250 } => bail!("{}.{} is a dict with struct {} as key", name, fname, inner),
251 StatsData::Dict {
252 key: _,
253 datum: StatsKind::Struct(inner),
254 } => self.visit_meta_inner(inner, visit, nesting, visited)?,
255 _ => {}
256 }
257 }
258
259 nesting.remove(name);
260 Ok(())
261 }
262
263 fn visit_meta(
264 &self,
265 from: &str,
266 visit: &mut impl FnMut(&StatsMeta) -> Result<()>,
267 ) -> Result<()> {
268 let mut nesting = BTreeSet::<String>::new();
269 let mut visited = BTreeSet::<String>::new();
270 self.visit_meta_inner(from, visit, &mut nesting, &mut visited)
271 }
272
273 fn verify_meta(&self) -> Result<()> {
274 if self.top.is_none() {
275 debug!("top-level stats metadata missing");
276 return Ok(());
277 }
278
279 self.visit_meta(self.top.as_ref().unwrap(), &mut |_| Ok(()))
281 }
282
283 pub fn describe_meta<W: Write>(&self, w: &mut W, from: Option<&[&str]>) -> Result<()> {
284 let meta_names = match from {
285 Some(v) if !v.is_empty() => v,
286 Some(_) | None => {
287 let top = self
288 .top
289 .as_ref()
290 .ok_or_else(|| anyhow!("don't know where to start"))?;
291 return self.describe_meta_inner(w, top);
292 }
293 };
294
295 for meta_name in meta_names {
296 self.describe_meta_inner(w, meta_name)?;
297 }
298
299 Ok(())
300 }
301
302 pub fn describe_meta_inner<W: Write>(&self, w: &mut W, from: &str) -> Result<()> {
303 let (mut nwidth, mut fwidth, mut dwidth) = (0usize, 0usize, 0usize);
304
305 self.visit_meta(from, &mut |m| {
306 nwidth = nwidth.max(m.name.len());
307 (fwidth, dwidth) = m.fields.iter().fold((fwidth, dwidth), |acc, (n, f)| {
308 (acc.0.max(n.len()), acc.1.max(f.data.to_string().len() + 2))
309 });
310 Ok(())
311 })?;
312
313 let mut first = true;
314 self.visit_meta(from, &mut |m| {
315 if !first {
316 writeln!(w, "")?;
317 }
318 first = false;
319
320 write!(w, "[{:nw$}]", m.name, nw = nwidth)?;
321 if let Some(desc) = &m.attrs.desc {
322 write!(w, " {}", desc)?;
323 }
324 writeln!(w, "")?;
325
326 for (fname, f) in m.fields.iter() {
327 write!(
328 w,
329 " {:fw$} {:dw$}",
330 fname,
331 format!("({})", f.data.to_string()),
332 fw = fwidth,
333 dw = dwidth
334 )?;
335 if let Some(desc) = &f.attrs.desc {
336 write!(w, " : {}", desc)?;
337 }
338 writeln!(w, "")?;
339 }
340 Ok(())
341 })
342 }
343}
344
/// State owned by the listener thread.
struct StatsServerInner<Req, Res>
where
    Req: Send + 'static,
    Res: Send + 'static,
{
    /// Socket on which client connections are accepted.
    listener: UnixListener,
    /// Registered metadata and ops, shared with the connection threads.
    data: Arc<Mutex<StatsServerData<Req, Res>>>,
    /// Channel endpoint handed to the proxy thread.
    inner_ch: ChannelPair<Req, Res>,
    /// Shutdown flag checked by the listener and the connection threads.
    exit: Arc<AtomicBool>,
}
355
356impl<Req, Res> StatsServerInner<Req, Res>
357where
358 Req: Send + 'static,
359 Res: Send + 'static,
360{
361 fn new(
362 listener: UnixListener,
363 data: Arc<Mutex<StatsServerData<Req, Res>>>,
364 inner_ch: ChannelPair<Req, Res>,
365 exit: Arc<AtomicBool>,
366 ) -> Self {
367 Self {
368 listener,
369 data,
370 inner_ch,
371 exit,
372 }
373 }
374
375 fn build_resp<T>(errno: i32, resp: &T) -> Result<StatsResponse>
376 where
377 T: Serialize,
378 {
379 Ok(StatsResponse {
380 errno,
381 args: [("resp".into(), serde_json::to_value(resp)?)]
382 .into_iter()
383 .collect(),
384 })
385 }
386
387 fn handle_request(
388 line: String,
389 data: &Arc<Mutex<StatsServerData<Req, Res>>>,
390 ch: &ChannelPair<Req, Res>,
391 open_ops: &mut StatsOpenOps<Req, Res>,
392 ) -> Result<StatsResponse> {
393 let req: StatsRequest = serde_json::from_str(&line)?;
394
395 match req.req.as_str() {
396 "stats" => {
397 let target = match req.args.get("target") {
398 Some(v) => v,
399 None => "top",
400 };
401
402 let ops =
403 match data.lock().unwrap().ops.get(target) {
404 Some(v) => v.clone(),
405 None => Err(anyhow!("unknown stat target {:?}", req)
406 .context(StatsErrno(libc::EINVAL)))?,
407 };
408
409 if !open_ops.map.contains_key(target) {
410 let read = (ops.lock().unwrap().open)((&ch.req, &ch.res))?;
411 open_ops
412 .map
413 .insert(target.into(), (ops.clone(), read, ch.clone()));
414 }
415
416 let read = &mut open_ops.map.get_mut(target).unwrap().1;
417
418 let resp = read(&req.args, (&ch.req, &ch.res))?;
419
420 Self::build_resp(0, &resp)
421 }
422 "stats_meta" => Ok(Self::build_resp(0, &data.lock().unwrap().meta)?),
423 req => Err(anyhow!("unknown command {:?}", req).context(StatsErrno(libc::EINVAL)))?,
424 }
425 }
426
427 fn serve(
428 mut stream: UnixStream,
429 data: Arc<Mutex<StatsServerData<Req, Res>>>,
430 inner_ch: ChannelPair<Req, Res>,
431 exit: Arc<AtomicBool>,
432 ) -> Result<()> {
433 let mut stream_reader = BufReader::new(stream.try_clone()?);
434 let mut open_ops = StatsOpenOps::new();
435
436 loop {
437 let mut line = String::new();
438 stream_reader.read_line(&mut line)?;
439 if line.is_empty() {
440 return Ok(());
441 }
442 if exit.load(Ordering::Relaxed) {
443 debug!("server exiting due to exit");
444 return Ok(());
445 }
446
447 let resp = match Self::handle_request(line, &data, &inner_ch, &mut open_ops) {
448 Ok(v) => v,
449 Err(e) => {
450 let errno = match e.downcast_ref::<StatsErrno>() {
451 Some(e) if e.0 != 0 => e.0,
452 _ => libc::EINVAL,
453 };
454 Self::build_resp(errno, &format!("{:?}", &e))?
455 }
456 };
457
458 let output = serde_json::to_string(&resp)? + "\n";
459 stream.write_all(output.as_bytes())?;
460 }
461 }
462
463 fn proxy(inner_ch: ChannelPair<Req, Res>, add_res: Receiver<ChannelPair<Res, Req>>) {
464 let mut chs_cursor = 0;
465 let mut chs = BTreeMap::<u64, ChannelPair<Res, Req>>::new();
466 let mut ch_to_add: Option<ChannelPair<Res, Req>> = None;
467 let mut idx_to_drop: Option<u64> = None;
468
469 'outer: loop {
470 if let Some(new_ch) = ch_to_add.take() {
471 let idx = chs_cursor;
472 chs_cursor += 1;
473 chs.insert(idx, new_ch);
474 debug!("proxy: added new channel idx={}, total={}", idx, chs.len());
475 }
476
477 if let Some(idx) = idx_to_drop.take() {
478 debug!("proxy: dropping channel {}, total={}", idx, chs.len());
479 chs.remove(&idx).unwrap();
480 }
481
482 let mut sel = Select::new();
483 let inner_idx = sel.recv(&inner_ch.res);
484 let add_idx = sel.recv(&add_res);
485
486 let mut chs_sel_idx = BTreeMap::<usize, u64>::new();
487 for (idx, cp) in chs.iter() {
488 let sel_idx = sel.recv(&cp.res);
489 chs_sel_idx.insert(sel_idx, *idx);
490 }
491
492 'select: loop {
493 let oper = sel.select();
494 match oper.index() {
495 sel_idx if sel_idx == add_idx => match oper.recv(&add_res) {
496 Ok(ch) => {
497 ch_to_add = Some(ch);
498 debug!("proxy: received new channel from add_res");
499 break 'select;
500 }
501 Err(RecvError) => {
502 debug!("proxy: add_res disconnected, terminating");
503 break 'outer;
504 }
505 },
506 sel_idx if sel_idx == inner_idx => match oper.recv(&inner_ch.res) {
507 Ok(_) => {
508 error!("proxy: unexpected data in StatsServer.channels().0");
509 panic!();
510 }
511 Err(RecvError) => break 'outer,
512 },
513 sel_idx => {
514 let idx = chs_sel_idx.get(&sel_idx).unwrap();
515 let pair = chs.get(idx).unwrap();
516
517 let req = match oper.recv(&pair.res) {
518 Ok(v) => v,
519 Err(RecvError) => {
520 idx_to_drop = Some(*idx);
521 break 'select;
522 }
523 };
524
525 if inner_ch.req.send(req).is_err() {
526 break 'outer;
527 }
528
529 let resp = match inner_ch.res.recv() {
530 Ok(v) => v,
531 Err(RecvError) => break 'outer,
532 };
533
534 if pair.req.send(resp).is_err() {
535 idx_to_drop = Some(*idx);
536 break 'select;
537 }
538 }
539 }
540 }
541 }
542 }
543
544 fn listen(self) {
545 let inner_ch_copy = self.inner_ch.clone();
546 let (add_req, add_res) = unbounded::<ChannelPair<Res, Req>>();
547
548 spawn(move || Self::proxy(inner_ch_copy, add_res));
549
550 for stream in self.listener.incoming() {
551 if self.exit.load(Ordering::Relaxed) {
552 debug!("listener exiting");
553 break;
554 }
555 match stream {
556 Ok(stream) => {
557 let data = self.data.clone();
558 let exit = self.exit.clone();
559
560 let (req_pair, res_pair) = ChannelPair::<Req, Res>::bidi();
561 match add_req.send(res_pair) {
562 Ok(()) => debug!("sent new channel to proxy"),
563 Err(e) => warn!("StatsServer::proxy() failed ({})", e),
564 }
565
566 spawn(move || {
567 if let Err(e) = Self::serve(stream, data, req_pair, exit) {
568 warn!("stat communication errored ({})", e);
569 }
570 });
571 }
572 Err(e) => warn!("failed to accept stat connection ({})", e),
573 }
574 }
575 }
576}
577
/// Stats server configured through builder-style setters and started with
/// `launch`.
pub struct StatsServer<Req, Res>
where
    Req: Send + 'static,
    Res: Send + 'static,
{
    // The socket path defaults to `base_path/sched_path/stats_path` unless
    // `path` is set explicitly.
    base_path: PathBuf,
    sched_path: PathBuf,
    stats_path: PathBuf,
    path: Option<PathBuf>,

    /// Registered metadata and ops, shared with the server threads.
    data: Arc<Mutex<StatsServerData<Req, Res>>>,

    /// Endpoint exposed to the caller through `channels`.
    outer_ch: ChannelPair<Res, Req>,
    /// Endpoint handed to the listener thread; `None` once launched.
    inner_ch: Option<ChannelPair<Req, Res>>,
    /// Shutdown flag shared with the server threads.
    exit: Arc<AtomicBool>,
}
594
595impl<Req, Res> StatsServer<Req, Res>
596where
597 Req: Send + 'static,
598 Res: Send + 'static,
599{
600 pub fn new(data: StatsServerData<Req, Res>) -> Self {
601 let (ich, och) = ChannelPair::<Req, Res>::bidi();
602
603 Self {
604 base_path: PathBuf::from("/var/run/scx"),
605 sched_path: PathBuf::from("root"),
606 stats_path: PathBuf::from("stats"),
607 path: None,
608 data: Arc::new(Mutex::new(data)),
609 outer_ch: och,
610 inner_ch: Some(ich),
611 exit: Arc::new(AtomicBool::new(false)),
612 }
613 }
614
615 pub fn set_base_path<P: AsRef<Path>>(mut self, path: P) -> Self {
616 self.base_path = PathBuf::from(path.as_ref());
617 self
618 }
619
620 pub fn set_sched_path<P: AsRef<Path>>(mut self, path: P) -> Self {
621 self.sched_path = PathBuf::from(path.as_ref());
622 self
623 }
624
625 pub fn set_stats_path<P: AsRef<Path>>(mut self, path: P) -> Self {
626 self.stats_path = PathBuf::from(path.as_ref());
627 self
628 }
629
630 pub fn set_path<P: AsRef<Path>>(mut self, path: P) -> Self {
631 self.path = Some(PathBuf::from(path.as_ref()));
632 self
633 }
634
635 pub fn launch(mut self) -> Result<Self> {
636 self.data.lock().unwrap().verify_meta()?;
637
638 if self.path.is_none() {
639 self.path = Some(self.base_path.join(&self.sched_path).join(&self.stats_path));
640 }
641 let path = &self.path.as_ref().unwrap();
642
643 if let Some(dir) = path.parent() {
644 std::fs::create_dir_all(dir).with_context(|| format!("creating {:?}", dir))?;
645 }
646
647 let res = std::fs::remove_file(path);
648 if let std::io::Result::Err(e) = &res {
649 if e.kind() != std::io::ErrorKind::NotFound {
650 res.with_context(|| format!("deleting {:?}", path))?;
651 }
652 }
653
654 let listener =
655 UnixListener::bind(path).with_context(|| format!("creating UNIX socket {:?}", path))?;
656
657 let inner = StatsServerInner::new(
658 listener,
659 self.data.clone(),
660 self.inner_ch.take().unwrap(),
661 self.exit.clone(),
662 );
663
664 spawn(move || inner.listen());
665 Ok(self)
666 }
667
668 pub fn channels(&self) -> (Sender<Res>, Receiver<Req>) {
669 (self.outer_ch.req.clone(), self.outer_ch.res.clone())
670 }
671}
672
673impl<Req, Res> std::ops::Drop for StatsServer<Req, Res>
674where
675 Req: Send + 'static,
676 Res: Send + 'static,
677{
678 fn drop(&mut self) {
679 self.exit.store(true, Ordering::Relaxed);
680 if let Some(path) = self.path.as_ref() {
681 let _ = StatsClient::new().set_path(path).connect();
682 }
683 }
684}
685
/// Conversion of a value into a `serde_json` [`Value`].
pub trait ToJson {
    /// Returns the JSON representation of `self`, or an error if the
    /// conversion fails.
    fn to_json(&self) -> Result<Value>;
}
689
690impl<T> ToJson for T
691where
692 T: Meta + Serialize,
693{
694 fn to_json(&self) -> Result<Value> {
695 Ok(serde_json::to_value(self)?)
696 }
697}