Merge pull request #514 from sched-ext/htejun/scx_stats

scx_stats, scx_layered: Implement independent stats client sessions
This commit is contained in:
Tejun Heo 2024-08-19 11:24:53 -10:00 committed by GitHub
commit 3498a2b899
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 670 additions and 182 deletions

View File

@ -11,6 +11,7 @@ description = "Statistics transport library for sched_ext schedulers"
[dependencies]
anyhow = "1.0.65"
crossbeam = "0.8.4"
libc = "0.2.137"
log = "0.4.17"
quote = "1.0"

View File

@ -90,7 +90,7 @@ The statistics server which serves the above structs through a UNIX domain
socket can be launched as follows:
```rust
ScxStatsServer::new()
let _server = ScxStatsServer::new()
.set_path(&path)
.add_stats_meta(ClusterStats::meta())
.add_stats_meta(DomainStats::meta())
@ -111,9 +111,9 @@ return `serde_json::Value`. Note that `scx_stats::ToJson` automatically adds
`.to_json()` to structs which implement both `scx_stats::Meta` and
`serde::Serialize`.
The above will launch the statistics server listening on `@path`. The client
side is also simple. Taken from
[`examples/client.rs`](./examples/client.rs):
The above will launch the statistics server listening on `@path`. Note that
the server will shutdown when `_server` variable is dropped. The client side
is also simple. Taken from [`examples/client.rs`](./examples/client.rs):
```rust
let mut client = ScxStatsClient::new().set_path(path).connect().unwrap();

View File

@ -41,6 +41,8 @@ fn main() {
println!("{:#?}", &resp);
println!("\n===== Requesting \"stats_meta\" but receiving with serde_json::Value:");
let resp = client.request::<serde_json::Value>("stats_meta", vec![]).unwrap();
let resp = client
.request::<serde_json::Value>("stats_meta", vec![])
.unwrap();
println!("{}", serde_json::to_string_pretty(&resp).unwrap());
}

View File

@ -1,14 +1,22 @@
use scx_stats::{ScxStatsServer, Meta, ToJson};
use log::{debug, warn};
use scx_stats::{Meta, ScxStatsServer, ToJson};
use scx_stats_derive::Stats;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::env::args;
use std::io::Read;
use std::thread::{current, spawn, ThreadId};
// Hacky definition sharing. See stats_def.rs.h.
include!("stats_defs.rs.h");
fn main() {
simple_logger::SimpleLogger::new()
.with_level(log::LevelFilter::Info)
.env()
.init()
.unwrap();
let stats = ClusterStats {
name: "test cluster".into(),
at: 12345,
@ -36,14 +44,38 @@ fn main() {
std::assert_eq!(args().len(), 2, "Usage: server UNIX_SOCKET_PATH");
let path = args().nth(1).unwrap();
ScxStatsServer::new()
// If communication from the stats generating closure is not necessary,
// ScxStatsServer::<(), ()> can be used. This example sends thread ID
// and receives the formatted string just for demonstration.
let server = ScxStatsServer::<ThreadId, String>::new()
.set_path(&path)
.add_stats_meta(ClusterStats::meta())
.add_stats_meta(DomainStats::meta())
.add_stats("top", Box::new(move |_| stats.to_json()))
.add_stats(
"top",
Box::new(move |_args, (tx, rx)| {
let id = current().id();
let res = tx.send(id);
debug!("Sendt {:?} {:?}", id, &res);
let res = rx.recv();
debug!("Recevied {:?}", res);
stats.to_json()
}),
)
.launch()
.unwrap();
debug!("Doing unnecessary server channel handling");
let (tx, rx) = server.channels();
spawn(move || {
while let Ok(id) = rx.recv() {
if let Err(e) = tx.send(format!("hello {:?}", &id)) {
warn!("Server channel errored ({:?})", &e);
break;
}
}
});
println!(
"Server listening. Run `client {:?}`.\n\
Use `socat - UNIX-CONNECT:{:?}` for raw connection.\n\

View File

@ -7,7 +7,10 @@ pub use stats::{
};
mod server;
pub use server::{ScxStatsErrno, ScxStatsRequest, ScxStatsResponse, ScxStatsServer, ToJson};
pub use server::{
ScxStatsErrno, ScxStatsOps, ScxStatsRequest, ScxStatsResponse, ScxStatsServer, StatsCloser,
StatsOpener, StatsReader, StatsReaderSend, StatsReaderSync, ToJson,
};
mod client;
pub use client::ScxStatsClient;

View File

@ -1,18 +1,104 @@
use crate::ScxStatsClient;
use crate::{Meta, ScxStatsMeta};
use anyhow::{anyhow, Context, Result};
use log::warn;
use crossbeam::channel::{unbounded, Receiver, RecvError, Select, SendError, Sender};
use log::{debug, error, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeMap;
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::spawn;
type StatMap = BTreeMap<
pub trait StatsReader<Req, Res>:
FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>
{
}
impl<
Req,
Res,
T: FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>,
> StatsReader<Req, Res> for T
{
}
pub trait StatsReaderSend<Req, Res>:
FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send
{
}
impl<
Req,
Res,
T: FnMut(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send,
> StatsReaderSend<Req, Res> for T
{
}
pub trait StatsReaderSync<Req, Res>:
Fn(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value> + Send + Sync
{
}
impl<
Req,
Res,
T: Fn(&BTreeMap<String, String>, (&Sender<Req>, &Receiver<Res>)) -> Result<Value>
+ Send
+ Sync,
> StatsReaderSync<Req, Res> for T
{
}
pub trait StatsOpener<Req, Res>:
FnMut((&Sender<Req>, &Receiver<Res>)) -> Result<Box<dyn StatsReader<Req, Res>>> + Send
{
}
impl<
Req,
Res,
T: FnMut((&Sender<Req>, &Receiver<Res>)) -> Result<Box<dyn StatsReader<Req, Res>>> + Send,
> StatsOpener<Req, Res> for T
{
}
pub trait StatsCloser<Req, Res>: FnOnce((&Sender<Req>, &Receiver<Res>)) + Send {}
impl<Req, Res, T: FnOnce((&Sender<Req>, &Receiver<Res>)) + Send> StatsCloser<Req, Res> for T {}
pub struct ScxStatsOps<Req, Res> {
pub open: Box<dyn StatsOpener<Req, Res>>,
pub close: Option<Box<dyn StatsCloser<Req, Res>>>,
}
struct ScxStatsOpenOps<Req, Res> {
map: BTreeMap<
String,
Arc<Mutex<Box<dyn FnMut(&BTreeMap<String, String>) -> Result<serde_json::Value> + Send>>>,
>;
(
Arc<Mutex<ScxStatsOps<Req, Res>>>,
Box<dyn StatsReader<Req, Res>>,
ChannelPair<Req, Res>,
),
>,
}
impl<Req, Res> ScxStatsOpenOps<Req, Res> {
fn new() -> Self {
Self {
map: BTreeMap::new(),
}
}
}
impl<Req, Res> std::ops::Drop for ScxStatsOpenOps<Req, Res> {
fn drop(&mut self) {
for (_, (ops, _, ch)) in self.map.iter_mut() {
if let Some(close) = ops.lock().unwrap().close.take() {
close((&ch.req, &ch.res));
}
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScxStatsRequest {
@ -33,7 +119,7 @@ impl ScxStatsRequest {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ScxStatsResponse {
pub errno: i32,
pub args: BTreeMap<String, serde_json::Value>,
pub args: BTreeMap<String, Value>,
}
pub struct ScxStatsErrno(pub i32);
@ -50,25 +136,72 @@ impl std::fmt::Debug for ScxStatsErrno {
}
}
struct ScxStatsServerData {
struct ChannelPair<Req, Res> {
req: Sender<Req>,
res: Receiver<Res>,
}
impl<Req, Res> ChannelPair<Req, Res> {
fn bidi() -> (ChannelPair<Req, Res>, ChannelPair<Res, Req>) {
let (req, res) = (unbounded::<Req>(), unbounded::<Res>());
(
ChannelPair {
req: req.0,
res: res.1,
},
ChannelPair {
req: res.0,
res: req.1,
},
)
}
}
impl<Req, Res> Clone for ChannelPair<Req, Res> {
fn clone(&self) -> Self {
Self {
req: self.req.clone(),
res: self.res.clone(),
}
}
}
struct ScxStatsServerData<Req, Res> {
stats_meta: BTreeMap<String, ScxStatsMeta>,
stats: StatMap,
stats_ops: BTreeMap<String, Arc<Mutex<ScxStatsOps<Req, Res>>>>,
}
struct ScxStatsServerInner {
struct ScxStatsServerInner<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
listener: UnixListener,
data: Arc<Mutex<ScxStatsServerData>>,
data: Arc<Mutex<ScxStatsServerData<Req, Res>>>,
inner_ch: ChannelPair<Req, Res>,
exit: Arc<AtomicBool>,
}
impl ScxStatsServerInner {
impl<Req, Res> ScxStatsServerInner<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
fn new(
listener: UnixListener,
stats_meta: BTreeMap<String, ScxStatsMeta>,
stats: StatMap,
stats_ops: BTreeMap<String, Arc<Mutex<ScxStatsOps<Req, Res>>>>,
inner_ch: ChannelPair<Req, Res>,
exit: Arc<AtomicBool>,
) -> Self {
Self {
listener,
data: Arc::new(Mutex::new(ScxStatsServerData { stats_meta, stats })),
data: Arc::new(Mutex::new(ScxStatsServerData {
stats_meta,
stats_ops,
})),
inner_ch,
exit,
}
}
@ -86,7 +219,9 @@ impl ScxStatsServerInner {
fn handle_request(
line: String,
data: &Arc<Mutex<ScxStatsServerData>>,
data: &Arc<Mutex<ScxStatsServerData<Req, Res>>>,
ch: &ChannelPair<Req, Res>,
open_ops: &mut ScxStatsOpenOps<Req, Res>,
) -> Result<ScxStatsResponse> {
let req: ScxStatsRequest = serde_json::from_str(&line)?;
@ -97,13 +232,22 @@ impl ScxStatsServerInner {
None => "top",
};
let handler = match data.lock().unwrap().stats.get(target) {
let ops = match data.lock().unwrap().stats_ops.get(target) {
Some(v) => v.clone(),
None => Err(anyhow!("unknown stat target {:?}", req)
.context(ScxStatsErrno(libc::EINVAL)))?,
};
let resp = handler.lock().unwrap()(&req.args)?;
if !open_ops.map.contains_key(target) {
let read = (ops.lock().unwrap().open)((&ch.req, &ch.res))?;
open_ops
.map
.insert(target.into(), (ops.clone(), read, ch.clone()));
}
let read = &mut open_ops.map.get_mut(target).unwrap().1;
let resp = read(&req.args, (&ch.req, &ch.res))?;
Self::build_resp(0, &resp)
}
@ -112,17 +256,27 @@ impl ScxStatsServerInner {
}
}
fn serve(mut stream: UnixStream, data: Arc<Mutex<ScxStatsServerData>>) -> Result<()> {
let mut reader = BufReader::new(stream.try_clone()?);
fn serve(
mut stream: UnixStream,
data: Arc<Mutex<ScxStatsServerData<Req, Res>>>,
inner_ch: ChannelPair<Req, Res>,
exit: Arc<AtomicBool>,
) -> Result<()> {
let mut stream_reader = BufReader::new(stream.try_clone()?);
let mut open_ops = ScxStatsOpenOps::new();
loop {
let mut line = String::new();
reader.read_line(&mut line)?;
stream_reader.read_line(&mut line)?;
if line.len() == 0 {
return Ok(());
}
if exit.load(Ordering::Relaxed) {
debug!("server exiting due to exit");
return Ok(());
}
let resp = match Self::handle_request(line, &data) {
let resp = match Self::handle_request(line, &data, &inner_ch, &mut open_ops) {
Ok(v) => v,
Err(e) => {
let errno = match e.downcast_ref::<ScxStatsErrno>() {
@ -138,14 +292,115 @@ impl ScxStatsServerInner {
}
}
fn proxy(inner_ch: ChannelPair<Req, Res>, add_res: Receiver<ChannelPair<Res, Req>>) {
let mut chs_cursor = 0;
let mut chs = BTreeMap::<u64, ChannelPair<Res, Req>>::new();
let mut ch_to_add: Option<ChannelPair<Res, Req>> = None;
let mut idx_to_drop: Option<u64> = None;
'outer: loop {
if let Some(new_ch) = ch_to_add.take() {
let idx = chs_cursor;
chs_cursor += 1;
chs.insert(idx, new_ch);
debug!("proxy: added new channel idx={}, total={}", idx, chs.len());
}
if let Some(idx) = idx_to_drop.take() {
debug!("proxy: dropping channel {}, total={}", idx, chs.len());
chs.remove(&idx).unwrap();
}
let mut sel = Select::new();
let inner_idx = sel.recv(&inner_ch.res);
let add_idx = sel.recv(&add_res);
let mut chs_sel_idx = BTreeMap::<usize, u64>::new();
for (idx, cp) in chs.iter() {
let sel_idx = sel.recv(&cp.res);
chs_sel_idx.insert(sel_idx, *idx);
}
'select: loop {
let oper = sel.select();
match oper.index() {
sel_idx if sel_idx == add_idx => match oper.recv(&add_res) {
Ok(ch) => {
ch_to_add = Some(ch);
debug!("proxy: received new channel from add_res");
break 'select;
}
Err(RecvError) => {
debug!("proxy: add_res disconnected, terminating");
break 'outer;
}
},
sel_idx if sel_idx == inner_idx => match oper.recv(&inner_ch.res) {
Ok(_) => {
error!("proxy: unexpected data in ScxStatsServer.channels().0");
panic!();
}
Err(RecvError) => break 'outer,
},
sel_idx => {
let idx = chs_sel_idx.get(&sel_idx).unwrap();
let pair = chs.get(idx).unwrap();
let req = match oper.recv(&pair.res) {
Ok(v) => v,
Err(RecvError) => {
idx_to_drop = Some(*idx);
break 'select;
}
};
match inner_ch.req.send(req) {
Ok(()) => {}
Err(SendError(..)) => break 'outer,
}
let resp = match inner_ch.res.recv() {
Ok(v) => v,
Err(RecvError) => break 'outer,
};
match pair.req.send(resp) {
Ok(()) => {}
Err(SendError(..)) => {
idx_to_drop = Some(*idx);
break 'select;
}
}
}
}
}
}
}
fn listen(self) {
loop {
let inner_ch_copy = self.inner_ch.clone();
let (add_req, add_res) = unbounded::<ChannelPair<Res, Req>>();
spawn(move || Self::proxy(inner_ch_copy, add_res));
for stream in self.listener.incoming() {
if self.exit.load(Ordering::Relaxed) {
debug!("listener exiting");
break;
}
match stream {
Ok(stream) => {
let data = self.data.clone();
let exit = self.exit.clone();
let (req_pair, res_pair) = ChannelPair::<Req, Res>::bidi();
match add_req.send(res_pair) {
Ok(()) => debug!("sent new channel to proxy"),
Err(e) => warn!("ScxStatsServer::proxy() failed ({})", &e),
}
spawn(move || {
if let Err(e) = Self::serve(stream, data) {
if let Err(e) = Self::serve(stream, data, req_pair, exit) {
warn!("stat communication errored ({})", &e);
}
});
@ -155,20 +410,33 @@ impl ScxStatsServerInner {
}
}
}
}
pub struct ScxStatsServer {
pub struct ScxStatsServer<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
base_path: PathBuf,
sched_path: PathBuf,
stats_path: PathBuf,
path: Option<PathBuf>,
stats_meta_holder: BTreeMap<String, ScxStatsMeta>,
stats_holder: StatMap,
stats_ops_holder: BTreeMap<String, Arc<Mutex<ScxStatsOps<Req, Res>>>>,
outer_ch: ChannelPair<Res, Req>,
inner_ch: Option<ChannelPair<Req, Res>>,
exit: Arc<AtomicBool>,
}
impl ScxStatsServer {
impl<Req, Res> ScxStatsServer<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
pub fn new() -> Self {
let (ich, och) = ChannelPair::<Req, Res>::bidi();
Self {
base_path: PathBuf::from("/var/run/scx"),
sched_path: PathBuf::from("root"),
@ -176,7 +444,11 @@ impl ScxStatsServer {
path: None,
stats_meta_holder: BTreeMap::new(),
stats_holder: BTreeMap::new(),
stats_ops_holder: BTreeMap::new(),
outer_ch: och,
inner_ch: Some(ich),
exit: Arc::new(AtomicBool::new(false)),
}
}
@ -185,16 +457,28 @@ impl ScxStatsServer {
self
}
pub fn add_stats(
mut self,
name: &str,
fetch: Box<dyn FnMut(&BTreeMap<String, String>) -> Result<serde_json::Value> + Send>,
) -> Self {
self.stats_holder
.insert(name.to_string(), Arc::new(Mutex::new(Box::new(fetch))));
pub fn add_stats_ops(mut self, name: &str, ops: ScxStatsOps<Req, Res>) -> Self {
self.stats_ops_holder
.insert(name.to_string(), Arc::new(Mutex::new(ops)));
self
}
pub fn add_stats(self, name: &str, fetch: Box<dyn StatsReaderSend<Req, Res>>) -> Self {
let wrapped_fetch = Mutex::new(fetch);
let read: Box<dyn StatsReaderSync<Req, Res>> =
Box::new(move |args, chan| wrapped_fetch.lock().unwrap()(args, chan));
let wrapped_read = Arc::new(read);
let ops = ScxStatsOps {
open: Box::new(move |_| {
let copy = wrapped_read.clone();
Ok(Box::new(move |args, chan| copy(args, chan)))
}),
close: None,
};
self.add_stats_ops(name, ops)
}
pub fn set_base_path<P: AsRef<Path>>(mut self, path: P) -> Self {
self.base_path = PathBuf::from(path.as_ref());
self
@ -238,24 +522,47 @@ impl ScxStatsServer {
let mut stats_meta = BTreeMap::new();
let mut stats = BTreeMap::new();
std::mem::swap(&mut stats_meta, &mut self.stats_meta_holder);
std::mem::swap(&mut stats, &mut self.stats_holder);
std::mem::swap(&mut stats, &mut self.stats_ops_holder);
let inner = ScxStatsServerInner::new(listener, stats_meta, stats);
let inner = ScxStatsServerInner::new(
listener,
stats_meta,
stats,
self.inner_ch.take().unwrap(),
self.exit.clone(),
);
spawn(move || inner.listen());
Ok(self)
}
pub fn channels(&self) -> (Sender<Res>, Receiver<Req>) {
(self.outer_ch.req.clone(), self.outer_ch.res.clone())
}
}
impl<Req, Res> std::ops::Drop for ScxStatsServer<Req, Res>
where
Req: Send + 'static,
Res: Send + 'static,
{
fn drop(&mut self) {
self.exit.store(true, Ordering::Relaxed);
if let Some(path) = self.path.as_ref() {
let _ = ScxStatsClient::new().set_path(path).connect();
}
}
}
pub trait ToJson {
fn to_json(&self) -> Result<serde_json::Value>;
fn to_json(&self) -> Result<Value>;
}
impl<T> ToJson for T
where
T: Meta + Serialize,
{
fn to_json(&self) -> Result<serde_json::Value> {
fn to_json(&self) -> Result<Value> {
Ok(serde_json::to_value(self)?)
}
}

View File

@ -338,6 +338,38 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
@ -347,6 +379,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -988,6 +1029,7 @@ dependencies = [
"bitvec",
"chrono",
"clap",
"crossbeam",
"ctrlc",
"fb_procfs",
"lazy_static",
@ -1007,6 +1049,7 @@ name = "scx_stats"
version = "0.2.0"
dependencies = [
"anyhow",
"crossbeam",
"libc",
"log",
"quote",

View File

@ -11,6 +11,7 @@ anyhow = "1.0.65"
bitvec = "1.0"
chrono = "0.4"
clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] }
crossbeam = "0.8.4"
ctrlc = { version = "3.1", features = ["termination"] }
fb_procfs = "0.7"
lazy_static = "1.4"

View File

@ -7,9 +7,12 @@ mod stats;
pub use bpf_skel::*;
pub mod bpf_intf;
use stats::LayerStats;
use stats::StatsReq;
use stats::StatsRes;
use stats::SysStats;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::HashMap;
use std::ffi::CString;
use std::fs;
use std::io::Read;
@ -19,7 +22,8 @@ use std::ops::Sub;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::spawn;
use std::thread::ThreadId;
use std::time::Duration;
use std::time::Instant;
@ -30,6 +34,7 @@ use anyhow::Context;
use anyhow::Result;
use bitvec::prelude::*;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::Skel;
use libbpf_rs::skel::SkelBuilder;
@ -39,6 +44,7 @@ use log::debug;
use log::info;
use log::trace;
use log::warn;
use scx_stats::ScxStatsServer;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::ravg::ravg_read;
@ -234,10 +240,12 @@ lazy_static::lazy_static! {
/// Monitoring Statistics
/// =====================
///
/// scx_stat server will listen on /var/run/scx/root/stat. Monitor
/// statistics by runing `scx_layered monitor`.
/// Run with `--monitor INTERVAL` added to enable stats monitoring. There is
/// also scx_stat server listening on /var/run/scx/root/stat and you can
/// monitor statistics by running `scx_layered --monitor INTERVAL`
/// separately.
///
/// $ scx_layered --monitor
/// $ scx_layered --monitor 1
/// tot= 117909 local=86.20 open_idle= 0.21 affn_viol= 1.37 proc=6ms
/// busy= 34.2 util= 1733.6 load= 21744.1 fallback_cpu= 1
/// batch : util/frac= 11.8/ 0.7 load/frac= 29.7: 0.1 tasks= 2597
@ -307,10 +315,6 @@ struct Opts {
#[clap(short = 'i', long, default_value = "0.1")]
interval: f64,
/// Statistics interval in seconds.
#[clap(short = 'I', long, default_value = "2.0")]
stats_interval: f64,
/// Disable load-fraction based max layer CPU limit. ***NOTE***
/// load-fraction calculation is currently broken due to lack of
/// infeasible weight adjustments. Setting this option is recommended.
@ -335,9 +339,10 @@ struct Opts {
#[clap(short = 'e', long)]
example: Option<String>,
/// Run in monitor mode.
/// Enable stats monitoring with the specified interval. If no layer
/// specs are specified, run in monitor mode.
#[clap(long)]
monitor: bool,
monitor: Option<f64>,
/// Layer specification. See --help.
specs: Vec<String>,
@ -609,6 +614,7 @@ impl<'a, 'b> Sub<&'b BpfStats> for &'a BpfStats {
}
}
#[derive(Clone, Debug)]
struct Stats {
nr_layers: usize,
at: Instant,
@ -627,6 +633,9 @@ struct Stats {
bpf_stats: BpfStats,
prev_bpf_stats: BpfStats,
processing_dur: Duration,
prev_processing_dur: Duration,
}
impl Stats {
@ -668,7 +677,8 @@ impl Stats {
fn new(skel: &mut BpfSkel, proc_reader: &procfs::ProcReader) -> Result<Self> {
let nr_layers = skel.maps.rodata_data.nr_layers as usize;
let bpf_stats = BpfStats::read(&read_cpu_ctxs(skel)?, nr_layers);
let cpu_ctxs = read_cpu_ctxs(skel)?;
let bpf_stats = BpfStats::read(&cpu_ctxs, nr_layers);
Ok(Self {
at: Instant::now(),
@ -681,13 +691,16 @@ impl Stats {
total_util: 0.0,
layer_utils: vec![0.0; nr_layers],
prev_layer_cycles: vec![0; nr_layers],
prev_layer_cycles: Self::read_layer_cycles(&cpu_ctxs, nr_layers),
cpu_busy: 0.0,
prev_total_cpu: read_total_cpu(&proc_reader)?,
bpf_stats: bpf_stats.clone(),
prev_bpf_stats: bpf_stats,
processing_dur: Default::default(),
prev_processing_dur: Default::default(),
})
}
@ -696,6 +709,7 @@ impl Stats {
skel: &mut BpfSkel,
proc_reader: &procfs::ProcReader,
now: Instant,
cur_processing_dur: Duration,
) -> Result<()> {
let elapsed = now.duration_since(self.at).as_secs_f64() as f64;
let cpu_ctxs = read_cpu_ctxs(skel)?;
@ -732,6 +746,10 @@ impl Stats {
let cur_bpf_stats = BpfStats::read(&cpu_ctxs, self.nr_layers);
let bpf_stats = &cur_bpf_stats - &self.prev_bpf_stats;
let processing_dur = cur_processing_dur
.checked_sub(self.prev_processing_dur)
.unwrap();
*self = Self {
at: now,
nr_layers: self.nr_layers,
@ -750,6 +768,9 @@ impl Stats {
bpf_stats,
prev_bpf_stats: cur_bpf_stats,
processing_dur,
prev_processing_dur: cur_processing_dur,
};
Ok(())
}
@ -1225,7 +1246,6 @@ struct Scheduler<'a, 'b> {
layer_specs: &'b Vec<LayerSpec>,
sched_intv: Duration,
stats_intv: Duration,
no_load_frac_limit: bool,
cpu_pool: CpuPool,
@ -1233,13 +1253,11 @@ struct Scheduler<'a, 'b> {
proc_reader: procfs::ProcReader,
sched_stats: Stats,
report_stats: Stats,
nr_layer_cpus_min_max: Vec<(usize, usize)>,
nr_layer_cpus_ranges: Vec<(usize, usize)>,
processing_dur: Duration,
prev_processing_dur: Duration,
sys_stats: Arc<Mutex<SysStats>>,
stats_server: ScxStatsServer<StatsReq, StatsRes>,
}
impl<'a, 'b> Scheduler<'a, 'b> {
@ -1448,33 +1466,6 @@ impl<'a, 'b> Scheduler<'a, 'b> {
// Other stuff.
let proc_reader = procfs::ProcReader::new();
let sys_stats = Arc::new(Mutex::new(SysStats::default()));
let mut sched = Self {
struct_ops: None,
layer_specs,
sched_intv: Duration::from_secs_f64(opts.interval),
stats_intv: Duration::from_secs_f64(opts.stats_interval),
no_load_frac_limit: opts.no_load_frac_limit,
cpu_pool,
layers,
sched_stats: Stats::new(&mut skel, &proc_reader)?,
report_stats: Stats::new(&mut skel, &proc_reader)?,
nr_layer_cpus_min_max: vec![(0, 0); nr_layers],
processing_dur: Duration::from_millis(0),
prev_processing_dur: Duration::from_millis(0),
proc_reader,
skel,
sys_stats: sys_stats.clone(),
};
stats::launch_server(sys_stats)?;
// XXX If we try to refresh the cpumasks here before attaching, we
// sometimes (non-deterministically) don't see the updated values in
@ -1483,7 +1474,30 @@ impl<'a, 'b> Scheduler<'a, 'b> {
// huge problem in the interim until we figure it out.
// Attach.
sched.struct_ops = Some(scx_ops_attach!(sched.skel, layered)?);
let struct_ops = scx_ops_attach!(skel, layered)?;
let stats_server = stats::launch_server()?;
let sched = Self {
struct_ops: Some(struct_ops),
layer_specs,
sched_intv: Duration::from_secs_f64(opts.interval),
no_load_frac_limit: opts.no_load_frac_limit,
cpu_pool,
layers,
sched_stats: Stats::new(&mut skel, &proc_reader)?,
nr_layer_cpus_ranges: vec![(0, 0); nr_layers],
processing_dur: Default::default(),
proc_reader,
skel,
stats_server,
};
info!("Layered Scheduler Attached. Run `scx_layered --monitor` for metrics.");
Ok(sched)
@ -1565,9 +1579,9 @@ impl<'a, 'b> Scheduler<'a, 'b> {
self.skel.maps.bss_data.fallback_cpu = self.cpu_pool.fallback_cpu as u32;
for (lidx, layer) in self.layers.iter().enumerate() {
self.nr_layer_cpus_min_max[lidx] = (
self.nr_layer_cpus_min_max[lidx].0.min(layer.nr_cpus),
self.nr_layer_cpus_min_max[lidx].1.max(layer.nr_cpus),
self.nr_layer_cpus_ranges[lidx] = (
self.nr_layer_cpus_ranges[lidx].0.min(layer.nr_cpus),
self.nr_layer_cpus_ranges[lidx].1.max(layer.nr_cpus),
);
}
}
@ -1577,48 +1591,39 @@ impl<'a, 'b> Scheduler<'a, 'b> {
fn step(&mut self) -> Result<()> {
let started_at = Instant::now();
self.sched_stats
.refresh(&mut self.skel, &self.proc_reader, started_at)?;
self.sched_stats.refresh(
&mut self.skel,
&self.proc_reader,
started_at,
self.processing_dur,
)?;
self.refresh_cpumasks()?;
self.processing_dur += Instant::now().duration_since(started_at);
Ok(())
}
fn refresh_sys_stats(&mut self) -> Result<()> {
let started_at = Instant::now();
self.report_stats
.refresh(&mut self.skel, &self.proc_reader, started_at)?;
let stats = &self.report_stats;
fn generate_sys_stats(
&mut self,
stats: &Stats,
cpus_ranges: &mut Vec<(usize, usize)>,
) -> Result<SysStats> {
let bstats = &stats.bpf_stats;
let processing_dur = self.processing_dur - self.prev_processing_dur;
self.prev_processing_dur = self.processing_dur;
let mut sys_stats = SysStats::new(
stats,
bstats,
&self.stats_intv,
&processing_dur,
self.cpu_pool.fallback_cpu,
)?;
let mut sys_stats = SysStats::new(stats, bstats, self.cpu_pool.fallback_cpu)?;
for (lidx, (spec, layer)) in self.layer_specs.iter().zip(self.layers.iter()).enumerate() {
let layer_stats =
LayerStats::new(lidx, layer, stats, bstats, self.nr_layer_cpus_min_max[lidx]);
let layer_stats = LayerStats::new(lidx, layer, stats, bstats, cpus_ranges[lidx]);
sys_stats.layers.insert(spec.name.to_string(), layer_stats);
self.nr_layer_cpus_min_max[lidx] = (layer.nr_cpus, layer.nr_cpus);
cpus_ranges[lidx] = (layer.nr_cpus, layer.nr_cpus);
}
*self.sys_stats.lock().unwrap() = sys_stats;
Ok(())
Ok(sys_stats)
}
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
let now = Instant::now();
let mut next_sched_at = now + self.sched_intv;
let mut next_stats_at = now + self.stats_intv;
let (res_ch, req_ch) = self.stats_server.channels();
let mut next_sched_at = Instant::now() + self.sched_intv;
let mut cpus_ranges = HashMap::<ThreadId, Vec<(usize, usize)>>::new();
while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
let now = Instant::now();
@ -1630,18 +1635,40 @@ impl<'a, 'b> Scheduler<'a, 'b> {
}
}
if now >= next_stats_at {
self.refresh_sys_stats()?;
while next_stats_at < now {
next_stats_at += self.stats_intv;
match req_ch.recv_deadline(next_sched_at) {
Ok(StatsReq::Hello(tid)) => {
cpus_ranges.insert(
tid,
self.layers.iter().map(|l| (l.nr_cpus, l.nr_cpus)).collect(),
);
let stats = Stats::new(&mut self.skel, &self.proc_reader)?;
res_ch.send(StatsRes::Hello(stats))?;
}
Ok(StatsReq::Refresh(tid, mut stats)) => {
// Propagate self's layer cpu ranges into each stat's.
for i in 0..self.nr_layer_cpus_ranges.len() {
for (_, ranges) in cpus_ranges.iter_mut() {
ranges[i] = (
ranges[i].0.min(self.nr_layer_cpus_ranges[i].0),
ranges[i].1.max(self.nr_layer_cpus_ranges[i].1),
);
}
self.nr_layer_cpus_ranges[i] =
(self.layers[i].nr_cpus, self.layers[i].nr_cpus);
}
std::thread::sleep(
next_sched_at
.min(next_stats_at)
.duration_since(Instant::now()),
);
stats.refresh(&mut self.skel, &self.proc_reader, now, self.processing_dur)?;
let sys_stats =
self.generate_sys_stats(&stats, cpus_ranges.get_mut(&tid).unwrap())?;
res_ch.send(StatsRes::Refreshed((stats, sys_stats)))?;
}
Ok(StatsReq::Bye(tid)) => {
cpus_ranges.remove(&tid);
res_ch.send(StatsRes::Bye)?;
}
Err(RecvTimeoutError::Timeout) => {}
Err(e) => Err(e)?,
}
}
self.struct_ops.take();
@ -1850,10 +1877,6 @@ fn main() -> Result<()> {
})
.context("Error setting Ctrl-C handler")?;
if opts.monitor {
return stats::monitor(shutdown.clone());
}
if let Some(path) = &opts.example {
write_example_file(path)?;
return Ok(());
@ -1867,6 +1890,16 @@ fn main() -> Result<()> {
);
}
if let Some(intv) = opts.monitor {
let shutdown_copy = shutdown.clone();
let jh =
spawn(move || stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap());
if layer_config.specs.len() == 0 {
let _ = jh.join();
return Ok(());
}
}
debug!("specs={}", serde_json::to_string_pretty(&layer_config)?);
verify_layer_specs(&layer_config.specs)?;

View File

@ -3,14 +3,18 @@ use crate::BpfStats;
use crate::Layer;
use crate::LayerKind;
use crate::Stats;
use anyhow::Result;
use anyhow::{bail, Result};
use bitvec::prelude::*;
use chrono::DateTime;
use chrono::Local;
use log::warn;
use log::{info, warn};
use scx_stats::Meta;
use scx_stats::ScxStatsClient;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServer;
use scx_stats::StatsCloser;
use scx_stats::StatsOpener;
use scx_stats::StatsReader;
use scx_stats::ToJson;
use scx_stats_derive::Stats;
use serde::Deserialize;
@ -20,7 +24,9 @@ use std::io::Write;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::thread::current;
use std::thread::sleep;
use std::thread::ThreadId;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
@ -306,8 +312,6 @@ impl LayerStats {
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(top)]
pub struct SysStats {
#[stat(desc = "update interval", _om_skip)]
pub intv: f64,
#[stat(desc = "timestamp", _om_skip)]
pub at: f64,
#[stat(desc = "# sched events duringg the period")]
@ -341,13 +345,7 @@ pub struct SysStats {
}
impl SysStats {
pub fn new(
stats: &Stats,
bstats: &BpfStats,
intv: &Duration,
proc_dur: &Duration,
fallback_cpu: usize,
) -> Result<Self> {
pub fn new(stats: &Stats, bstats: &BpfStats, fallback_cpu: usize) -> Result<Self> {
let lsum = |idx| stats.bpf_stats.lstats_sums[idx as usize];
let total = lsum(bpf_intf::layer_stat_idx_LSTAT_SEL_LOCAL)
+ lsum(bpf_intf::layer_stat_idx_LSTAT_ENQ_WAKEUP)
@ -363,7 +361,6 @@ impl SysStats {
};
Ok(Self {
intv: intv.as_secs_f64(),
at: SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs_f64(),
total,
local: lsum_pct(bpf_intf::layer_stat_idx_LSTAT_SEL_LOCAL),
@ -375,7 +372,7 @@ impl SysStats {
/ total as f64,
excl_wakeup: bstats.gstats[bpf_intf::global_stat_idx_GSTAT_EXCL_WAKEUP as usize] as f64
/ total as f64,
proc_ms: proc_dur.as_millis() as u64,
proc_ms: stats.processing_dur.as_millis() as u64,
busy: stats.cpu_busy * 100.0,
util: stats.total_util * 100.0,
load: stats.total_load,
@ -429,32 +426,101 @@ impl SysStats {
}
}
pub fn launch_server(sys_stats: Arc<Mutex<SysStats>>) -> Result<()> {
ScxStatsServer::new()
.add_stats_meta(LayerStats::meta())
.add_stats_meta(SysStats::meta())
.add_stats(
"top",
Box::new(move |_| sys_stats.lock().unwrap().to_json()),
)
.launch()?;
Ok(())
#[derive(Debug)]
pub enum StatsReq {
Hello(ThreadId),
Refresh(ThreadId, Stats),
Bye(ThreadId),
}
pub fn monitor(shutdown: Arc<AtomicBool>) -> Result<()> {
let mut client = ScxStatsClient::new().connect()?;
let mut last_at = 0.0;
#[derive(Debug)]
pub enum StatsRes {
Hello(Stats),
Refreshed((Stats, SysStats)),
Bye,
}
pub fn launch_server() -> Result<ScxStatsServer<StatsReq, StatsRes>> {
let open: Box<dyn StatsOpener<StatsReq, StatsRes>> = Box::new(move |(req_ch, res_ch)| {
let tid = current().id();
req_ch.send(StatsReq::Hello(tid))?;
let mut stats = Some(match res_ch.recv()? {
StatsRes::Hello(v) => v,
res => bail!("invalid response to Hello: {:?}", &res),
});
let read: Box<dyn StatsReader<StatsReq, StatsRes>> =
Box::new(move |_args, (req_ch, res_ch)| {
req_ch.send(StatsReq::Refresh(tid, stats.take().unwrap()))?;
let (new_stats, sys_stats) = match res_ch.recv()? {
StatsRes::Refreshed(v) => v,
res => bail!("invalid response to Refresh: {:?}", &res),
};
stats = Some(new_stats);
sys_stats.to_json()
});
Ok(read)
});
let close: Box<dyn StatsCloser<StatsReq, StatsRes>> = Box::new(move |(req_ch, res_ch)| {
req_ch.send(StatsReq::Bye(current().id())).unwrap();
match res_ch.recv().unwrap() {
StatsRes::Bye => {}
res => panic!("invalid response to Bye: {:?}", &res),
}
});
Ok(ScxStatsServer::new()
.add_stats_meta(LayerStats::meta())
.add_stats_meta(SysStats::meta())
.add_stats_ops(
"top",
ScxStatsOps {
open,
close: Some(close),
},
)
.launch()?)
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
let mut retry_cnt: u32 = 0;
while !shutdown.load(Ordering::Relaxed) {
let mut client = match ScxStatsClient::new().connect() {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) if ioe.kind() == std::io::ErrorKind::ConnectionRefused => {
if retry_cnt == 1 {
info!("Stats server not avaliable, retrying...");
}
retry_cnt += 1;
sleep(Duration::from_secs(1));
continue;
}
_ => Err(e)?,
},
};
retry_cnt = 0;
while !shutdown.load(Ordering::Relaxed) {
let sst = client.request::<SysStats>("stats", vec![])?;
if sst.at != last_at {
let sst = match client.request::<SysStats>("stats", vec![]) {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) => {
info!("Connection to stats_server failed ({})", &ioe);
sleep(Duration::from_secs(1));
break;
}
None => Err(e)?,
},
};
let dt = DateTime::<Local>::from(UNIX_EPOCH + Duration::from_secs_f64(sst.at));
println!("###### {} ######", dt.to_rfc2822());
sst.format_all(&mut std::io::stdout())?;
last_at = sst.at;
sleep(intv);
}
}
std::thread::sleep(Duration::from_secs(1));
}
Ok(())
}