scx_utils: Add log_recorder module for metrics-rs

This change adds a new module to the scx_utils crate that provides a
log recorder for metrics-rs. The log recorder will log all metrics to
the console at a configurable interval in an easy-to-read format. Each
metric type will be displayed in a separate section, and indentation
will be used to show the hierarchy of the metrics. This results in
more verbose output, but it is easier to read and understand.

scx_rusty was updated to use the log recorder and all explicit metric
logging was removed.
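
A scheduler opts in with a one-line install at startup. A minimal
sketch using the builder API added by this change (the error handling
matches what scx_rusty now does):

    use std::time::Duration;
    use scx_utils::LogRecorderBuilder;

    LogRecorderBuilder::new()
        .with_reporting_interval(Duration::from_secs(3))
        .install()
        .expect("failed to install log recorder");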

Counters will show the total count and the rate of change per second.
Counters with an additional label, like `type` in rusty's
`dispatched_tasks_total`, will also show each label value's count,
rate, and percentage of the total; the arithmetic is sketched after
the sample output below.

Counters:
  dispatched_tasks_total: 65559 [1344.8/s]
    prev_idle: 44963 (68.6%) [966.5/s]
    wsync_prev_idle: 15696 (23.9%) [317.3/s]
    direct_dispatch: 2833 (4.3%) [35.3/s]
    dsq: 1804 (2.8%) [21.3/s]
    wsync: 262 (0.4%) [4.3/s]
    direct_greedy: 1 (0.0%) [0.0/s]
    pinned: 0 (0.0%) [0.0/s]
    greedy_idle: 0 (0.0%) [0.0/s]
    greedy_xnuma: 0 (0.0%) [0.0/s]
    direct_greedy_far: 0 (0.0%) [0.0/s]
    greedy_local: 0 (0.0%) [0.0/s]
  dl_clamped_total: 1290 [20.3/s]
  dl_preset_total: 514 [1.0/s]
  kick_greedy_total: 6 [0.3/s]
  lb_data_errors_total: 0 [0.0/s]
  load_balance_total: 0 [0.0/s]
  repatriate_total: 0 [0.0/s]
  task_errors_total: 0 [0.0/s]
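
The per-label rate is the counter's delta since the previous report
divided by the elapsed period, and the percentage is the label's share
of the group total. A sketch of the arithmetic with hypothetical
sample values (the variable names mirror the new module):

    let (value, prev_value, total, period_secs) = (44963u64, 42063u64, 65559u64, 3.0);
    let rate_per_second = (value - prev_value) as f64 / period_secs; // ~966.7/s
    let percentage = value as f64 / total as f64 * 100.0; // ~68.6%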

Gauges will show the last set value:

Gauges:
  slice_length_us: 20000.00

Histograms will show the average, min, and max. The histogram will be
reset after each log interval to avoid unbounded memory growth, since
the data structure that holds the samples never evicts them on its
own; the drain is sketched after the sample output below.

Histograms:
  cpu_busy_pct: avg=1.66 min=1.16 max=2.16
  load_avg node=0: avg=0.31 min=0.23 max=0.39
  load_avg node=0 dom=0: avg=0.31 min=0.23 max=0.39
  processing_duration_us: avg=297.50 min=296.00 max=299.00
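
On each report the recorder drains the histogram's samples and computes
the summary in a single pass. A minimal sketch using metrics-util's
`AtomicBucket::clear_with`, the same call the new module uses; `bucket`
stands in for the histogram's backing `AtomicBucket<f64>`:

    let (mut sum, mut count) = (0.0_f64, 0u64);
    bucket.clear_with(|samples| {
        for s in samples {
            sum += s;
            count += 1;
        }
    });
    let avg = if count > 0 { sum / count as f64 } else { 0.0 };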

Signed-off-by: Jose Fernandez <josef@netflix.com>

@@ -1 +1,2 @@
Cargo.lock
target

@@ -25,6 +25,8 @@ sscanf = "0.4"
tar = "0.4"
walkdir = "2.4"
version-compare = "0.1"
metrics = "0.23.0"
metrics-util = "0.17.0"

[build-dependencies]
bindgen = ">=0.68, <0.70"

@@ -71,3 +71,6 @@ pub use cpumask::Cpumask;
mod infeasible;
pub use infeasible::LoadAggregator;
pub use infeasible::LoadLedger;

mod log_recorder;
pub use log_recorder::LogRecorderBuilder;

@@ -0,0 +1,284 @@
// Copyright (c) Netflix, Inc.
// Author: Jose Fernandez
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

use std::collections::HashMap;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use std::time::Instant;

use anyhow::Result;
use log::info;
use metrics::Counter;
use metrics::Gauge;
use metrics::Histogram;
use metrics::Key;
use metrics::KeyName;
use metrics::Metadata;
use metrics::Recorder;
use metrics::SharedString;
use metrics::Unit;
use metrics_util::registry::AtomicStorage;
use metrics_util::registry::Registry;

/// A builder for creating a new instance of `LogRecorder` and installing it as
/// the global recorder.
///
/// Example:
///
/// ```rust
/// LogRecorderBuilder::new()
///     .with_reporting_interval(Duration::from_secs(3))
///     .install()?;
/// ```
pub struct LogRecorderBuilder {
    reporting_interval: Duration,
}

impl LogRecorderBuilder {
    pub fn new() -> LogRecorderBuilder {
        Self {
            reporting_interval: Duration::from_secs(3),
        }
    }

    /// Sets the interval at which the recorder will log the metrics.
    pub fn with_reporting_interval(mut self, interval: Duration) -> Self {
        self.reporting_interval = interval;
        self
    }

    /// Installs the log recorder as the global recorder.
    pub fn install(self) -> Result<()> {
        let recorder = LogRecorder {
            registry: Arc::new(Registry::<Key, AtomicStorage>::atomic()),
        };

        recorder.start(self.reporting_interval);

        metrics::set_global_recorder(recorder)?;

        Ok(())
    }
}

/// A metrics recorder that logs metrics to the terminal.
///
/// `LogRecorder` implements the `Recorder` trait from the metrics-rs framework.
/// It maintains an in-memory registry of metrics and uses a background thread
/// to report all metrics at regular intervals.
///
/// Use the `LogRecorderBuilder` to create a new instance of `LogRecorder` and
/// install it as the global recorder.
struct LogRecorder {
    registry: Arc<Registry<Key, AtomicStorage>>,
}

impl Recorder for LogRecorder {
    fn describe_counter(&self, _: KeyName, _: Option<Unit>, _: SharedString) {
        unimplemented!()
    }

    fn describe_gauge(&self, _: KeyName, _: Option<Unit>, _: SharedString) {
        unimplemented!()
    }

    fn describe_histogram(&self, _: KeyName, _: Option<Unit>, _: SharedString) {
        unimplemented!()
    }

    fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> Counter {
        self.registry
            .get_or_create_counter(key, |c| c.clone().into())
    }

    fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> Gauge {
        self.registry.get_or_create_gauge(key, |g| g.clone().into())
    }

    fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> Histogram {
        self.registry
            .get_or_create_histogram(key, |h: &Arc<metrics_util::AtomicBucket<f64>>| {
                h.clone().into()
            })
    }
}

impl LogRecorder {
    // Starts a background thread that logs the metrics at an interval defined
    // by the `reporting_interval` parameter.
    fn start(&self, reporting_interval: Duration) {
        let registry_clone = self.registry.clone();

        thread::spawn(move || {
            let mut prev_counter_values: HashMap<Key, u64> = HashMap::new();
            let mut prev_instant = Instant::now();

            loop {
                let now = Instant::now();
                let period_secs = now.duration_since(prev_instant).as_secs_f64();
                prev_instant = now;

                log_counter_info(&registry_clone, &mut prev_counter_values, period_secs);
                log_gauge_info(&registry_clone);
                log_histogram_info(&registry_clone);
                info!("---");

                // Sleep for the remainder of the period. Saturate to zero so a
                // report that overruns the interval does not panic on Duration
                // underflow.
                thread::sleep(reporting_interval.saturating_sub(prev_instant.elapsed()));
            }
        });
    }
}
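
// Groups keys by metric name so that all label variants of the same metric
// can be reported together under a single heading.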
fn group_keys_by_name(keys: Vec<Key>) -> HashMap<String, Vec<Key>> {
    let mut grouped_keys: HashMap<String, Vec<Key>> = HashMap::new();

    for key in keys {
        let key_name = key.name().to_string();
        grouped_keys.entry(key_name).or_default().push(key);
    }

    grouped_keys
}
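
// Logs all counters grouped by name and sorted by total in descending order.
// Groups with more than one label value also get a per-label breakdown with
// each value's share of the group total and its rate of change.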
fn log_counter_info(
    registry: &Registry<Key, AtomicStorage>,
    prev_counter_values: &mut HashMap<Key, u64>,
    period_secs: f64,
) {
    let handles = registry.get_counter_handles();
    let grouped_keys = group_keys_by_name(handles.keys().cloned().collect());

    // Collect the totals for sorting
    let mut total_values: Vec<(String, u64, Vec<(Key, u64)>)> = grouped_keys
        .into_iter()
        .map(|(key_name, keys)| {
            // Collect all counter values first
            let key_values: Vec<(Key, u64)> = keys
                .iter()
                .map(|key| {
                    let value = registry.get_counter(key).unwrap().load(Relaxed);
                    (key.clone(), value)
                })
                .collect();

            // Calculate the total
            let total: u64 = key_values.iter().map(|(_, value)| *value).sum();
            (key_name, total, key_values)
        })
        .collect();

    // Sort by total value in descending order, then by key name in ascending order
    total_values.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));

    if !handles.is_empty() {
        info!("Counters:");
    }

    for (key_name, total, mut key_values) in total_values {
        let mut total_rate_per_second = 0.0;
        for (key, value) in &key_values {
            let prev_value = prev_counter_values.get(key).cloned().unwrap_or(0);
            let rate_per_second = (value - prev_value) as f64 / period_secs;
            total_rate_per_second += rate_per_second;
        }

        info!("  {}: {} [{:.1}/s]", key_name, total, total_rate_per_second);

        if key_values.len() > 1 {
            // Sort the key_values by the counter value in descending order
            key_values.sort_by(|a, b| b.1.cmp(&a.1));

            // Log individual totals, their percentages, and rates
            for (key, value) in key_values {
                let prev_value = prev_counter_values.get(&key).cloned().unwrap_or(0);
                let rate_per_second = (value - prev_value) as f64 / period_secs;
                let percentage = if total == 0 {
                    0.0
                } else {
                    (value as f64 / total as f64) * 100.0
                };
                let label_value = key.labels().next().unwrap().value(); // Assuming only one label
                info!(
                    "    {}: {} ({:.1}%) [{:.1}/s]",
                    label_value, value, percentage, rate_per_second
                );
                prev_counter_values.insert(key.clone(), value);
            }
        } else {
            let (key, value) = &key_values[0];
            prev_counter_values.insert(key.clone(), *value);
        }
    }
}
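
// Logs the most recently set value of each gauge, sorted by key name.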
fn log_gauge_info(registry: &Registry<Key, AtomicStorage>) {
    let handles = registry.get_gauge_handles();
    let mut keys: Vec<Key> = handles.keys().cloned().collect();
    keys.sort();

    if !keys.is_empty() {
        info!("Gauges:");
    }

    for key in keys {
        match registry.get_gauge(&key) {
            None => continue,
            Some(gauge) => {
                // Gauge values are stored as bits, so we need to convert them to f64
                let value = f64::from_bits(gauge.load(Relaxed));
                info!("  {}: {:.2}", key.name(), value);
            }
        }
    }
}
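
// Logs the average, min, and max of each histogram, draining its samples in
// the process so the backing bucket does not grow without bound.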
fn log_histogram_info(registry: &Registry<Key, AtomicStorage>) {
    let handles = registry.get_histogram_handles();
    let mut keys: Vec<Key> = handles.keys().cloned().collect();
    keys.sort();

    if !keys.is_empty() {
        info!("Histograms:");
    }

    for key in keys {
        match registry.get_histogram(&key) {
            None => continue,
            Some(histogram) => {
                let mut sum = 0.0;
                let mut count = 0;
                let mut min = 0.0;
                let mut max = 0.0;

                // Iterate over all elements in the histogram and clear it.
                // This prevents the histogram from growing indefinitely.
                histogram.clear_with(|elements| {
                    for element in elements.iter() {
                        sum += element;
                        count += 1;
                        if min == 0.0 || *element < min {
                            min = *element;
                        }
                        if *element >= max {
                            max = *element;
                        }
                    }
                });

                let avg = if count > 0 { sum / count as f64 } else { 0.0 };

                let mut name = key.name().to_string();
                for label in key.labels() {
                    name.push_str(&format!(" {}={}", label.key(), label.value()));
                }

                info!("  {}: avg={:.2} min={:.2} max={:.2}", name, avg, min, max);
            }
        }
    }
}

@@ -38,6 +38,11 @@ use log::info;
use metrics::counter;
use metrics::Counter;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics::histogram;
use metrics::Histogram;
use metrics::gauge;
use metrics::Gauge;
use scx_utils::LogRecorderBuilder;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_ops_attach;
@@ -230,6 +235,12 @@ struct Metrics {
    repatriate: Counter,
    dl_clamped: Counter,
    dl_preset: Counter,
    task_errors: Counter,
    lb_data_errors: Counter,
    load_balance: Counter,
    slice_length: Gauge,
    cpu_busy_pct: Histogram,
    processing_duration: Histogram,
}

impl Metrics {
@@ -250,6 +261,14 @@ impl Metrics {
            repatriate: counter!("repatriate_total"),
            dl_clamped: counter!("dl_clamped_total"),
            dl_preset: counter!("dl_preset_total"),
            task_errors: counter!("task_errors_total"),
            lb_data_errors: counter!("lb_data_errors_total"),
            load_balance: counter!("load_balance_total"),
            slice_length: gauge!("slice_length_us"),
            cpu_busy_pct: histogram!("cpu_busy_pct"),
            processing_duration: histogram!("processing_duration_us"),
        }
    }
}
@@ -490,10 +509,8 @@ impl<'a> Scheduler<'a> {
    }

    fn report(
        &mut self,
        &self,
        bpf_stats: &[u64],
        cpu_busy: f64,
        processing_dur: Duration,
        lb_stats: &[NumaStat],
    ) {
        let stat = |idx| bpf_stats[idx as usize];
@@ -509,17 +526,6 @@ impl<'a> Scheduler<'a> {
        let dsq = stat(bpf_intf::stat_idx_RUSTY_STAT_DSQ_DISPATCH);
        let greedy_local = stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_LOCAL);
        let greedy_xnuma = stat(bpf_intf::stat_idx_RUSTY_STAT_GREEDY_XNUMA);
        let total = wsync
            + wsync_prev_idle
            + prev_idle
            + greedy_idle
            + pinned
            + direct_dispatch
            + direct_greedy
            + direct_greedy_far
            + dsq
            + greedy_local
            + greedy_xnuma;

        self.metrics.wsync_prev_idle.increment(wsync_prev_idle);
        self.metrics.wsync.increment(wsync);
@@ -543,68 +549,21 @@ impl<'a> Scheduler<'a> {
        self.metrics.dl_clamped.increment(dl_clamped);
        self.metrics.dl_preset.increment(dl_preset);

        let numa_load_avg = lb_stats[0].load.load_avg();
        let dom_load_avg = lb_stats[0].domains[0].load.load_avg();
        info!(
            "cpu={:7.2} bal={} numa_load_avg={:8.2} dom_load_avg={:8.2} task_err={} lb_data_err={} proc={:?}ms",
            cpu_busy * 100.0,
            bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE as usize],
            numa_load_avg, dom_load_avg,
            bpf_stats[bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR as usize],
            self.nr_lb_data_errors,
            processing_dur.as_millis(),
        );

        let stat_pct = |value| value as f64 / total as f64 * 100.0;

        info!(
            "tot={:7} wsync_prev_idle={:5.2} wsync={:5.2}",
            total,
            stat_pct(wsync_prev_idle),
            stat_pct(wsync),
        );
        info!(
            "prev_idle={:5.2} greedy_idle={:5.2} pin={:5.2}",
            stat_pct(prev_idle),
            stat_pct(greedy_idle),
            stat_pct(pinned),
        );
        info!(
            "dir={:5.2} dir_greedy={:5.2} dir_greedy_far={:5.2}",
            stat_pct(direct_dispatch),
            stat_pct(direct_greedy),
            stat_pct(direct_greedy_far),
        );
        info!(
            "dsq={:5.2} greedy_local={:5.2} greedy_xnuma={:5.2}",
            stat_pct(dsq),
            stat_pct(greedy_local),
            stat_pct(greedy_xnuma),
        );
        info!(
            "kick_greedy={:5.2} rep={:5.2}",
            stat_pct(kick_greedy),
            stat_pct(repatriate),
        );
        info!(
            "dl_clamped={:5.2} dl_preset={:5.2}",
            stat_pct(dl_clamped),
            stat_pct(dl_preset),
        );
        info!("slice_length={}us", self.tuner.slice_ns / 1000);
        info!("direct_greedy_cpumask={}", self.tuner.direct_greedy_mask);
        info!("  kick_greedy_cpumask={}", self.tuner.kick_greedy_mask);

        self.metrics.task_errors.increment(stat(bpf_intf::stat_idx_RUSTY_STAT_TASK_GET_ERR));
        self.metrics.lb_data_errors.increment(self.nr_lb_data_errors);
        self.metrics.load_balance.increment(stat(bpf_intf::stat_idx_RUSTY_STAT_LOAD_BALANCE));
        self.metrics.slice_length.set(self.tuner.slice_ns as f64 / 1000.0);

        // We need to dynamically create the metrics for each node and domain
        // because we don't know how many there are at compile time. Metrics
        // will be cached and reused so this is not a performance issue.
        for node in lb_stats.iter() {
            info!("{}", node);
            histogram!("load_avg", "node" => node.id.to_string())
                .record(node.load.load_avg() as f64);

            for dom in node.domains.iter() {
                info!("{}", dom);
                histogram!("load_avg", "node" => node.id.to_string(), "dom" => dom.id.to_string())
                    .record(dom.load.load_avg() as f64);
            }
        }
    }
@@ -613,6 +572,7 @@ impl<'a> Scheduler<'a> {
        let started_at = Instant::now();
        let bpf_stats = self.read_bpf_stats()?;
        let cpu_busy = self.get_cpu_busy()?;
        self.metrics.cpu_busy_pct.record(cpu_busy * 100.0);

        let mut lb = LoadBalancer::new(
            &mut self.skel,
@@ -623,12 +583,11 @@ impl<'a> Scheduler<'a> {
        );
        lb.load_balance()?;

        self.metrics.processing_duration.record(started_at.elapsed().as_micros() as f64);

        let stats = lb.get_stats();
        self.report(
            &bpf_stats,
            cpu_busy,
            Instant::now().duration_since(started_at),
            &stats,
        );
@@ -712,6 +671,11 @@ fn main() -> Result<()> {
        PrometheusBuilder::new()
            .install()
            .expect("failed to install Prometheus recorder");
    } else {
        LogRecorderBuilder::new()
            .with_reporting_interval(Duration::from_secs(3))
            .install()
            .expect("failed to install log recorder");
    }

    loop {