server/
server.rs

1use log::{debug, info, warn};
2use scx_stats::prelude::*;
3use scx_stats_derive::Stats;
4use serde::{Deserialize, Serialize};
5use std::collections::BTreeMap;
6use std::env::args;
7use std::io::Read;
8use std::thread::{ThreadId, current, spawn};
9
// Hacky definition sharing. See stats_defs.rs.h.
11include!("stats_defs.rs.h");
12
13fn main() {
14    simple_logger::SimpleLogger::new()
15        .with_level(log::LevelFilter::Info)
16        .env()
17        .init()
18        .unwrap();
19
20    let stats = ClusterStats {
21        name: "test cluster".into(),
22        at: 12345,
23        bitmap: vec![0xdeadbeef, 0xbeefdead],
24        doms_dict: BTreeMap::from([
25            (
26                0,
27                DomainStats {
28                    name: "domain 0".into(),
29                    events: 1234,
30                    pressure: 1.234,
31                },
32            ),
33            (
34                3,
35                DomainStats {
36                    name: "domain 3".into(),
37                    events: 5678,
38                    pressure: 5.678,
39                },
40            ),
41        ]),
42    };
43
44    std::assert_eq!(args().len(), 2, "Usage: server UNIX_SOCKET_PATH");
45    let path = args().nth(1).unwrap();
46
47    // If communication from the stats generating closure is not necessary,
48    // StatsServer::<(), ()> can be used. This example sends thread ID and
49    // receives the formatted string just for demonstration.
50    let sdata = StatsServerData::<ThreadId, String>::new()
51        .add_meta(ClusterStats::meta())
52        .add_meta(DomainStats::meta())
53        .add_stats(
54            "top",
55            Box::new(move |_args, (tx, rx)| {
56                let id = current().id();
57                let res = tx.send(id);
58                debug!("Sendt {:?} {:?}", id, &res);
59                let res = rx.recv();
60                debug!("Recevied {:?}", res);
61                stats.to_json()
62            }),
63        );
64
65    info!("stats_meta:");
66    sdata.describe_meta(&mut std::io::stderr(), None).unwrap();
67
68    let server = StatsServer::<ThreadId, String>::new(sdata)
69        .set_path(&path)
70        .launch()
71        .unwrap();
72
73    debug!("Doing unnecessary server channel handling");
74    let (tx, rx) = server.channels();
75    spawn(move || {
76        while let Ok(id) = rx.recv() {
77            if let Err(e) = tx.send(format!("hello {:?}", &id)) {
78                warn!("Server channel errored ({:?})", &e);
79                break;
80            }
81        }
82    });
83
84    info!("Server listening. Run `client {:?}`.", &path);
85    info!("Use `socat - UNIX-CONNECT:{:?}` for raw connection.", &path);
86    info!("Press any key to exit.");
87
88    let mut buf: [u8; 1] = [0];
89    let _ = std::io::stdin().read(&mut buf);
90}