scx_rusty: Convert to scx_stats

This allows scx_rusty to avoid generating excessive logs for statistics
while still allowing detailed monitoring on demand.
This commit is contained in:
Tejun Heo 2024-08-22 13:53:35 -10:00
parent 4678476ca7
commit 76934f3aab
4 changed files with 296 additions and 29 deletions

View File

@ -23,6 +23,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.15"
@ -214,6 +229,20 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chrono"
version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.52.6",
]
[[package]]
name = "clang-sys"
version = "1.8.1"
@ -303,6 +332,44 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crossbeam"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d"
dependencies = [
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.18"
@ -312,6 +379,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -446,6 +522,29 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "indexmap"
version = "2.4.0"
@ -936,19 +1035,49 @@ name = "scx_rusty"
version = "1.0.2"
dependencies = [
"anyhow",
"chrono",
"clap",
"crossbeam",
"ctrlc",
"fb_procfs",
"libbpf-rs",
"libc",
"log",
"ordered-float 3.9.2",
"scx_stats",
"scx_stats_derive",
"scx_utils",
"serde",
"simplelog",
"sorted-vec",
"static_assertions",
]
[[package]]
name = "scx_stats"
version = "0.2.0"
dependencies = [
"anyhow",
"crossbeam",
"libc",
"log",
"quote",
"serde",
"serde_json",
"syn",
]
[[package]]
name = "scx_stats_derive"
version = "0.2.0"
dependencies = [
"proc-macro2",
"quote",
"scx_stats",
"serde_json",
"syn",
]
[[package]]
name = "scx_utils"
version = "1.0.2"
@ -1415,6 +1544,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.48.0"

View File

@ -8,14 +8,19 @@ license = "GPL-2.0-only"
[dependencies]
anyhow = "1.0.65"
chrono = "0.4"
clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] }
crossbeam = "0.8.4"
ctrlc = { version = "3.1", features = ["termination"] }
fb_procfs = "0.7"
libbpf-rs = "0.24.1"
libc = "0.2.137"
log = "0.4.17"
ordered-float = "3.4.0"
scx_stats = { path = "../../../rust/scx_stats", version = "0.2.0" }
scx_stats_derive = { path = "../../../rust/scx_stats/scx_stats_derive", version = "0.2.0" }
scx_utils = { path = "../../../rust/scx_utils", version = "1.0.2" }
serde = { version = "1.0", features = ["derive"] }
simplelog = "0.12"
sorted-vec = "0.8.3"
static_assertions = "1.1.0"

View File

@ -26,6 +26,8 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
#[macro_use]
extern crate static_assertions;
@ -36,12 +38,14 @@ use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::Skel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::MapCore as _;
use libbpf_rs::OpenObject;
use log::info;
use scx_stats::ScxStatsServer;
use scx_utils::build_id;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
@ -98,7 +102,7 @@ struct Opts {
#[clap(short = 'o', long, default_value = "1000")]
slice_us_overutil: u64,
/// Monitoring and load balance interval in seconds.
/// Load balance interval in seconds.
#[clap(short = 'i', long, default_value = "2.0")]
interval: f64,
@ -195,6 +199,15 @@ struct Opts {
#[clap(long, action = clap::ArgAction::SetTrue)]
mempolicy_affinity: bool,
/// Enable stats monitoring with the specified interval.
#[clap(long)]
stats: Option<f64>,
/// Run in stats monitoring mode with the specified interval. Scheduler
/// is not launched.
#[clap(long)]
monitor: Option<f64>,
/// Exit debug dump buffer length. 0 indicates default.
#[clap(long, default_value = "0")]
exit_dump_len: u32,
@ -247,7 +260,6 @@ pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
#[derive(Clone, Debug)]
struct StatsCtx {
at: Instant,
cpu_busy: u64,
cpu_total: u64,
bpf_stats: Vec<u64>,
@ -279,11 +291,19 @@ impl StatsCtx {
Ok(stats)
}
fn blank() -> Self {
Self {
cpu_busy: 0,
cpu_total: 0,
bpf_stats: vec![0u64; bpf_intf::stat_idx_RUSTY_NR_STATS as usize],
cpu_used: Duration::default(),
}
}
fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, cpu_used: Duration) -> Result<Self> {
let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;
Ok(Self {
at: Instant::now(),
cpu_busy,
cpu_total,
bpf_stats: Self::read_bpf_stats(skel)?,
@ -293,7 +313,6 @@ impl StatsCtx {
fn delta(&self, rhs: &Self) -> Self {
Self {
at: self.at,
cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
bpf_stats: self
@ -319,11 +338,13 @@ struct Scheduler<'a> {
dom_group: Arc<DomainGroup>,
proc_reader: procfs::ProcReader,
cpu_used: Duration,
lb_stats: BTreeMap<usize, NodeStats>,
cpu_used: Duration,
nr_lb_data_errors: u64,
tuner: Tuner,
stats_server: ScxStatsServer<StatsCtx, (StatsCtx, ClusterStats)>,
}
impl<'a> Scheduler<'a> {
@ -421,7 +442,9 @@ impl<'a> Scheduler<'a> {
// Attach.
let mut skel = scx_ops_load!(skel, rusty, uei)?;
let struct_ops = Some(scx_ops_attach!(skel, rusty)?);
info!("Rusty scheduler started!");
let stats_server = stats::launch_server()?;
info!("Rusty scheduler started! Run `scx_rusty --monitor` for metrics.");
// Other stuff.
let proc_reader = procfs::ProcReader::new();
@ -437,8 +460,9 @@ impl<'a> Scheduler<'a> {
dom_group: domains.clone(),
proc_reader,
cpu_used: Duration::default(),
lb_stats: BTreeMap::new(),
cpu_used: Duration::default(),
nr_lb_data_errors: 0,
tuner: Tuner::new(
@ -448,6 +472,7 @@ impl<'a> Scheduler<'a> {
opts.slice_us_underutil * 1000,
opts.slice_us_overutil * 1000,
)?,
stats_server,
})
}
@ -473,6 +498,10 @@ impl<'a> Scheduler<'a> {
};
ClusterStats {
at: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs_f64(),
cpu_busy,
load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
nr_load_balances: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],
@ -507,7 +536,7 @@ impl<'a> Scheduler<'a> {
}
}
fn lb_step(&mut self, prev_sc: &mut StatsCtx) -> Result<()> {
fn lb_step(&mut self) -> Result<()> {
let started_at = Instant::now();
let mut lb = LoadBalancer::new(
@ -520,26 +549,16 @@ impl<'a> Scheduler<'a> {
lb.load_balance()?;
let lstats = lb.get_stats();
self.lb_stats = lb.get_stats();
self.cpu_used += Instant::now().duration_since(started_at);
let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.cpu_used)?;
let delta_sc = cur_sc.delta(&prev_sc);
*prev_sc = cur_sc;
let cstats = self.cluster_stats(&delta_sc, lstats);
let mut buf = Vec::<u8>::new();
cstats.format(&mut buf)?;
print!("{}", std::str::from_utf8(&buf).unwrap());
Ok(())
}
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
let (res_ch, req_ch) = self.stats_server.channels();
let now = Instant::now();
let mut next_tune_at = now + self.tune_interval;
let mut next_sched_at = now + self.sched_interval;
let mut prev_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.cpu_used)?;
self.skel.maps.stats.value_size() as usize;
@ -555,18 +574,23 @@ impl<'a> Scheduler<'a> {
}
if now >= next_sched_at {
self.lb_step(&mut prev_sc)?;
self.lb_step()?;
next_sched_at += self.sched_interval;
if next_sched_at < now {
next_sched_at = now + self.sched_interval;
}
}
std::thread::sleep(
next_sched_at
.min(next_tune_at)
.duration_since(Instant::now()),
);
match req_ch.recv_deadline(next_sched_at.min(next_tune_at)) {
Ok(prev_sc) => {
let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.cpu_used)?;
let delta_sc = cur_sc.delta(&prev_sc);
let cstats = self.cluster_stats(&delta_sc, self.lb_stats.clone());
res_ch.send((cur_sc, cstats))?;
}
Err(RecvTimeoutError::Timeout) => {}
Err(e) => Err(e)?,
}
}
self.struct_ops.take();
@ -614,6 +638,17 @@ fn main() -> Result<()> {
})
.context("Error setting Ctrl-C handler")?;
if let Some(intv) = opts.monitor.or(opts.stats) {
let shutdown_copy = shutdown.clone();
let jh = std::thread::spawn(move || {
stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
});
if opts.monitor.is_some() {
let _ = jh.join();
return Ok(());
}
}
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&opts, &mut open_object)?;

View File

@ -1,7 +1,27 @@
use crate::StatsCtx;
use anyhow::Result;
use chrono::DateTime;
use chrono::Local;
use log::info;
use scx_stats::Meta;
use scx_stats::ScxStatsClient;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServer;
use scx_stats::StatsOpener;
use scx_stats::StatsReader;
use scx_stats::ToJson;
use scx_stats_derive::Stats;
use scx_utils::Cpumask;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use std::io::Write;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use std::time::UNIX_EPOCH;
fn signed(x: f64) -> String {
if x >= 0.0f64 {
@ -11,7 +31,7 @@ fn signed(x: f64) -> String {
}
}
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
pub struct DomainStats {
pub load: f64,
pub imbal: f64,
@ -32,7 +52,7 @@ impl DomainStats {
}
}
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
pub struct NodeStats {
pub load: f64,
pub imbal: f64,
@ -54,8 +74,9 @@ impl NodeStats {
}
}
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
pub struct ClusterStats {
pub at: f64,
pub cpu_busy: f64,
pub load: f64,
pub nr_load_balances: u64,
@ -157,3 +178,71 @@ impl ClusterStats {
Ok(())
}
}
pub fn launch_server() -> Result<ScxStatsServer<StatsCtx, (StatsCtx, ClusterStats)>> {
let open: Box<dyn StatsOpener<StatsCtx, (StatsCtx, ClusterStats)>> =
Box::new(move |(req_ch, res_ch)| {
// Send one bogus request on open to establish prev_sc.
let mut prev_sc = StatsCtx::blank();
req_ch.send(prev_sc.clone())?;
let (cur_sc, _) = res_ch.recv()?;
prev_sc = cur_sc;
let read: Box<dyn StatsReader<StatsCtx, (StatsCtx, ClusterStats)>> =
Box::new(move |_args, (req_ch, res_ch)| {
req_ch.send(prev_sc.clone())?;
let (cur_sc, cluster_stats) = res_ch.recv()?;
prev_sc = cur_sc;
cluster_stats.to_json()
});
Ok(read)
});
Ok(ScxStatsServer::new()
.add_stats_meta(DomainStats::meta())
.add_stats_meta(NodeStats::meta())
.add_stats_meta(ClusterStats::meta())
.add_stats_ops("top", ScxStatsOps { open, close: None })
.launch()?)
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
let mut retry_cnt: u32 = 0;
while !shutdown.load(Ordering::Relaxed) {
let mut client = match ScxStatsClient::new().connect() {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) if ioe.kind() == std::io::ErrorKind::ConnectionRefused => {
if retry_cnt == 1 {
info!("Stats server not avaliable, retrying...");
}
retry_cnt += 1;
sleep(Duration::from_secs(1));
continue;
}
_ => Err(e)?,
},
};
retry_cnt = 0;
while !shutdown.load(Ordering::Relaxed) {
let cst = match client.request::<ClusterStats>("stats", vec![]) {
Ok(v) => v,
Err(e) => match e.downcast_ref::<std::io::Error>() {
Some(ioe) => {
info!("Connection to stats_server failed ({})", &ioe);
sleep(Duration::from_secs(1));
break;
}
None => Err(e)?,
},
};
let dt = DateTime::<Local>::from(UNIX_EPOCH + Duration::from_secs_f64(cst.at));
println!("###### {} ######", dt.to_rfc2822());
cst.format(&mut std::io::stdout())?;
sleep(intv);
}
}
Ok(())
}