scx_rusty: Don't reset bpf_stats, remember prev states and calculate delta

This will ease transition to scx_stats.
This commit is contained in:
Tejun Heo 2024-08-22 11:52:02 -10:00
parent 13fa48a871
commit 16c07a5cd9
2 changed files with 116 additions and 119 deletions

View File

@ -53,7 +53,6 @@ use scx_utils::uei_report;
use scx_utils::Cpumask; use scx_utils::Cpumask;
use scx_utils::Topology; use scx_utils::Topology;
use scx_utils::UserExitInfo; use scx_utils::UserExitInfo;
use scx_utils::NR_CPUS_POSSIBLE;
use scx_utils::NR_CPU_IDS; use scx_utils::NR_CPU_IDS;
const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize; const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
@ -210,12 +209,32 @@ struct Opts {
version: bool, version: bool,
} }
fn read_total_cpu(reader: &procfs::ProcReader) -> Result<procfs::CpuStat> { fn read_cpu_busy_and_total(reader: &procfs::ProcReader) -> Result<(u64, u64)> {
reader let cs = reader
.read_stat() .read_stat()
.context("Failed to read procfs")? .context("Failed to read procfs")?
.total_cpu .total_cpu
.ok_or_else(|| anyhow!("Could not read total cpu stat in proc")) .ok_or_else(|| anyhow!("Could not read total cpu stat in proc"))?;
Ok(match cs {
procfs::CpuStat {
user_usec: Some(user),
nice_usec: Some(nice),
system_usec: Some(system),
idle_usec: Some(idle),
iowait_usec: Some(iowait),
irq_usec: Some(irq),
softirq_usec: Some(softirq),
stolen_usec: Some(stolen),
guest_usec: _,
guest_nice_usec: _,
} => {
let busy = user + system + nice + irq + softirq + stolen;
let total = busy + idle + iowait;
(busy, total)
}
_ => bail!("Some procfs stats are not populated!"),
})
} }
pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 { pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
@ -226,6 +245,68 @@ pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
} }
} }
#[derive(Clone, Debug)]
struct StatsCtx {
at: Instant,
cpu_busy: u64,
cpu_total: u64,
bpf_stats: Vec<u64>,
cpu_used: Duration,
}
impl StatsCtx {
fn read_bpf_stats(skel: &BpfSkel) -> Result<Vec<u64>> {
let stats_map = &skel.maps.stats;
let mut stats: Vec<u64> = Vec::new();
for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
let cpu_stat_vec = stats_map
.lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
.with_context(|| format!("Failed to lookup stat {}", stat))?
.expect("per-cpu stat should exist");
let sum = cpu_stat_vec
.iter()
.map(|val| {
u64::from_ne_bytes(
val.as_slice()
.try_into()
.expect("Invalid value length in stat map"),
)
})
.sum();
stats.push(sum);
}
Ok(stats)
}
fn new(skel: &BpfSkel, proc_reader: &procfs::ProcReader, cpu_used: Duration) -> Result<Self> {
let (cpu_busy, cpu_total) = read_cpu_busy_and_total(proc_reader)?;
Ok(Self {
at: Instant::now(),
cpu_busy,
cpu_total,
bpf_stats: Self::read_bpf_stats(skel)?,
cpu_used,
})
}
fn delta(&self, rhs: &Self) -> Self {
Self {
at: self.at,
cpu_busy: sub_or_zero(&self.cpu_busy, &rhs.cpu_busy),
cpu_total: sub_or_zero(&self.cpu_total, &rhs.cpu_total),
bpf_stats: self
.bpf_stats
.iter()
.zip(rhs.bpf_stats.iter())
.map(|(lhs, rhs)| sub_or_zero(&lhs, &rhs))
.collect(),
cpu_used: self.cpu_used - rhs.cpu_used,
}
}
}
struct Scheduler<'a> { struct Scheduler<'a> {
skel: BpfSkel<'a>, skel: BpfSkel<'a>,
struct_ops: Option<libbpf_rs::Link>, struct_ops: Option<libbpf_rs::Link>,
@ -238,9 +319,7 @@ struct Scheduler<'a> {
dom_group: Arc<DomainGroup>, dom_group: Arc<DomainGroup>,
proc_reader: procfs::ProcReader, proc_reader: procfs::ProcReader,
cpu_used: Duration,
prev_at: Instant,
prev_total_cpu: procfs::CpuStat,
nr_lb_data_errors: u64, nr_lb_data_errors: u64,
@ -346,7 +425,6 @@ impl<'a> Scheduler<'a> {
// Other stuff. // Other stuff.
let proc_reader = procfs::ProcReader::new(); let proc_reader = procfs::ProcReader::new();
let prev_total_cpu = read_total_cpu(&proc_reader)?;
Ok(Self { Ok(Self {
skel, skel,
@ -359,9 +437,7 @@ impl<'a> Scheduler<'a> {
dom_group: domains.clone(), dom_group: domains.clone(),
proc_reader, proc_reader,
cpu_used: Duration::default(),
prev_at: Instant::now(),
prev_total_cpu,
nr_lb_data_errors: 0, nr_lb_data_errors: 0,
@ -375,94 +451,8 @@ impl<'a> Scheduler<'a> {
}) })
} }
fn get_cpu_busy(&mut self) -> Result<f64> { fn cluster_stats(&self, sc: &StatsCtx, node_stats: BTreeMap<usize, NodeStats>) -> ClusterStats {
let total_cpu = read_total_cpu(&self.proc_reader)?; let stat = |idx| sc.bpf_stats[idx as usize];
let busy = match (&self.prev_total_cpu, &total_cpu) {
(
procfs::CpuStat {
user_usec: Some(prev_user),
nice_usec: Some(prev_nice),
system_usec: Some(prev_system),
idle_usec: Some(prev_idle),
iowait_usec: Some(prev_iowait),
irq_usec: Some(prev_irq),
softirq_usec: Some(prev_softirq),
stolen_usec: Some(prev_stolen),
guest_usec: _,
guest_nice_usec: _,
},
procfs::CpuStat {
user_usec: Some(curr_user),
nice_usec: Some(curr_nice),
system_usec: Some(curr_system),
idle_usec: Some(curr_idle),
iowait_usec: Some(curr_iowait),
irq_usec: Some(curr_irq),
softirq_usec: Some(curr_softirq),
stolen_usec: Some(curr_stolen),
guest_usec: _,
guest_nice_usec: _,
},
) => {
let idle_usec = sub_or_zero(curr_idle, prev_idle);
let iowait_usec = sub_or_zero(curr_iowait, prev_iowait);
let user_usec = sub_or_zero(curr_user, prev_user);
let system_usec = sub_or_zero(curr_system, prev_system);
let nice_usec = sub_or_zero(curr_nice, prev_nice);
let irq_usec = sub_or_zero(curr_irq, prev_irq);
let softirq_usec = sub_or_zero(curr_softirq, prev_softirq);
let stolen_usec = sub_or_zero(curr_stolen, prev_stolen);
let busy_usec =
user_usec + system_usec + nice_usec + irq_usec + softirq_usec + stolen_usec;
let total_usec = idle_usec + busy_usec + iowait_usec;
busy_usec as f64 / total_usec as f64
}
_ => {
bail!("Some procfs stats are not populated!");
}
};
self.prev_total_cpu = total_cpu;
Ok(busy)
}
fn read_bpf_stats(&mut self) -> Result<Vec<u64>> {
let stats_map = &mut self.skel.maps.stats;
let mut stats: Vec<u64> = Vec::new();
let zero_vec = vec![vec![0u8; stats_map.value_size() as usize]; *NR_CPUS_POSSIBLE];
for stat in 0..bpf_intf::stat_idx_RUSTY_NR_STATS {
let cpu_stat_vec = stats_map
.lookup_percpu(&stat.to_ne_bytes(), libbpf_rs::MapFlags::ANY)
.with_context(|| format!("Failed to lookup stat {}", stat))?
.expect("per-cpu stat should exist");
let sum = cpu_stat_vec
.iter()
.map(|val| {
u64::from_ne_bytes(
val.as_slice()
.try_into()
.expect("Invalid value length in stat map"),
)
})
.sum();
stats_map
.update_percpu(&stat.to_ne_bytes(), &zero_vec, libbpf_rs::MapFlags::ANY)
.context("Failed to zero stat")?;
stats.push(sum);
}
Ok(stats)
}
fn cluster_stats(
&mut self,
bpf_stats: &[u64],
cpu_busy: f64,
processing_dur: Duration,
node_stats: BTreeMap<usize, NodeStats>,
) -> ClusterStats {
let stat = |idx| bpf_stats[idx as usize];
let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC) let total = stat(bpf_intf::stat_idx_RUSTY_STAT_WAKE_SYNC)
+ stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE) + stat(bpf_intf::stat_idx_RUSTY_STAT_SYNC_PREV_IDLE)
+ stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE) + stat(bpf_intf::stat_idx_RUSTY_STAT_PREV_IDLE)
@ -476,14 +466,20 @@ impl<'a> Scheduler<'a> {
+ stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA); + stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0; let stat_pct = |idx| stat(idx) as f64 / total as f64 * 100.0;
ClusterStats { let cpu_busy = if sc.cpu_total != 0 {
cpu_busy: cpu_busy * 100.0, (sc.cpu_busy as f64 / sc.cpu_total as f64) * 100.0
load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(), } else {
nr_load_balances: bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize], 0.0
};
task_get_err: bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize], ClusterStats {
cpu_busy,
load: node_stats.iter().map(|(_k, v)| v.load).sum::<f64>(),
nr_load_balances: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],
task_get_err: sc.bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
lb_data_err: self.nr_lb_data_errors, lb_data_err: self.nr_lb_data_errors,
proc_dur: processing_dur.as_secs_f64(), cpu_used: sc.cpu_used.as_secs_f64(),
total, total,
@ -511,10 +507,8 @@ impl<'a> Scheduler<'a> {
} }
} }
fn lb_step(&mut self) -> Result<()> { fn lb_step(&mut self, prev_sc: &mut StatsCtx) -> Result<()> {
let started_at = Instant::now(); let started_at = Instant::now();
let bpf_stats = self.read_bpf_stats()?;
let cpu_busy = self.get_cpu_busy()?;
let mut lb = LoadBalancer::new( let mut lb = LoadBalancer::new(
&mut self.skel, &mut self.skel,
@ -527,17 +521,17 @@ impl<'a> Scheduler<'a> {
lb.load_balance()?; lb.load_balance()?;
let lstats = lb.get_stats(); let lstats = lb.get_stats();
let cstats = self.cluster_stats( self.cpu_used += Instant::now().duration_since(started_at);
&bpf_stats, let cur_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.cpu_used)?;
cpu_busy, let delta_sc = cur_sc.delta(&prev_sc);
Instant::now().duration_since(started_at), *prev_sc = cur_sc;
lstats,
); let cstats = self.cluster_stats(&delta_sc, lstats);
let mut buf = Vec::<u8>::new(); let mut buf = Vec::<u8>::new();
cstats.format(&mut buf)?; cstats.format(&mut buf)?;
print!("{}", std::str::from_utf8(&buf).unwrap()); print!("{}", std::str::from_utf8(&buf).unwrap());
self.prev_at = started_at;
Ok(()) Ok(())
} }
@ -545,6 +539,9 @@ impl<'a> Scheduler<'a> {
let now = Instant::now(); let now = Instant::now();
let mut next_tune_at = now + self.tune_interval; let mut next_tune_at = now + self.tune_interval;
let mut next_sched_at = now + self.sched_interval; let mut next_sched_at = now + self.sched_interval;
let mut prev_sc = StatsCtx::new(&self.skel, &self.proc_reader, self.cpu_used)?;
self.skel.maps.stats.value_size() as usize;
while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) { while !shutdown.load(Ordering::Relaxed) && !uei_exited!(&self.skel, uei) {
let now = Instant::now(); let now = Instant::now();
@ -558,7 +555,7 @@ impl<'a> Scheduler<'a> {
} }
if now >= next_sched_at { if now >= next_sched_at {
self.lb_step()?; self.lb_step(&mut prev_sc)?;
next_sched_at += self.sched_interval; next_sched_at += self.sched_interval;
if next_sched_at < now { if next_sched_at < now {
next_sched_at = now + self.sched_interval; next_sched_at = now + self.sched_interval;

View File

@ -62,7 +62,7 @@ pub struct ClusterStats {
pub task_get_err: u64, pub task_get_err: u64,
pub lb_data_err: u64, pub lb_data_err: u64,
pub proc_dur: f64, pub cpu_used: f64,
pub total: u64, pub total: u64,
@ -93,13 +93,13 @@ impl ClusterStats {
pub fn format<W: Write>(&self, w: &mut W) -> Result<()> { pub fn format<W: Write>(&self, w: &mut W) -> Result<()> {
writeln!( writeln!(
w, w,
"cpu={:7.2} load={:8.2} bal={} task_err={} lb_data_err={} proc={:4.1}ms", "cpu={:7.2} load={:8.2} bal={} task_err={} lb_data_err={} cpu_used={:4.1}ms",
self.cpu_busy, self.cpu_busy,
self.load, self.load,
self.nr_load_balances, self.nr_load_balances,
self.task_get_err, self.task_get_err,
self.lb_data_err, self.lb_data_err,
self.proc_dur * 1000.0, self.cpu_used * 1000.0,
)?; )?;
writeln!( writeln!(
w, w,