scx_stats: Make ScxStatsServerData a public carrier of data needed for stats server

And move related ops into it. This is a bit more natural and will also allow
doing other operaitons (e.g. describing stats) without launching the server.
This commit is contained in:
Tejun Heo 2024-08-23 12:23:57 -10:00
parent e878dac619
commit 405bcc63fe
7 changed files with 173 additions and 178 deletions

View File

@ -1,5 +1,5 @@
use log::{debug, info, warn};
use scx_stats::{Meta, ScxStatsServer, ToJson};
use scx_stats::{Meta, ScxStatsServer, ScxStatsServerData, ToJson};
use scx_stats_derive::Stats;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@ -47,8 +47,7 @@ fn main() {
// If communication from the stats generating closure is not necessary,
// ScxStatsServer::<(), ()> can be used. This example sends thread ID
// and receives the formatted string just for demonstration.
let server = ScxStatsServer::<ThreadId, String>::new()
.set_path(&path)
let sdata = ScxStatsServerData::<ThreadId, String>::new()
.add_meta(ClusterStats::meta())
.add_meta(DomainStats::meta())
.add_stats(
@ -61,12 +60,15 @@ fn main() {
debug!("Recevied {:?}", res);
stats.to_json()
}),
)
.launch()
.unwrap();
);
info!("stats_meta:");
server.describe_meta(&mut std::io::stderr(), None).unwrap();
sdata.describe_meta(&mut std::io::stderr(), None).unwrap();
let server = ScxStatsServer::<ThreadId, String>::new(sdata)
.set_path(&path)
.launch()
.unwrap();
debug!("Doing unnecessary server channel handling");
let (tx, rx) = server.channels();

View File

@ -8,8 +8,9 @@ pub use stats::{
mod server;
pub use server::{
ScxStatsErrno, ScxStatsOps, ScxStatsRequest, ScxStatsResponse, ScxStatsServer, StatsCloser,
StatsOpener, StatsReader, StatsReaderSend, StatsReaderSync, ToJson,
ScxStatsErrno, ScxStatsOps, ScxStatsRequest, ScxStatsResponse, ScxStatsServer,
ScxStatsServerData, StatsCloser, StatsOpener, StatsReader, StatsReaderSend, StatsReaderSync,
ToJson,
};
mod client;

View File

@ -166,12 +166,160 @@ impl<Req, Res> Clone for ChannelPair<Req, Res> {
}
}
struct ScxStatsServerData<Req, Res> {
pub struct ScxStatsServerData<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
top: Option<String>,
meta: BTreeMap<String, ScxStatsMeta>,
ops: BTreeMap<String, Arc<Mutex<ScxStatsOps<Req, Res>>>>,
}
impl<Req, Res> ScxStatsServerData<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
pub fn new() -> Self {
Self {
top: None,
meta: BTreeMap::new(),
ops: BTreeMap::new(),
}
}
pub fn add_meta(mut self, meta: ScxStatsMeta) -> Self {
if meta.attrs.top.is_some() && self.top.is_none() {
self.top = Some(meta.name.clone());
}
self.meta.insert(meta.name.clone(), meta);
self
}
pub fn add_ops(mut self, name: &str, ops: ScxStatsOps<Req, Res>) -> Self {
self.ops.insert(name.to_string(), Arc::new(Mutex::new(ops)));
self
}
pub fn add_stats(self, name: &str, fetch: Box<dyn StatsReaderSend<Req, Res>>) -> Self {
let wrapped_fetch = Mutex::new(fetch);
let read: Box<dyn StatsReaderSync<Req, Res>> =
Box::new(move |args, chan| wrapped_fetch.lock().unwrap()(args, chan));
let wrapped_read = Arc::new(read);
let ops = ScxStatsOps {
open: Box::new(move |_| {
let copy = wrapped_read.clone();
Ok(Box::new(move |args, chan| copy(args, chan)))
}),
close: None,
};
self.add_ops(name, ops)
}
fn visit_meta_inner(
&self,
name: &str,
visit: &mut impl FnMut(&ScxStatsMeta) -> Result<()>,
visited: &mut BTreeSet<String>,
) -> Result<()> {
let m = match self.meta.get(name) {
Some(v) => v,
None => bail!("unknown stats meta name {}", name),
};
if !visited.insert(name.into()) {
bail!("loop in stats meta detected, {} already nested", name);
}
visit(m)?;
for (fname, field) in m.fields.iter() {
match &field.data {
ScxStatsData::Array(ScxStatsKind::Struct(nested)) => {
self.visit_meta_inner(nested, visit, visited)?
}
ScxStatsData::Dict {
key: ScxStatsKind::Struct(nested),
datum: _,
} => bail!("{}.{} is a dict with struct {} as key", name, fname, nested),
ScxStatsData::Dict {
key: _,
datum: ScxStatsKind::Struct(nested),
} => self.visit_meta_inner(nested, visit, visited)?,
_ => {}
}
}
visited.remove(name);
Ok(())
}
fn visit_meta(
&self,
from: &str,
visit: &mut impl FnMut(&ScxStatsMeta) -> Result<()>,
) -> Result<()> {
let mut visited = BTreeSet::<String>::new();
self.visit_meta_inner(from, visit, &mut visited)
}
fn verify_meta(&self) -> Result<()> {
if self.top.is_none() {
warn!("top-level stats metadata missing");
return Ok(());
}
// Null visit checks all nested stats are reacheable without loops.
self.visit_meta(self.top.as_ref().unwrap(), &mut |_| Ok(()))
}
pub fn describe_meta<W: Write>(&self, w: &mut W, from: Option<&str>) -> Result<()> {
let from = match from {
Some(v) => v,
None => self
.top
.as_ref()
.ok_or_else(|| anyhow!("don't know where to start "))?,
};
let (mut nwidth, mut fwidth, mut dwidth) = (0usize, 0usize, 0usize);
self.visit_meta(from, &mut |m| {
nwidth = nwidth.max(m.name.len());
(fwidth, dwidth) = m.fields.iter().fold((fwidth, dwidth), |acc, (n, f)| {
(acc.0.max(n.len()), acc.1.max(f.data.to_string().len()))
});
Ok(())
})?;
self.visit_meta(from, &mut |m| {
write!(w, "[{:nw$}]", m.name, nw = nwidth)?;
if let Some(desc) = &m.attrs.desc {
write!(w, " {}", desc)?;
}
writeln!(w, "")?;
for (fname, f) in m.fields.iter() {
write!(
w,
" {:fw$} {:dw$}",
fname,
f.data.to_string(),
fw = fwidth,
dw = dwidth
)?;
if let Some(desc) = &f.attrs.desc {
write!(w, " {}", desc)?;
}
writeln!(w, "")?;
}
Ok(())
})
}
}
struct ScxStatsServerInner<Req, Res>
where
Req: Send + 'static,
@ -430,7 +578,7 @@ where
Req: Send + 'static,
Res: Send + 'static,
{
pub fn new() -> Self {
pub fn new(data: ScxStatsServerData<Req, Res>) -> Self {
let (ich, och) = ChannelPair::<Req, Res>::bidi();
Self {
@ -438,54 +586,13 @@ where
sched_path: PathBuf::from("root"),
stats_path: PathBuf::from("stats"),
path: None,
data: Arc::new(Mutex::new(ScxStatsServerData {
top: None,
meta: BTreeMap::new(),
ops: BTreeMap::new(),
})),
data: Arc::new(Mutex::new(data)),
outer_ch: och,
inner_ch: Some(ich),
exit: Arc::new(AtomicBool::new(false)),
}
}
pub fn add_meta(self, meta: ScxStatsMeta) -> Self {
let mut data = self.data.lock().unwrap();
if meta.attrs.top.is_some() && data.top.is_none() {
data.top = Some(meta.name.clone());
}
data.meta.insert(meta.name.clone(), meta);
drop(data);
self
}
pub fn add_ops(self, name: &str, ops: ScxStatsOps<Req, Res>) -> Self {
self.data
.lock()
.unwrap()
.ops
.insert(name.to_string(), Arc::new(Mutex::new(ops)));
self
}
pub fn add_stats(self, name: &str, fetch: Box<dyn StatsReaderSend<Req, Res>>) -> Self {
let wrapped_fetch = Mutex::new(fetch);
let read: Box<dyn StatsReaderSync<Req, Res>> =
Box::new(move |args, chan| wrapped_fetch.lock().unwrap()(args, chan));
let wrapped_read = Arc::new(read);
let ops = ScxStatsOps {
open: Box::new(move |_| {
let copy = wrapped_read.clone();
Ok(Box::new(move |args, chan| copy(args, chan)))
}),
close: None,
};
self.add_ops(name, ops)
}
pub fn set_base_path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.base_path = PathBuf::from(path.as_ref());
self
@ -506,68 +613,8 @@ where
self
}
fn visit_meta_inner(
name: &str,
visit: &mut impl FnMut(&ScxStatsMeta) -> Result<()>,
meta: &BTreeMap<String, ScxStatsMeta>,
visited: &mut BTreeSet<String>,
) -> Result<()> {
let m = match meta.get(name) {
Some(v) => v,
None => bail!("unknown stats meta name {}", name),
};
if !visited.insert(name.into()) {
bail!("loop in stats meta detected, {} already nested", name);
}
visit(m)?;
for (fname, field) in m.fields.iter() {
match &field.data {
ScxStatsData::Array(ScxStatsKind::Struct(nested)) => {
Self::visit_meta_inner(nested, visit, meta, visited)?
}
ScxStatsData::Dict {
key: ScxStatsKind::Struct(nested),
datum: _,
} => bail!("{}.{} is a dict with struct {} as key", name, fname, nested),
ScxStatsData::Dict {
key: _,
datum: ScxStatsKind::Struct(nested),
} => Self::visit_meta_inner(nested, visit, meta, visited)?,
_ => {}
}
}
visited.remove(name);
Ok(())
}
fn visit_meta(
from: &str,
visit: &mut impl FnMut(&ScxStatsMeta) -> Result<()>,
meta: &BTreeMap<String, ScxStatsMeta>,
) -> Result<()> {
let mut visited = BTreeSet::<String>::new();
Self::visit_meta_inner(from, visit, meta, &mut visited)
}
fn verify_meta(&self) -> Result<()> {
let data = self.data.lock().unwrap();
if data.top.is_none() {
warn!("top-level stats metadata missing");
return Ok(());
}
// Null visit checks all nested stats are reacheable without loops.
Self::visit_meta(data.top.as_ref().unwrap(), &mut |_| Ok(()), &data.meta)
}
pub fn launch(mut self) -> Result<Self> {
self.verify_meta()?;
self.data.lock().unwrap().verify_meta()?;
if self.path.is_none() {
self.path = Some(self.base_path.join(&self.sched_path).join(&self.stats_path));
@ -602,59 +649,6 @@ where
pub fn channels(&self) -> (Sender<Res>, Receiver<Req>) {
(self.outer_ch.req.clone(), self.outer_ch.res.clone())
}
pub fn describe_meta<W: Write>(&self, w: &mut W, from: Option<&str>) -> Result<()> {
let data = self.data.lock().unwrap();
let from = match from {
Some(v) => v,
None => data
.top
.as_ref()
.ok_or_else(|| anyhow!("don't know where to start "))?,
};
let (mut nwidth, mut fwidth, mut dwidth) = (0usize, 0usize, 0usize);
Self::visit_meta(
from,
&mut |m| {
nwidth = nwidth.max(m.name.len());
(fwidth, dwidth) = m.fields.iter().fold((fwidth, dwidth), |acc, (n, f)| {
(acc.0.max(n.len()), acc.1.max(f.data.to_string().len()))
});
Ok(())
},
&data.meta,
)?;
Self::visit_meta(
from,
&mut |m| {
write!(w, "[{:nw$}]", m.name, nw = nwidth)?;
if let Some(desc) = &m.attrs.desc {
write!(w, " {}", desc)?;
}
writeln!(w, "")?;
for (fname, f) in m.fields.iter() {
write!(
w,
" {:fw$} {:dw$}",
fname,
f.data.to_string(),
fw = fwidth,
dw = dwidth
)?;
if let Some(desc) = &f.attrs.desc {
write!(w, " {}", desc)?;
}
writeln!(w, "")?;
}
Ok(())
},
&data.meta,
)
}
}
impl<Req, Res> std::ops::Drop for ScxStatsServer<Req, Res>

View File

@ -1611,7 +1611,7 @@ impl<'a, 'b> Scheduler<'a, 'b> {
// Attach.
let struct_ops = scx_ops_attach!(skel, layered)?;
let stats_server = stats::launch_server()?;
let stats_server = ScxStatsServer::new(stats::server_data()).launch()?;
let sched = Self {
struct_ops: Some(struct_ops),

View File

@ -10,7 +10,7 @@ use chrono::Local;
use log::warn;
use scx_stats::Meta;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServer;
use scx_stats::ScxStatsServerData;
use scx_stats::StatsCloser;
use scx_stats::StatsOpener;
use scx_stats::StatsReader;
@ -432,7 +432,7 @@ pub enum StatsRes {
Bye,
}
pub fn launch_server() -> Result<ScxStatsServer<StatsReq, StatsRes>> {
pub fn server_data() -> ScxStatsServerData<StatsReq, StatsRes> {
let open: Box<dyn StatsOpener<StatsReq, StatsRes>> = Box::new(move |(req_ch, res_ch)| {
let tid = current().id();
req_ch.send(StatsReq::Hello(tid))?;
@ -463,7 +463,7 @@ pub fn launch_server() -> Result<ScxStatsServer<StatsReq, StatsRes>> {
}
});
Ok(ScxStatsServer::new()
ScxStatsServerData::new()
.add_meta(LayerStats::meta())
.add_meta(SysStats::meta())
.add_ops(
@ -473,11 +473,10 @@ pub fn launch_server() -> Result<ScxStatsServer<StatsReq, StatsRes>> {
close: Some(close),
},
)
.launch()?)
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
scx_utils::monitor_stats::<SysStats>(
scx_utils::monitor_stats::<SysStats>(
&vec![],
intv,
|| shutdown.load(Ordering::Relaxed),

View File

@ -443,7 +443,7 @@ impl<'a> Scheduler<'a> {
// Attach.
let mut skel = scx_ops_load!(skel, rusty, uei)?;
let struct_ops = Some(scx_ops_attach!(skel, rusty)?);
let stats_server = stats::launch_server()?;
let stats_server = ScxStatsServer::new(stats::server_data()).launch()?;
info!("Rusty scheduler started! Run `scx_rusty --monitor` for metrics.");

View File

@ -4,7 +4,7 @@ use chrono::DateTime;
use chrono::Local;
use scx_stats::Meta;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServer;
use scx_stats::ScxStatsServerData;
use scx_stats::StatsOpener;
use scx_stats::StatsReader;
use scx_stats::ToJson;
@ -215,7 +215,7 @@ impl ClusterStats {
}
}
pub fn launch_server() -> Result<ScxStatsServer<StatsCtx, (StatsCtx, ClusterStats)>> {
pub fn server_data() -> ScxStatsServerData<StatsCtx, (StatsCtx, ClusterStats)> {
let open: Box<dyn StatsOpener<StatsCtx, (StatsCtx, ClusterStats)>> =
Box::new(move |(req_ch, res_ch)| {
// Send one bogus request on open to establish prev_sc.
@ -234,12 +234,11 @@ pub fn launch_server() -> Result<ScxStatsServer<StatsCtx, (StatsCtx, ClusterStat
Ok(read)
});
Ok(ScxStatsServer::new()
ScxStatsServerData::new()
.add_meta(DomainStats::meta())
.add_meta(NodeStats::meta())
.add_meta(ClusterStats::meta())
.add_ops("top", ScxStatsOps { open, close: None })
.launch()?)
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {