scx_stats: Implement ScxStatsServer::add_stats_ops()

This allows stats reader to maintain persistent states per connection.
This commit is contained in:
Tejun Heo 2024-08-16 13:04:32 -10:00
parent 96050f8bdd
commit 689d380db1
2 changed files with 81 additions and 25 deletions

View File

@ -7,7 +7,9 @@ pub use stats::{
};
mod server;
pub use server::{ScxStatsErrno, ScxStatsRequest, ScxStatsResponse, ScxStatsServer, ToJson};
pub use server::{
ScxStatsErrno, ScxStatsOps, ScxStatsRequest, ScxStatsResponse, ScxStatsServer, ToJson,
};
mod client;
pub use client::ScxStatsClient;

View File

@ -2,6 +2,7 @@ use crate::{Meta, ScxStatsMeta};
use anyhow::{anyhow, Context, Result};
use log::warn;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeMap;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::{UnixListener, UnixStream};
@ -9,10 +10,40 @@ use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::thread::spawn;
type StatMap = BTreeMap<
String,
Arc<Mutex<Box<dyn FnMut(&BTreeMap<String, String>) -> Result<serde_json::Value> + Send>>>,
>;
pub trait StatsReader: FnMut(&BTreeMap<String, String>) -> Result<Value> {}
impl<T: FnMut(&BTreeMap<String, String>) -> Result<Value>> StatsReader for T {}
pub trait StatsReaderSend: FnMut(&BTreeMap<String, String>) -> Result<Value> + Send {}
impl<T: FnMut(&BTreeMap<String, String>) -> Result<Value> + Send> StatsReaderSend for T {}
pub trait StatsReaderSync: Fn(&BTreeMap<String, String>) -> Result<Value> + Send + Sync {}
impl<T: Fn(&BTreeMap<String, String>) -> Result<Value> + Send + Sync> StatsReaderSync for T {}
pub trait StatsOpener: FnMut() -> Result<Box<dyn StatsReader>> + Send {}
impl<T: FnMut() -> Result<Box<dyn StatsReader>> + Send> StatsOpener for T {}
pub trait StatsCloser: FnOnce() + Send {}
impl<T: FnOnce() + Send> StatsCloser for T {}
pub struct ScxStatsOps {
open: Box<dyn StatsOpener>,
close: Option<Box<dyn StatsCloser>>,
}
#[derive(Default)]
struct ScxStatsOpenOps {
map: BTreeMap<String, (Arc<Mutex<ScxStatsOps>>, Box<dyn StatsReader>)>,
}
impl std::ops::Drop for ScxStatsOpenOps {
fn drop(&mut self) {
for (_, (ops, _)) in self.map.iter_mut() {
if let Some(close) = ops.lock().unwrap().close.take() {
close();
}
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScxStatsRequest {
@ -33,7 +64,7 @@ impl ScxStatsRequest {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScxStatsResponse {
pub errno: i32,
pub args: BTreeMap<String, serde_json::Value>,
pub args: BTreeMap<String, Value>,
}
pub struct ScxStatsErrno(pub i32);
@ -52,7 +83,7 @@ impl std::fmt::Debug for ScxStatsErrno {
struct ScxStatsServerData {
stats_meta: BTreeMap<String, ScxStatsMeta>,
stats: StatMap,
stats_ops: BTreeMap<String, Arc<Mutex<ScxStatsOps>>>,
}
struct ScxStatsServerInner {
@ -64,11 +95,14 @@ impl ScxStatsServerInner {
fn new(
listener: UnixListener,
stats_meta: BTreeMap<String, ScxStatsMeta>,
stats: StatMap,
stats_ops: BTreeMap<String, Arc<Mutex<ScxStatsOps>>>,
) -> Self {
Self {
listener,
data: Arc::new(Mutex::new(ScxStatsServerData { stats_meta, stats })),
data: Arc::new(Mutex::new(ScxStatsServerData {
stats_meta,
stats_ops,
})),
}
}
@ -89,6 +123,7 @@ impl ScxStatsServerInner {
data: &Arc<Mutex<ScxStatsServerData>>,
) -> Result<ScxStatsResponse> {
let req: ScxStatsRequest = serde_json::from_str(&line)?;
let mut open_ops = ScxStatsOpenOps::default();
match req.req.as_str() {
"stats" => {
@ -97,13 +132,20 @@ impl ScxStatsServerInner {
None => "top",
};
let handler = match data.lock().unwrap().stats.get(target) {
let ops = match data.lock().unwrap().stats_ops.get(target) {
Some(v) => v.clone(),
None => Err(anyhow!("unknown stat target {:?}", req)
.context(ScxStatsErrno(libc::EINVAL)))?,
};
let resp = handler.lock().unwrap()(&req.args)?;
if !open_ops.map.contains_key(target) {
let read = (ops.lock().unwrap().open)()?;
open_ops.map.insert(target.into(), (ops.clone(), read));
}
let read = &mut open_ops.map.get_mut(target).unwrap().1;
let resp = read(&req.args)?;
Self::build_resp(0, &resp)
}
@ -113,11 +155,11 @@ impl ScxStatsServerInner {
}
fn serve(mut stream: UnixStream, data: Arc<Mutex<ScxStatsServerData>>) -> Result<()> {
let mut reader = BufReader::new(stream.try_clone()?);
let mut stream_reader = BufReader::new(stream.try_clone()?);
loop {
let mut line = String::new();
reader.read_line(&mut line)?;
stream_reader.read_line(&mut line)?;
if line.len() == 0 {
return Ok(());
}
@ -164,7 +206,7 @@ pub struct ScxStatsServer {
path: Option<PathBuf>,
stats_meta_holder: BTreeMap<String, ScxStatsMeta>,
stats_holder: StatMap,
stats_ops_holder: BTreeMap<String, Arc<Mutex<ScxStatsOps>>>,
}
impl ScxStatsServer {
@ -176,7 +218,7 @@ impl ScxStatsServer {
path: None,
stats_meta_holder: BTreeMap::new(),
stats_holder: BTreeMap::new(),
stats_ops_holder: BTreeMap::new(),
}
}
@ -185,16 +227,28 @@ impl ScxStatsServer {
self
}
pub fn add_stats(
mut self,
name: &str,
fetch: Box<dyn FnMut(&BTreeMap<String, String>) -> Result<serde_json::Value> + Send>,
) -> Self {
self.stats_holder
.insert(name.to_string(), Arc::new(Mutex::new(Box::new(fetch))));
pub fn add_stats_ops(mut self, name: &str, ops: ScxStatsOps) -> Self {
self.stats_ops_holder
.insert(name.to_string(), Arc::new(Mutex::new(ops)));
self
}
pub fn add_stats(self, name: &str, fetch: Box<dyn StatsReaderSend>) -> Self {
let wrapped_fetch = Mutex::new(fetch);
let read: Box<dyn StatsReaderSync> =
Box::new(move |args| wrapped_fetch.lock().unwrap()(args));
let wrapped_read = Arc::new(read);
let ops = ScxStatsOps {
open: Box::new(move || {
let copy = wrapped_read.clone();
Ok(Box::new(move |args| copy(args)))
}),
close: None,
};
self.add_stats_ops(name, ops)
}
pub fn set_base_path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.base_path = PathBuf::from(path.as_ref());
self
@ -238,7 +292,7 @@ impl ScxStatsServer {
let mut stats_meta = BTreeMap::new();
let mut stats = BTreeMap::new();
std::mem::swap(&mut stats_meta, &mut self.stats_meta_holder);
std::mem::swap(&mut stats, &mut self.stats_holder);
std::mem::swap(&mut stats, &mut self.stats_ops_holder);
let inner = ScxStatsServerInner::new(listener, stats_meta, stats);
@ -248,14 +302,14 @@ impl ScxStatsServer {
}
pub trait ToJson {
fn to_json(&self) -> Result<serde_json::Value>;
fn to_json(&self) -> Result<Value>;
}
impl<T> ToJson for T
where
T: Meta + Serialize,
{
fn to_json(&self) -> Result<serde_json::Value> {
fn to_json(&self) -> Result<Value> {
Ok(serde_json::to_value(self)?)
}
}