scx_utils: Factor out monitor_stats() from scx_rusty and scx_layered

This commit is contained in:
Tejun Heo 2024-08-23 06:46:19 -10:00
parent ae3024e938
commit 44a0f1b124
7 changed files with 80 additions and 78 deletions

View File

@ -21,6 +21,8 @@ libbpf-rs = "0.24.1"
log = "0.4.17"
paste = "1.0"
regex = "1.10"
scx_stats = { path = "../scx_stats", version = "0.2.0" }
serde = { version = "1.0", features = ["derive"] }
sscanf = "0.4"
tar = "0.4"
walkdir = "2.4"

View File

@ -77,3 +77,7 @@ pub use infeasible::LoadLedger;
mod log_recorder;
pub use log_recorder::LogRecorderBuilder;
mod misc;
pub use misc::monitor_stats;

View File

@ -0,0 +1,53 @@
use anyhow::Result;
use log::info;
use scx_stats::ScxStatsClient;
use serde::Deserialize;
use std::thread::sleep;
use std::time::Duration;
pub fn monitor_stats<T>(
stats_args: &Vec<(String, String)>,
intv: Duration,
mut should_exit: impl FnMut() -> bool,
mut output: impl FnMut(T) -> Result<()>,
) -> Result<()>
where
T: for<'a> Deserialize<'a>,
{
let mut retry_cnt: u32 = 0;
while !should_exit() {
let mut client = match ScxStatsClient::new().connect() {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) if ioe.kind() == std::io::ErrorKind::ConnectionRefused => {
if retry_cnt == 1 {
info!("Stats server not avaliable, retrying...");
}
retry_cnt += 1;
sleep(Duration::from_secs(1));
continue;
}
_ => Err(e)?,
},
};
retry_cnt = 0;
while !should_exit() {
let stats = match client.request::<T>("stats", stats_args.clone()) {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) => {
info!("Connection to stats_server failed ({})", &ioe);
sleep(Duration::from_secs(1));
break;
}
None => Err(e)?,
},
};
output(stats)?;
sleep(intv);
}
}
Ok(())
}

View File

@ -1086,6 +1086,8 @@ dependencies = [
"metrics-util",
"paste",
"regex",
"scx_stats",
"serde",
"sscanf",
"tar",
"vergen",

View File

@ -7,9 +7,8 @@ use anyhow::{bail, Result};
use bitvec::prelude::*;
use chrono::DateTime;
use chrono::Local;
use log::{info, warn};
use log::warn;
use scx_stats::Meta;
use scx_stats::ScxStatsClient;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServer;
use scx_stats::StatsCloser;
@ -25,7 +24,6 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::current;
use std::thread::sleep;
use std::thread::ThreadId;
use std::time::Duration;
use std::time::SystemTime;
@ -485,42 +483,14 @@ pub fn launch_server() -> Result<ScxStatsServer<StatsReq, StatsRes>> {
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
let mut retry_cnt: u32 = 0;
while !shutdown.load(Ordering::Relaxed) {
let mut client = match ScxStatsClient::new().connect() {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) if ioe.kind() == std::io::ErrorKind::ConnectionRefused => {
if retry_cnt == 1 {
info!("Stats server not avaliable, retrying...");
}
retry_cnt += 1;
sleep(Duration::from_secs(1));
continue;
}
_ => Err(e)?,
},
};
retry_cnt = 0;
while !shutdown.load(Ordering::Relaxed) {
let sst = match client.request::<SysStats>("stats", vec![]) {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) => {
info!("Connection to stats_server failed ({})", &ioe);
sleep(Duration::from_secs(1));
break;
}
None => Err(e)?,
},
};
scx_utils::monitor_stats::<SysStats>(
&vec![],
intv,
|| shutdown.load(Ordering::Relaxed),
|sst| {
let dt = DateTime::<Local>::from(UNIX_EPOCH + Duration::from_secs_f64(sst.at));
println!("###### {} ######", dt.to_rfc2822());
sst.format_all(&mut std::io::stdout())?;
sleep(intv);
}
}
Ok(())
sst.format_all(&mut std::io::stdout())
},
)
}

View File

@ -1095,6 +1095,8 @@ dependencies = [
"metrics-util",
"paste",
"regex",
"scx_stats",
"serde",
"sscanf",
"tar",
"vergen",

View File

@ -2,9 +2,7 @@ use crate::StatsCtx;
use anyhow::Result;
use chrono::DateTime;
use chrono::Local;
use log::info;
use scx_stats::Meta;
use scx_stats::ScxStatsClient;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServer;
use scx_stats::StatsOpener;
@ -19,7 +17,6 @@ use std::io::Write;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use std::time::UNIX_EPOCH;
@ -246,46 +243,18 @@ pub fn launch_server() -> Result<ScxStatsServer<StatsCtx, (StatsCtx, ClusterStat
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
let mut retry_cnt: u32 = 0;
while !shutdown.load(Ordering::Relaxed) {
let mut client = match ScxStatsClient::new().connect() {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) if ioe.kind() == std::io::ErrorKind::ConnectionRefused => {
if retry_cnt == 1 {
info!("Stats server not avaliable, retrying...");
}
retry_cnt += 1;
sleep(Duration::from_secs(1));
continue;
}
_ => Err(e)?,
},
};
retry_cnt = 0;
while !shutdown.load(Ordering::Relaxed) {
let cst = match client.request::<ClusterStats>("stats", vec![]) {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) => {
info!("Connection to stats_server failed ({})", &ioe);
sleep(Duration::from_secs(1));
break;
}
None => Err(e)?,
},
};
scx_utils::monitor_stats::<ClusterStats>(
&vec![],
intv,
|| shutdown.load(Ordering::Relaxed),
|cst| {
let dt = DateTime::<Local>::from(UNIX_EPOCH + Duration::from_micros(cst.at_us));
println!(
"###### {}, load balance @ {:7.1}ms ######",
dt.to_rfc2822(),
(cst.lb_at_us as i64 - cst.at_us as i64) as f64 / 1000.0
);
cst.format(&mut std::io::stdout())?;
sleep(intv);
}
}
Ok(())
cst.format(&mut std::io::stdout())
},
)
}