scx_lavd: Switch to scx_stats

Scheduling sample reporting is switched to use scx_stats. This makes the
scheduler run without making too much noise while still allowing monitoring
on demand. It can also make introspection more dynamic - e.g. it shouldn't
be difficult to add other monitoring commands which take scheduling samples
based on different criteria or add other types of statistics.

--nr-sched-samples is replaced with --monitor-sched-samples.
This commit is contained in:
Tejun Heo 2024-08-23 18:50:33 -10:00
parent 48092c6f88
commit 55e5b8b43f
3 changed files with 192 additions and 51 deletions

View File

@ -275,7 +275,6 @@ enum {
struct introspec {
volatile u64 arg;
volatile u32 cmd;
u8 requested;
};
struct msg_hdr {

View File

@ -12,7 +12,10 @@ pub mod bpf_intf;
pub use bpf_intf::*;
mod stats;
use stats::TaskSample;
use stats::SchedSample;
use stats::SchedSamples;
use stats::StatsReq;
use stats::StatsRes;
use libc::c_char;
use std::cell::Cell;
@ -25,6 +28,7 @@ use std::mem::MaybeUninit;
use std::str;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::thread::ThreadId;
use std::time::Duration;
use anyhow::Context;
@ -32,6 +36,7 @@ use anyhow::Result;
use clap::Parser;
use crossbeam::channel;
use crossbeam::channel::Receiver;
use crossbeam::channel::RecvTimeoutError;
use crossbeam::channel::Sender;
use crossbeam::channel::TrySendError;
use libbpf_rs::skel::OpenSkel;
@ -41,6 +46,7 @@ use libbpf_rs::OpenObject;
use log::debug;
use log::info;
use log::warn;
use scx_stats::ScxStatsServer;
use scx_utils::build_id;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
@ -100,10 +106,10 @@ struct Opts {
#[clap(long = "no-freq-scaling", action = clap::ArgAction::SetTrue)]
no_freq_scaling: bool,
/// The number of scheduling samples to be reported every second.
/// (default: 1, 0 = disable logging)
#[clap(short = 's', long, default_value = "1")]
nr_sched_samples: u64,
/// Run in monitoring mode. Show the specified number of scheduling
/// samples every second.
#[clap(long)]
monitor_sched_samples: Option<u64>,
/// Enable verbose output including libbpf details. Specify multiple
/// times to increase verbosity.
@ -149,18 +155,6 @@ impl introspec {
let intrspc = unsafe { mem::MaybeUninit::<introspec>::zeroed().assume_init() };
intrspc
}
fn init(opts: &Opts) -> Self {
let mut intrspc = introspec::new();
if opts.nr_sched_samples > 0 {
intrspc.cmd = LAVD_CMD_SCHED_N;
intrspc.arg = opts.nr_sched_samples;
} else {
intrspc.cmd = LAVD_CMD_NOP;
}
intrspc.requested = false as u8;
intrspc
}
}
#[derive(Debug)]
@ -409,10 +403,11 @@ impl FlatTopology {
struct Scheduler<'a> {
skel: BpfSkel<'a>,
struct_ops: Option<libbpf_rs::Link>,
nr_cpus_onln: u64,
rb_mgr: libbpf_rs::RingBuffer<'static>,
intrspc: introspec,
intrspc_rx: Receiver<TaskSample>,
intrspc_rx: Receiver<SchedSample>,
sampler_tid: Option<ThreadId>,
stats_server: ScxStatsServer<StatsReq, StatsRes>,
}
impl<'a> Scheduler<'a> {
@ -438,9 +433,10 @@ impl<'a> Scheduler<'a> {
// Attach.
let mut skel = scx_ops_load!(skel, lavd_ops, uei)?;
let struct_ops = Some(scx_ops_attach!(skel, lavd_ops)?);
let stats_server = ScxStatsServer::new(stats::server_data(nr_cpus_onln)).launch()?;
// Build a ring buffer for instrumentation
let (intrspc_tx, intrspc_rx) = channel::bounded(4096);
let (intrspc_tx, intrspc_rx) = channel::bounded(65536);
let rb_map = &mut skel.maps.introspec_msg;
let mut builder = libbpf_rs::RingBufferBuilder::new();
builder
@ -453,10 +449,11 @@ impl<'a> Scheduler<'a> {
Ok(Self {
skel,
struct_ops,
nr_cpus_onln,
rb_mgr,
intrspc: introspec::init(opts),
intrspc: introspec::new(),
intrspc_rx,
sampler_tid: None,
stats_server,
})
}
@ -506,7 +503,7 @@ impl<'a> Scheduler<'a> {
}
}
fn relay_introspec(data: &[u8], intrspc_tx: &Sender<TaskSample>) -> i32 {
fn relay_introspec(data: &[u8], intrspc_tx: &Sender<SchedSample>) -> i32 {
let mt = msg_task_ctx::from_bytes(data);
let tx = mt.taskc_x;
let tc = mt.taskc;
@ -526,7 +523,7 @@ impl<'a> Scheduler<'a> {
let c_tx_st_str: &CStr = unsafe { CStr::from_ptr(c_tx_st) };
let tx_stat: &str = c_tx_st_str.to_str().unwrap();
match intrspc_tx.try_send(TaskSample {
match intrspc_tx.try_send(SchedSample {
mseq,
pid: tx.pid,
comm: tx_comm.into(),
@ -555,41 +552,64 @@ impl<'a> Scheduler<'a> {
}
}
fn prep_introspec(&mut self) -> u64 {
let mut interval_ms = 1000;
if self.intrspc.cmd == LAVD_CMD_SCHED_N && self.intrspc.arg > self.nr_cpus_onln {
// More samples, shorter sampling interval.
let f = self.intrspc.arg / self.nr_cpus_onln * 2;
interval_ms /= f;
}
self.intrspc.requested = true as u8;
fn prep_introspec(&mut self) {
self.skel.maps.bss_data.intrspc.cmd = self.intrspc.cmd;
self.skel.maps.bss_data.intrspc.arg = self.intrspc.arg;
self.skel.maps.bss_data.intrspc.requested = self.intrspc.requested;
interval_ms
}
fn cleanup_introspec(&mut self) {
// If not yet requested, do nothing.
if self.intrspc.requested == false as u8 {
return;
}
self.skel.maps.bss_data.intrspc.cmd = LAVD_CMD_NOP;
}
fn running(&mut self) -> bool {
RUNNING.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei)
}
fn run(&mut self) -> Result<UserExitInfo> {
while self.running() {
let interval_ms = self.prep_introspec();
std::thread::sleep(Duration::from_millis(interval_ms));
fn stats_req_to_res(&mut self, req: &StatsReq) -> Result<StatsRes> {
Ok(match req {
StatsReq::NewSampler(tid) => {
self.rb_mgr.consume().unwrap();
self.sampler_tid = Some(*tid);
StatsRes::Ack
}
StatsReq::SchedSamplesNr {
tid,
nr_samples,
interval_ms,
} => {
if Some(*tid) != self.sampler_tid {
return Ok(StatsRes::Bye);
}
self.intrspc.cmd = LAVD_CMD_SCHED_N;
self.intrspc.arg = *nr_samples;
self.prep_introspec();
std::thread::sleep(Duration::from_millis(*interval_ms));
self.rb_mgr.poll(Duration::from_millis(100)).unwrap();
let mut samples = vec![];
while let Ok(ts) = self.intrspc_rx.try_recv() {
ts.format(&mut std::io::stdout()).unwrap();
samples.push(ts);
}
self.cleanup_introspec();
StatsRes::SchedSamples(SchedSamples { samples })
}
})
}
fn run(&mut self) -> Result<UserExitInfo> {
let (res_ch, req_ch) = self.stats_server.channels();
while self.running() {
match req_ch.recv_timeout(Duration::from_secs(1)) {
Ok(req) => {
let res = self.stats_req_to_res(&req)?;
res_ch.send(res)?;
}
Err(RecvTimeoutError::Timeout) => {}
Err(e) => Err(e)?,
}
self.cleanup_introspec();
}
@ -652,6 +672,12 @@ fn main() -> Result<()> {
init_signal_handlers();
debug!("{:#?}", opts);
if let Some(nr_samples) = opts.monitor_sched_samples {
let jh = std::thread::spawn(move || stats::monitor_sched_samples(nr_samples).unwrap());
let _ = jh.join();
return Ok(());
}
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&opts, &mut open_object)?;
@ -663,8 +689,6 @@ fn main() -> Result<()> {
" Note that scx_lavd currently is not optimized for multi-CCX/NUMA architectures."
);
info!(" Stay tuned for future improvements!");
info!(
" stat: ('L'atency-critical, 'R'egular) (performance-'H'ungry, performance-'I'nsensitive) ('B'ig, li'T'tle) ('E'ligigle, 'G'reedy) ('P'reempting, 'N'ot)");
info!("scx_lavd scheduler starts running.");
if !sched.run()?.should_restart() {
break;

View File

@ -1,11 +1,22 @@
use anyhow::bail;
use anyhow::Result;
use scx_stats::Meta;
use scx_stats::ScxStatsOps;
use scx_stats::ScxStatsServerData;
use scx_stats::StatsOpener;
use scx_stats::StatsReader;
use scx_stats::ToJson;
use scx_stats_derive::Stats;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use std::io::Write;
use std::sync::atomic::Ordering;
use std::thread::ThreadId;
use std::time::Duration;
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
pub struct TaskSample {
pub struct SchedSample {
pub mseq: u64,
pub pid: i32,
pub comm: String,
@ -30,7 +41,7 @@ pub struct TaskSample {
pub nr_active: u32,
}
impl TaskSample {
impl SchedSample {
pub fn format_header<W: Write>(w: &mut W) -> Result<()> {
writeln!(
w,
@ -109,3 +120,110 @@ impl TaskSample {
Ok(())
}
}
/// A batch of scheduling samples.
///
/// This is the payload of `StatsRes::SchedSamples`; the `sched_samples`
/// reader converts it to JSON and the monitor iterates over `samples`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
pub struct SchedSamples {
/// The individual samples carried by this batch.
pub samples: Vec<SchedSample>,
}
/// Requests sent over the stats-server request channel.
#[derive(Debug)]
pub enum StatsReq {
/// Announces a new sampler; carries the id of the thread that opened it.
NewSampler(ThreadId),
/// Asks for a batch of scheduling samples (built by `StatsReq::from_args`).
SchedSamplesNr {
/// Id of the requesting sampler thread.
tid: ThreadId,
/// Number of samples requested (the `nr_samples` read argument).
nr_samples: u64,
/// Sampling interval in milliseconds, as computed by `from_args`.
interval_ms: u64,
},
}
impl StatsReq {
    /// Builds a `SchedSamplesNr` request from the key/value arguments of a
    /// stats read.
    ///
    /// * `tid` - id of the sampler thread issuing the request.
    /// * `nr_cpus_onln` - number of online CPUs, used to scale the interval.
    /// * `args` - read arguments; `nr_samples` is optional and defaults to 1.
    ///
    /// The interval starts at 1000 ms. When more samples than CPUs are
    /// requested it is divided by `nr_samples / nr_cpus_onln * 2`.
    ///
    /// # Errors
    ///
    /// Returns an error if `nr_samples` is present but does not parse as `u64`.
    fn from_args(
        tid: ThreadId,
        nr_cpus_onln: u64,
        args: &BTreeMap<String, String>,
    ) -> Result<Self> {
        let nr_samples: u64 = match args.get("nr_samples") {
            Some(arg) => arg.trim().parse()?,
            None => 1,
        };

        // A CPU count of zero would divide by zero below; treat it as one CPU.
        let nr_cpus = nr_cpus_onln.max(1);

        let mut interval_ms: u64 = 1000;
        if nr_samples > nr_cpus {
            // More samples, shorter sampling interval.
            // `nr_samples` comes from the client, so saturate instead of
            // letting `* 2` overflow (which could wrap to 0 and then panic
            // on the division).
            let f = (nr_samples / nr_cpus).saturating_mul(2);
            interval_ms /= f;
        }

        Ok(Self::SchedSamplesNr {
            tid,
            nr_samples,
            interval_ms,
        })
    }
}
/// Responses received over the stats-server response channel.
#[derive(Debug)]
pub enum StatsRes {
/// The reply the opener expects after sending `StatsReq::NewSampler`.
Ack,
/// Surfaced by the reader as the error "preempted by another sampler".
Bye,
/// A collected batch of scheduling samples.
SchedSamples(SchedSamples),
}
/// Builds the stats-server description that exposes the `sched_samples` ops.
///
/// `nr_cpus_onln` is captured by the reader closure and forwarded to
/// `StatsReq::from_args` on every read.
pub fn server_data(nr_cpus_onln: u64) -> ScxStatsServerData<StatsReq, StatsRes> {
    let open: Box<dyn StatsOpener<StatsReq, StatsRes>> = Box::new(move |(req_ch, res_ch)| {
        // Announce the calling thread as the sampler and wait for the ack.
        let tid = std::thread::current().id();
        req_ch.send(StatsReq::NewSampler(tid))?;
        let reply = res_ch.recv()?;
        if !matches!(reply, StatsRes::Ack) {
            bail!("invalid response: {:?}", &reply);
        }

        // Each read sends one sampling request tagged with this opener's
        // thread id and turns the reply into JSON.
        let read: Box<dyn StatsReader<StatsReq, StatsRes>> =
            Box::new(move |args, (req_ch, res_ch)| {
                req_ch.send(StatsReq::from_args(tid, nr_cpus_onln, args)?)?;
                match res_ch.recv()? {
                    StatsRes::SchedSamples(batch) => batch.to_json(),
                    StatsRes::Bye => bail!("preempted by another sampler"),
                    other => bail!("invalid response: {:?}", &other),
                }
            });

        Ok(read)
    });

    let ops = ScxStatsOps { open, close: None };
    ScxStatsServerData::new()
        .add_meta(SchedSample::meta())
        .add_ops("sched_samples", ops)
}
/// Runs the monitoring client: prints the `stat` legend once, then hands
/// control to `scx_utils::monitor_stats`, which reads `sched_samples` and
/// calls back with each batch for printing.
///
/// `nr_samples` is forwarded as the `nr_samples` request argument.
///
/// # Errors
///
/// Propagates any error returned by `monitor_stats` or by formatting a sample.
pub fn monitor_sched_samples(nr_samples: u64) -> Result<()> {
    println!(
        " stat: ('L'atency-critical, 'R'egular) (performance-'H'ungry, performance-'I'nsensitive) ('B'ig, li'T'tle) ('E'ligible, 'G'reedy) ('P'reempting, 'N'ot)");
    scx_utils::monitor_stats::<SchedSamples>(
        &vec![
            ("target".into(), "sched_samples".into()),
            ("nr_samples".into(), nr_samples.to_string()),
        ],
        Duration::from_secs(0),
        // Yields `true` once RUNNING has been cleared; presumably used by
        // `monitor_stats` as its exit check.
        || !crate::RUNNING.load(Ordering::Relaxed),
        |ts| {
            // Lock stdout once per batch rather than once per write.
            let mut out = std::io::stdout().lock();
            for sample in &ts.samples {
                sample.format(&mut out)?;
            }
            Ok(())
        },
    )
}