Merge pull request #542 from sched-ext/htejun/scx_stats

scx_stats, scx_rusty, scx_layered: Implement `--help-stats`
This commit is contained in:
Tejun Heo 2024-08-24 15:38:36 -10:00 committed by GitHub
commit 1bba713a29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 509 additions and 119 deletions

View File

@ -1,5 +1,5 @@
use log::{debug, warn};
use scx_stats::{Meta, ScxStatsServer, ToJson};
use log::{debug, info, warn};
use scx_stats::{Meta, ScxStatsServer, ScxStatsServerData, ToJson};
use scx_stats_derive::Stats;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
@ -47,10 +47,9 @@ fn main() {
// If communication from the stats generating closure is not necessary,
// ScxStatsServer::<(), ()> can be used. This example sends thread ID
// and receives the formatted string just for demonstration.
let server = ScxStatsServer::<ThreadId, String>::new()
.set_path(&path)
.add_stats_meta(ClusterStats::meta())
.add_stats_meta(DomainStats::meta())
let sdata = ScxStatsServerData::<ThreadId, String>::new()
.add_meta(ClusterStats::meta())
.add_meta(DomainStats::meta())
.add_stats(
"top",
Box::new(move |_args, (tx, rx)| {
@ -61,7 +60,13 @@ fn main() {
debug!("Recevied {:?}", res);
stats.to_json()
}),
)
);
info!("stats_meta:");
sdata.describe_meta(&mut std::io::stderr(), None).unwrap();
let server = ScxStatsServer::<ThreadId, String>::new(sdata)
.set_path(&path)
.launch()
.unwrap();
@ -76,12 +81,9 @@ fn main() {
}
});
println!(
"Server listening. Run `client {:?}`.\n\
Use `socat - UNIX-CONNECT:{:?}` for raw connection.\n\
Press any key to exit.",
&path, &path,
);
info!("Server listening. Run `client {:?}`.", &path);
info!("Use `socat - UNIX-CONNECT:{:?}` for raw connection.", &path);
info!("Press any key to exit.");
let mut buf: [u8; 1] = [0];
let _ = std::io::stdin().read(&mut buf);

View File

@ -8,8 +8,9 @@ pub use stats::{
mod server;
pub use server::{
ScxStatsErrno, ScxStatsOps, ScxStatsRequest, ScxStatsResponse, ScxStatsServer, StatsCloser,
StatsOpener, StatsReader, StatsReaderSend, StatsReaderSync, ToJson,
ScxStatsErrno, ScxStatsOps, ScxStatsRequest, ScxStatsResponse, ScxStatsServer,
ScxStatsServerData, StatsCloser, StatsOpener, StatsReader, StatsReaderSend, StatsReaderSync,
ToJson,
};
mod client;

View File

@ -1,11 +1,11 @@
use crate::ScxStatsClient;
use crate::{Meta, ScxStatsMeta};
use anyhow::{anyhow, Context, Result};
use crate::{Meta, ScxStatsData, ScxStatsKind, ScxStatsMeta};
use anyhow::{anyhow, bail, Context, Result};
use crossbeam::channel::{unbounded, Receiver, RecvError, Select, SendError, Sender};
use log::{debug, error, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
@ -166,9 +166,169 @@ impl<Req, Res> Clone for ChannelPair<Req, Res> {
}
}
struct ScxStatsServerData<Req, Res> {
stats_meta: BTreeMap<String, ScxStatsMeta>,
stats_ops: BTreeMap<String, Arc<Mutex<ScxStatsOps<Req, Res>>>>,
pub struct ScxStatsServerData<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
top: Option<String>,
meta: BTreeMap<String, ScxStatsMeta>,
ops: BTreeMap<String, Arc<Mutex<ScxStatsOps<Req, Res>>>>,
}
impl<Req, Res> ScxStatsServerData<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
pub fn new() -> Self {
Self {
top: None,
meta: BTreeMap::new(),
ops: BTreeMap::new(),
}
}
pub fn add_meta(mut self, meta: ScxStatsMeta) -> Self {
if meta.attrs.top.is_some() && self.top.is_none() {
self.top = Some(meta.name.clone());
}
self.meta.insert(meta.name.clone(), meta);
self
}
pub fn add_ops(mut self, name: &str, ops: ScxStatsOps<Req, Res>) -> Self {
self.ops.insert(name.to_string(), Arc::new(Mutex::new(ops)));
self
}
pub fn add_stats(self, name: &str, fetch: Box<dyn StatsReaderSend<Req, Res>>) -> Self {
let wrapped_fetch = Mutex::new(fetch);
let read: Box<dyn StatsReaderSync<Req, Res>> =
Box::new(move |args, chan| wrapped_fetch.lock().unwrap()(args, chan));
let wrapped_read = Arc::new(read);
let ops = ScxStatsOps {
open: Box::new(move |_| {
let copy = wrapped_read.clone();
Ok(Box::new(move |args, chan| copy(args, chan)))
}),
close: None,
};
self.add_ops(name, ops)
}
fn visit_meta_inner(
&self,
name: &str,
visit: &mut impl FnMut(&ScxStatsMeta) -> Result<()>,
nesting: &mut BTreeSet<String>,
visited: &mut BTreeSet<String>,
) -> Result<()> {
let m = match self.meta.get(name) {
Some(v) => v,
None => bail!("unknown stats meta name {}", name),
};
if !nesting.insert(name.into()) {
bail!("loop in stats meta detected, {} already nested", name);
}
if !visited.insert(name.into()) {
return Ok(());
}
visit(m)?;
for (fname, field) in m.fields.iter() {
match &field.data {
ScxStatsData::Array(ScxStatsKind::Struct(inner)) => {
self.visit_meta_inner(inner, visit, nesting, visited)?
}
ScxStatsData::Dict {
key: ScxStatsKind::Struct(inner),
datum: _,
} => bail!("{}.{} is a dict with struct {} as key", name, fname, inner),
ScxStatsData::Dict {
key: _,
datum: ScxStatsKind::Struct(inner),
} => self.visit_meta_inner(inner, visit, nesting, visited)?,
_ => {}
}
}
nesting.remove(name);
Ok(())
}
fn visit_meta(
&self,
from: &str,
visit: &mut impl FnMut(&ScxStatsMeta) -> Result<()>,
) -> Result<()> {
let mut nesting = BTreeSet::<String>::new();
let mut visited = BTreeSet::<String>::new();
self.visit_meta_inner(from, visit, &mut nesting, &mut visited)
}
fn verify_meta(&self) -> Result<()> {
if self.top.is_none() {
warn!("top-level stats metadata missing");
return Ok(());
}
// Null visit checks all nested stats are reacheable without loops.
self.visit_meta(self.top.as_ref().unwrap(), &mut |_| Ok(()))
}
pub fn describe_meta<W: Write>(&self, w: &mut W, from: Option<&str>) -> Result<()> {
let from = match from {
Some(v) => v,
None => self
.top
.as_ref()
.ok_or_else(|| anyhow!("don't know where to start "))?,
};
let (mut nwidth, mut fwidth, mut dwidth) = (0usize, 0usize, 0usize);
self.visit_meta(from, &mut |m| {
nwidth = nwidth.max(m.name.len());
(fwidth, dwidth) = m.fields.iter().fold((fwidth, dwidth), |acc, (n, f)| {
(acc.0.max(n.len()), acc.1.max(f.data.to_string().len()))
});
Ok(())
})?;
let mut first = true;
self.visit_meta(from, &mut |m| {
if !first {
writeln!(w, "")?;
}
first = false;
write!(w, "[{:nw$}]", m.name, nw = nwidth)?;
if let Some(desc) = &m.attrs.desc {
write!(w, " {}", desc)?;
}
writeln!(w, "")?;
for (fname, f) in m.fields.iter() {
write!(
w,
" {:fw$} {:dw$}",
fname,
f.data.to_string(),
fw = fwidth,
dw = dwidth
)?;
if let Some(desc) = &f.attrs.desc {
write!(w, " {}", desc)?;
}
writeln!(w, "")?;
}
Ok(())
})
}
}
struct ScxStatsServerInner<Req, Res>
@ -189,17 +349,13 @@ where
{
fn new(
listener: UnixListener,
stats_meta: BTreeMap<String, ScxStatsMeta>,
stats_ops: BTreeMap<String, Arc<Mutex<ScxStatsOps<Req, Res>>>>,
data: Arc<Mutex<ScxStatsServerData<Req, Res>>>,
inner_ch: ChannelPair<Req, Res>,
exit: Arc<AtomicBool>,
) -> Self {
Self {
listener,
data: Arc::new(Mutex::new(ScxStatsServerData {
stats_meta,
stats_ops,
})),
data,
inner_ch,
exit,
}
@ -232,7 +388,7 @@ where
None => "top",
};
let ops = match data.lock().unwrap().stats_ops.get(target) {
let ops = match data.lock().unwrap().ops.get(target) {
Some(v) => v.clone(),
None => Err(anyhow!("unknown stat target {:?}", req)
.context(ScxStatsErrno(libc::EINVAL)))?,
@ -251,7 +407,7 @@ where
Self::build_resp(0, &resp)
}
"stats_meta" => Ok(Self::build_resp(0, &data.lock().unwrap().stats_meta)?),
"stats_meta" => Ok(Self::build_resp(0, &data.lock().unwrap().meta)?),
req => Err(anyhow!("unknown command {:?}", req).context(ScxStatsErrno(libc::EINVAL)))?,
}
}
@ -421,8 +577,7 @@ where
stats_path: PathBuf,
path: Option<PathBuf>,
stats_meta_holder: BTreeMap<String, ScxStatsMeta>,
stats_ops_holder: BTreeMap<String, Arc<Mutex<ScxStatsOps<Req, Res>>>>,
data: Arc<Mutex<ScxStatsServerData<Req, Res>>>,
outer_ch: ChannelPair<Res, Req>,
inner_ch: Option<ChannelPair<Req, Res>>,
@ -434,7 +589,7 @@ where
Req: Send + 'static,
Res: Send + 'static,
{
pub fn new() -> Self {
pub fn new(data: ScxStatsServerData<Req, Res>) -> Self {
let (ich, och) = ChannelPair::<Req, Res>::bidi();
Self {
@ -442,43 +597,13 @@ where
sched_path: PathBuf::from("root"),
stats_path: PathBuf::from("stats"),
path: None,
stats_meta_holder: BTreeMap::new(),
stats_ops_holder: BTreeMap::new(),
data: Arc::new(Mutex::new(data)),
outer_ch: och,
inner_ch: Some(ich),
exit: Arc::new(AtomicBool::new(false)),
}
}
pub fn add_stats_meta(mut self, meta: ScxStatsMeta) -> Self {
self.stats_meta_holder.insert(meta.name.clone(), meta);
self
}
pub fn add_stats_ops(mut self, name: &str, ops: ScxStatsOps<Req, Res>) -> Self {
self.stats_ops_holder
.insert(name.to_string(), Arc::new(Mutex::new(ops)));
self
}
pub fn add_stats(self, name: &str, fetch: Box<dyn StatsReaderSend<Req, Res>>) -> Self {
let wrapped_fetch = Mutex::new(fetch);
let read: Box<dyn StatsReaderSync<Req, Res>> =
Box::new(move |args, chan| wrapped_fetch.lock().unwrap()(args, chan));
let wrapped_read = Arc::new(read);
let ops = ScxStatsOps {
open: Box::new(move |_| {
let copy = wrapped_read.clone();
Ok(Box::new(move |args, chan| copy(args, chan)))
}),
close: None,
};
self.add_stats_ops(name, ops)
}
pub fn set_base_path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.base_path = PathBuf::from(path.as_ref());
self
@ -500,6 +625,8 @@ where
}
pub fn launch(mut self) -> Result<Self> {
self.data.lock().unwrap().verify_meta()?;
if self.path.is_none() {
self.path = Some(self.base_path.join(&self.sched_path).join(&self.stats_path));
}
@ -519,15 +646,9 @@ where
let listener =
UnixListener::bind(path).with_context(|| format!("creating UNIX socket {:?}", path))?;
let mut stats_meta = BTreeMap::new();
let mut stats = BTreeMap::new();
std::mem::swap(&mut stats_meta, &mut self.stats_meta_holder);
std::mem::swap(&mut stats, &mut self.stats_ops_holder);
let inner = ScxStatsServerInner::new(
listener,
stats_meta,
stats,
self.data.clone(),
self.inner_ch.take().unwrap(),
self.exit.clone(),
);

View File

@ -53,6 +53,18 @@ impl ScxStatsKind {
}
}
impl std::fmt::Display for ScxStatsKind {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::I64 => write!(f, "i64"),
Self::U64 => write!(f, "u64"),
Self::Float => write!(f, "float"),
Self::String => write!(f, "string"),
Self::Struct(name) => write!(f, "{}", name),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ScxStatsData {
#[serde(rename = "datum")]
@ -172,6 +184,16 @@ impl ScxStatsData {
}
}
impl std::fmt::Display for ScxStatsData {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::Datum(kind) => write!(f, "{}", kind),
Self::Array(kind) => write!(f, "[{}]", kind),
Self::Dict { key, datum } => write!(f, "{{{}:{}}}", key, datum),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ScxStatsAttr {
Top,

View File

@ -399,6 +399,38 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
@ -408,6 +440,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -1385,6 +1426,20 @@ dependencies = [
"simplelog",
]
[[package]]
name = "scx_stats"
version = "1.0.3"
dependencies = [
"anyhow",
"crossbeam",
"libc",
"log",
"quote",
"serde",
"serde_json",
"syn",
]
[[package]]
name = "scx_utils"
version = "1.0.3"
@ -1402,6 +1457,8 @@ dependencies = [
"metrics-util",
"paste",
"regex",
"scx_stats",
"serde",
"sscanf",
"tar",
"vergen",

View File

@ -303,6 +303,38 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
@ -312,6 +344,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -968,6 +1009,20 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "scx_stats"
version = "1.0.3"
dependencies = [
"anyhow",
"crossbeam",
"libc",
"log",
"quote",
"serde",
"serde_json",
"syn",
]
[[package]]
name = "scx_utils"
version = "1.0.3"
@ -985,6 +1040,8 @@ dependencies = [
"metrics-util",
"paste",
"regex",
"scx_stats",
"serde",
"sscanf",
"tar",
"vergen",

View File

@ -416,6 +416,10 @@ struct Opts {
#[clap(long)]
run_example: bool,
/// Show descriptions for statistics.
#[clap(long)]
help_stats: bool,
/// Layer specification. See --help.
specs: Vec<String>,
}
@ -1616,7 +1620,7 @@ impl<'a, 'b> Scheduler<'a, 'b> {
// Attach.
let struct_ops = scx_ops_attach!(skel, layered)?;
let stats_server = stats::launch_server()?;
let stats_server = ScxStatsServer::new(stats::server_data()).launch()?;
let sched = Self {
struct_ops: Some(struct_ops),
@ -1933,6 +1937,11 @@ fn verify_layer_specs(specs: &[LayerSpec]) -> Result<()> {
fn main() -> Result<()> {
let opts = Opts::parse();
if opts.help_stats {
stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
return Ok(());
}
let llv = match opts.verbose {
0 => simplelog::LevelFilter::Info,
1 => simplelog::LevelFilter::Debug,

View File

@ -10,7 +10,7 @@ use chrono::Local;
use log::warn;
use scx_stats::Meta;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServer;
use scx_stats::ScxStatsServerData;
use scx_stats::StatsCloser;
use scx_stats::StatsOpener;
use scx_stats::StatsReader;
@ -50,69 +50,69 @@ fn fmt_num(v: u64) -> String {
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(_om_prefix = "l_", _om_label = "layer_name")]
pub struct LayerStats {
#[stat(desc = "layer: CPU utilization (100% means one full CPU)")]
#[stat(desc = "CPU utilization (100% means one full CPU)")]
pub util: f64,
#[stat(desc = "layer: fraction of total CPU utilization")]
#[stat(desc = "fraction of total CPU utilization")]
pub util_frac: f64,
#[stat(desc = "layer: sum of weight * duty_cycle for tasks")]
#[stat(desc = "sum of weight * duty_cycle for tasks")]
pub load: f64,
#[stat(desc = "layer: fraction of total load")]
#[stat(desc = "fraction of total load")]
pub load_frac: f64,
#[stat(desc = "layer: # tasks")]
#[stat(desc = "# tasks")]
pub tasks: u32,
#[stat(desc = "layer: # sched events duringg the period")]
#[stat(desc = "# sched events duringg the period")]
pub total: u64,
#[stat(desc = "layer: % dispatched into idle CPU")]
#[stat(desc = "% dispatched into idle CPU")]
pub sel_local: f64,
#[stat(desc = "layer: % enqueued after wakeup")]
#[stat(desc = "% enqueued after wakeup")]
pub enq_wakeup: f64,
#[stat(desc = "layer: % enqueued after slice expiration")]
#[stat(desc = "% enqueued after slice expiration")]
pub enq_expire: f64,
#[stat(desc = "layer: % re-enqueued due to RT preemption")]
#[stat(desc = "% re-enqueued due to RT preemption")]
pub enq_reenq: f64,
#[stat(desc = "layer: # times exec duration < min_exec_us")]
#[stat(desc = "# times exec duration < min_exec_us")]
pub min_exec: f64,
#[stat(desc = "layer: total exec durations extended due to min_exec_us")]
#[stat(desc = "total exec durations extended due to min_exec_us")]
pub min_exec_us: u64,
#[stat(desc = "layer: % dispatche into idle CPUs occupied by other layers")]
#[stat(desc = "% dispatche into idle CPUs occupied by other layers")]
pub open_idle: f64,
#[stat(desc = "layer: % preempted other tasks")]
#[stat(desc = "% preempted other tasks")]
pub preempt: f64,
#[stat(desc = "layer: % first-preempted other tasks")]
#[stat(desc = "% first-preempted other tasks")]
pub preempt_first: f64,
#[stat(desc = "layer: % idle-preempted other tasks")]
#[stat(desc = "% idle-preempted other tasks")]
pub preempt_idle: f64,
#[stat(desc = "layer: % attempted to preempt other tasks but failed")]
#[stat(desc = "% attempted to preempt other tasks but failed")]
pub preempt_fail: f64,
#[stat(desc = "layer: % violated config due to CPU affinity")]
#[stat(desc = "% violated config due to CPU affinity")]
pub affn_viol: f64,
#[stat(desc = "layer: % continued executing after slice expiration")]
#[stat(desc = "% continued executing after slice expiration")]
pub keep: f64,
#[stat(desc = "layer: % disallowed to continue executing due to max_exec")]
#[stat(desc = "% disallowed to continue executing due to max_exec")]
pub keep_fail_max_exec: f64,
#[stat(desc = "layer: % disallowed to continue executing due to other tasks")]
#[stat(desc = "% disallowed to continue executing due to other tasks")]
pub keep_fail_busy: f64,
#[stat(desc = "layer: whether is exclusive", _om_skip)]
#[stat(desc = "whether is exclusive", _om_skip)]
pub is_excl: u32,
#[stat(desc = "layer: # times an excl task skipped a CPU as the sibling was also excl")]
#[stat(desc = "# times an excl task skipped a CPU as the sibling was also excl")]
pub excl_collision: f64,
#[stat(desc = "layer: % a sibling CPU was preempted for an exclusive task")]
#[stat(desc = "% a sibling CPU was preempted for an exclusive task")]
pub excl_preempt: f64,
#[stat(desc = "layer: % kicked a CPU from enqueue path")]
#[stat(desc = "% kicked a CPU from enqueue path")]
pub kick: f64,
#[stat(desc = "layer: % yielded")]
#[stat(desc = "% yielded")]
pub yielded: f64,
#[stat(desc = "layer: # times yield was ignored")]
#[stat(desc = "# times yield was ignored")]
pub yield_ignore: u64,
#[stat(desc = "layer: % migrated across CPUs")]
#[stat(desc = "% migrated across CPUs")]
pub migration: f64,
#[stat(desc = "layer: mask of allocated CPUs", _om_skip)]
#[stat(desc = "mask of allocated CPUs", _om_skip)]
pub cpus: Vec<u32>,
#[stat(desc = "layer: # of CPUs assigned")]
#[stat(desc = "# of CPUs assigned")]
pub cur_nr_cpus: u32,
#[stat(desc = "layer: minimum # of CPUs assigned")]
#[stat(desc = "minimum # of CPUs assigned")]
pub min_nr_cpus: u32,
#[stat(desc = "layer: maximum # of CPUs assigned")]
#[stat(desc = "maximum # of CPUs assigned")]
pub max_nr_cpus: u32,
}
@ -432,7 +432,7 @@ pub enum StatsRes {
Bye,
}
pub fn launch_server() -> Result<ScxStatsServer<StatsReq, StatsRes>> {
pub fn server_data() -> ScxStatsServerData<StatsReq, StatsRes> {
let open: Box<dyn StatsOpener<StatsReq, StatsRes>> = Box::new(move |(req_ch, res_ch)| {
let tid = current().id();
req_ch.send(StatsReq::Hello(tid))?;
@ -463,17 +463,16 @@ pub fn launch_server() -> Result<ScxStatsServer<StatsReq, StatsRes>> {
}
});
Ok(ScxStatsServer::new()
.add_stats_meta(LayerStats::meta())
.add_stats_meta(SysStats::meta())
.add_stats_ops(
ScxStatsServerData::new()
.add_meta(LayerStats::meta())
.add_meta(SysStats::meta())
.add_ops(
"top",
ScxStatsOps {
open,
close: Some(close),
},
)
.launch()?)
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {

View File

@ -294,6 +294,38 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
@ -303,6 +335,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -892,6 +933,20 @@ dependencies = [
"walkdir",
]
[[package]]
name = "scx_stats"
version = "1.0.3"
dependencies = [
"anyhow",
"crossbeam",
"libc",
"log",
"quote",
"serde",
"serde_json",
"syn",
]
[[package]]
name = "scx_utils"
version = "1.0.3"
@ -909,6 +964,8 @@ dependencies = [
"metrics-util",
"paste",
"regex",
"scx_stats",
"serde",
"sscanf",
"tar",
"vergen",

View File

@ -303,6 +303,38 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
@ -312,6 +344,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -968,6 +1009,20 @@ dependencies = [
"walkdir",
]
[[package]]
name = "scx_stats"
version = "1.0.3"
dependencies = [
"anyhow",
"crossbeam",
"libc",
"log",
"quote",
"serde",
"serde_json",
"syn",
]
[[package]]
name = "scx_utils"
version = "1.0.3"
@ -985,6 +1040,8 @@ dependencies = [
"metrics-util",
"paste",
"regex",
"scx_stats",
"serde",
"sscanf",
"tar",
"vergen",

View File

@ -219,6 +219,10 @@ struct Opts {
/// Print version and exit.
#[clap(long)]
version: bool,
/// Show descriptions for statistics.
#[clap(long)]
help_stats: bool,
}
fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
@ -442,7 +446,7 @@ impl<'a> Scheduler<'a> {
// Attach.
let mut skel = scx_ops_load!(skel, rusty, uei)?;
let struct_ops = Some(scx_ops_attach!(skel, rusty)?);
let stats_server = stats::launch_server()?;
let stats_server = ScxStatsServer::new(stats::server_data()).launch()?;
info!("Rusty scheduler started! Run `scx_rusty --monitor` for metrics.");
@ -624,6 +628,11 @@ fn main() -> Result<()> {
return Ok(());
}
if opts.help_stats {
stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
return Ok(());
}
let llv = match opts.verbose {
0 => simplelog::LevelFilter::Info,
1 => simplelog::LevelFilter::Debug,

View File

@ -4,7 +4,7 @@ use chrono::DateTime;
use chrono::Local;
use scx_stats::Meta;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServer;
use scx_stats::ScxStatsServerData;
use scx_stats::StatsOpener;
use scx_stats::StatsReader;
use scx_stats::ToJson;
@ -215,7 +215,7 @@ impl ClusterStats {
}
}
pub fn launch_server() -> Result<ScxStatsServer<StatsCtx, (StatsCtx, ClusterStats)>> {
pub fn server_data() -> ScxStatsServerData<StatsCtx, (StatsCtx, ClusterStats)> {
let open: Box<dyn StatsOpener<StatsCtx, (StatsCtx, ClusterStats)>> =
Box::new(move |(req_ch, res_ch)| {
// Send one bogus request on open to establish prev_sc.
@ -234,12 +234,11 @@ pub fn launch_server() -> Result<ScxStatsServer<StatsCtx, (StatsCtx, ClusterStat
Ok(read)
});
Ok(ScxStatsServer::new()
.add_stats_meta(DomainStats::meta())
.add_stats_meta(NodeStats::meta())
.add_stats_meta(ClusterStats::meta())
.add_stats_ops("top", ScxStatsOps { open, close: None })
.launch()?)
ScxStatsServerData::new()
.add_meta(DomainStats::meta())
.add_meta(NodeStats::meta())
.add_meta(ClusterStats::meta())
.add_ops("top", ScxStatsOps { open, close: None })
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {