rusty: Move LoadBalancer out of rusty.rs

More cleanup of scx_rusty. Let's move the LoadBalancer out of rusty.rs and into
its own file. It will soon be extended quite a bit to support multi-NUMA and
other multivariate LB cost functions, so it's time to clean things up and split
it out.

Signed-off-by: David Vernet <void@manifault.com>
David Vernet 2024-02-26 16:46:59 -06:00
parent 94f75bcec6
commit 758f762058


@@ -12,19 +12,18 @@ use domain::DomainGroup;
pub mod tuner;
use tuner::Tuner;
#[macro_use]
extern crate static_assertions;
pub mod load_balance;
use load_balance::LoadBalancer;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::ops::Bound::Included;
use std::ops::Bound::Unbounded;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
#[macro_use]
extern crate static_assertions;
use ::fb_procfs as procfs;
use anyhow::anyhow;
use anyhow::bail;
@@ -34,19 +33,12 @@ use clap::Parser;
use libbpf_rs::skel::OpenSkel as _;
use libbpf_rs::skel::Skel as _;
use libbpf_rs::skel::SkelBuilder as _;
use log::debug;
use log::info;
use log::trace;
use log::warn;
use ordered_float::OrderedFloat;
use scx_utils::init_libbpf_logging;
use scx_utils::ravg::ravg_read;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::LoadAggregator;
const RAVG_FRAC_BITS: u32 = bpf_intf::ravg_consts_RAVG_FRAC_BITS;
const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize;
@@ -166,22 +158,6 @@ struct Opts {
verbose: u8,
}
fn now_monotonic() -> u64 {
let mut time = libc::timespec {
tv_sec: 0,
tv_nsec: 0,
};
let ret = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut time) };
assert!(ret == 0);
time.tv_sec as u64 * 1_000_000_000 + time.tv_nsec as u64
}
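// Note: now_monotonic() above returns CLOCK_MONOTONIC time in nanoseconds.
// It is assumed to correspond to the clock the BPF side stamps its running
// averages with, so the ravg_read() calls further down can decay those
// averages to "now" consistently from userspace.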
fn clear_map(map: &libbpf_rs::Map) {
for key in map.keys() {
let _ = map.delete(&key);
}
}
fn format_cpumask(cpumask: &[u64], nr_cpus: usize) -> String {
cpumask
.iter()
@@ -206,451 +182,6 @@ pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 {
}
}
fn approx_eq(a: f64, b: f64) -> bool {
(a - b).abs() <= 0.0001f64
}
#[derive(Debug)]
struct TaskInfo {
pid: i32,
dom_mask: u64,
migrated: Cell<bool>,
is_kworker: bool,
}
struct LoadBalancer<'a, 'b, 'c> {
skel: &'a mut BpfSkel<'b>,
top: Arc<Topology>,
dom_group: Arc<DomainGroup>,
skip_kworkers: bool,
infeas_threshold: f64,
tasks_by_load: Vec<Option<BTreeMap<OrderedFloat<f64>, TaskInfo>>>,
load_avg: f64,
dom_loads: Vec<f64>,
imbal: Vec<f64>,
doms_to_push: BTreeMap<OrderedFloat<f64>, u32>,
doms_to_pull: BTreeMap<OrderedFloat<f64>, u32>,
lb_apply_weight: bool,
nr_lb_data_errors: &'c mut u64,
}
// Verify that the number of buckets is a factor of the maximum weight to
// ensure that the range of weight can be split evenly amongst every bucket.
const_assert_eq!(bpf_intf::consts_LB_MAX_WEIGHT % bpf_intf::consts_LB_LOAD_BUCKETS, 0);
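// Illustrative check (assuming, e.g., LB_MAX_WEIGHT = 10000 and
// LB_LOAD_BUCKETS = 100; the real values come from bpf_intf): 10000 % 100 == 0,
// so every bucket covers the same number of weight values and no weight falls
// outside a bucket.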
impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
// If imbalance gets higher than this ratio, try to balance the loads.
const LOAD_IMBAL_HIGH_RATIO: f64 = 0.05;
// Aim to transfer this fraction of the imbalance on each round. We want
// to be gradual to avoid unnecessary oscillations. While this can delay
// convergence, greedy execution should be able to bridge the temporary
// gap.
const LOAD_IMBAL_XFER_TARGET_RATIO: f64 = 0.50;
// Don't push out more than this ratio of load on each round. While this
// overlaps with XFER_TARGET_RATIO, XFER_TARGET_RATIO only defines the
// target and doesn't limit the total load. As long as the transfer
// reduces load imbalance between the two involved domains, it'd happily
// transfer whatever amount can be transferred. This limit is used
// as the safety cap to avoid draining a given domain too much in a
// single round.
const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50;
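// Worked example with illustrative numbers: if load_avg = 10.0, a domain at
// 10.6 is imbalanced by 0.6 >= 10.0 * LOAD_IMBAL_HIGH_RATIO (0.5) and becomes
// a push candidate. Against a pull candidate that is short by 0.4, a single
// round targets a transfer of min(0.6, 0.4) * LOAD_IMBAL_XFER_TARGET_RATIO =
// 0.2, and in total never pushes more than LOAD_IMBAL_PUSH_MAX_RATIO (half)
// of the push domain's load.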
fn new(
skel: &'a mut BpfSkel<'b>,
top: Arc<Topology>,
dom_group: Arc<DomainGroup>,
skip_kworkers: bool,
lb_apply_weight: bool,
nr_lb_data_errors: &'c mut u64,
) -> Self {
Self {
skel,
skip_kworkers,
infeas_threshold: bpf_intf::consts_LB_MAX_WEIGHT as f64,
tasks_by_load: (0..dom_group.nr_doms()).map(|_| None).collect(),
load_avg: 0f64,
dom_loads: vec![0.0; dom_group.nr_doms()],
imbal: vec![0.0; dom_group.nr_doms()],
doms_to_pull: BTreeMap::new(),
doms_to_push: BTreeMap::new(),
lb_apply_weight: lb_apply_weight.clone(),
nr_lb_data_errors,
top,
dom_group,
}
}
fn bucket_range(&self, bucket: u64) -> (f64, f64) {
const MAX_WEIGHT: u64 = bpf_intf::consts_LB_MAX_WEIGHT as u64;
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
const WEIGHT_PER_BUCKET: u64 = MAX_WEIGHT / NUM_BUCKETS;
if bucket >= NUM_BUCKETS {
panic!("Invalid bucket {}, max {}", bucket, NUM_BUCKETS);
}
// w_x = [1 + (10000 * x) / N, 10000 * (x + 1) / N]
let min_w = 1 + (MAX_WEIGHT * bucket) / NUM_BUCKETS;
let max_w = min_w + WEIGHT_PER_BUCKET - 1;
(min_w as f64, max_w as f64)
}
fn bucket_weight(&self, bucket: u64) -> usize {
const WEIGHT_PER_BUCKET: f64 = bpf_intf::consts_LB_WEIGHT_PER_BUCKET as f64;
let (min_weight, _) = self.bucket_range(bucket);
// Use the mid-point of the bucket when determining weight
(min_weight + (WEIGHT_PER_BUCKET / 2.0f64)).floor() as usize
}
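// Worked example (same illustrative constants as above: MAX_WEIGHT = 10000,
// NUM_BUCKETS = 100, WEIGHT_PER_BUCKET = 100): bucket 0 spans weights
// [1, 100] and bucket_weight(0) = floor(1.0 + 50.0) = 51; bucket 99 spans
// [9901, 10000] and maps to 9951.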
fn read_dom_loads(&mut self) -> Result<()> {
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
let now_mono = now_monotonic();
let load_half_life = self.skel.rodata().load_half_life;
let maps = self.skel.maps();
let dom_data = maps.dom_data();
let mut aggregator = LoadAggregator::new(self.top.nr_cpus(), !self.lb_apply_weight.clone());
// Accumulate dcycle and load across all domains and buckets. If we're
// under-utilized, or there are no infeasible weights, this is
// sufficient to collect all of the data we need for load balancing.
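// Illustrative example: a bucket whose mid-point weight is 150 and whose
// summed duty cycle reads back as 0.25 contributes 0.25 to the domain's
// dcycle sum, and roughly 150 * 0.25 = 37.5 to its load sum once weights are
// applied (LoadAggregator may additionally clamp infeasible weights, so treat
// this as a sketch of the accounting rather than the exact formula).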
for dom in 0..self.dom_group.nr_doms() {
let dom_key = unsafe { std::mem::transmute::<u32, [u8; 4]>(dom as u32) };
if let Some(dom_ctx_map_elem) = dom_data
.lookup(&dom_key, libbpf_rs::MapFlags::ANY)
.context("Failed to lookup dom_ctx")?
{
let dom_ctx =
unsafe { &*(dom_ctx_map_elem.as_slice().as_ptr() as *const bpf_intf::dom_ctx) };
for bucket in 0..NUM_BUCKETS {
let bucket_ctx = dom_ctx.buckets[bucket as usize];
let rd = &bucket_ctx.rd;
let duty_cycle = ravg_read(
rd.val,
rd.val_at,
rd.old,
rd.cur,
now_mono,
load_half_life,
RAVG_FRAC_BITS,
);
if approx_eq(0.0, duty_cycle) {
continue;
}
let weight = self.bucket_weight(bucket);
aggregator.record_dom_load(dom, weight, duty_cycle)?;
}
}
}
let ledger = aggregator.calculate();
if !self.lb_apply_weight {
// System is under-utilized, so just use dcycle instead of load.
self.load_avg = ledger.global_dcycle_sum() / self.dom_group.nr_doms() as f64;
self.dom_loads = ledger.dom_dcycle_sums().to_vec();
} else {
self.load_avg = ledger.global_load_sum() / self.dom_group.nr_doms() as f64;
self.dom_loads = ledger.dom_load_sums().to_vec();
self.infeas_threshold = ledger.effective_max_weight();
}
Ok(())
}
/// To balance dom loads, identify doms with lower and higher load than
/// average.
fn calculate_dom_load_balance(&mut self) -> Result<()> {
let mode = if self.lb_apply_weight {
"weighted"
} else {
"dcycle"
};
debug!("mode= {} load_avg= {:.2} infeasible_thresh= {:.2}",
mode, self.load_avg, self.infeas_threshold);
for (dom, dom_load) in self.dom_loads.iter().enumerate() {
let imbal = dom_load - self.load_avg;
if imbal.abs() >= self.load_avg * Self::LOAD_IMBAL_HIGH_RATIO {
if imbal > 0f64 {
self.doms_to_push.insert(OrderedFloat(imbal), dom as u32);
} else {
self.doms_to_pull.insert(OrderedFloat(-imbal), dom as u32);
}
self.imbal[dom] = imbal;
}
}
Ok(())
}
/// @dom needs to push out tasks to balance loads. Make sure its
/// tasks_by_load is populated so that the victim tasks can be picked.
fn populate_tasks_by_load(&mut self, dom: u32) -> Result<()> {
if self.tasks_by_load[dom as usize].is_some() {
return Ok(());
}
// Read active_pids and update write_idx and gen.
//
// XXX - We can't read task_ctx inline because self.skel.bss()
// borrows mutably and thus conflicts with self.skel.maps().
const MAX_PIDS: u64 = bpf_intf::consts_MAX_DOM_ACTIVE_PIDS as u64;
let active_pids = &mut self.skel.bss_mut().dom_active_pids[dom as usize];
let mut pids = vec![];
let (mut ridx, widx) = (active_pids.read_idx, active_pids.write_idx);
if widx - ridx > MAX_PIDS {
ridx = widx - MAX_PIDS;
}
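// If the BPF side has queued more than MAX_PIDS entries since the last read,
// the ring has wrapped and the oldest pids were overwritten; the clamp above
// limits the walk to the most recent MAX_PIDS slots instead of re-reading
// overwritten entries.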
for idx in ridx..widx {
let pid = active_pids.pids[(idx % MAX_PIDS) as usize];
pids.push(pid);
}
active_pids.read_idx = active_pids.write_idx;
active_pids.gen += 1;
// Read task_ctx and load.
let load_half_life = self.skel.rodata().load_half_life;
let maps = self.skel.maps();
let task_data = maps.task_data();
let now_mono = now_monotonic();
let mut tasks_by_load = BTreeMap::new();
for pid in pids.iter() {
let key = unsafe { std::mem::transmute::<i32, [u8; 4]>(*pid) };
if let Some(task_data_elem) = task_data.lookup(&key, libbpf_rs::MapFlags::ANY)? {
let task_ctx =
unsafe { &*(task_data_elem.as_slice().as_ptr() as *const bpf_intf::task_ctx) };
if task_ctx.dom_id != dom {
continue;
}
let weight = (task_ctx.weight as f64).min(self.infeas_threshold);
let rd = &task_ctx.dcyc_rd;
let load = weight
* ravg_read(
rd.val,
rd.val_at,
rd.old,
rd.cur,
now_mono,
load_half_life,
RAVG_FRAC_BITS,
);
tasks_by_load.insert(
OrderedFloat(load),
TaskInfo {
pid: *pid,
dom_mask: task_ctx.dom_mask,
migrated: Cell::new(false),
is_kworker: task_ctx.is_kworker,
},
);
}
}
debug!(
"DOM[{:02}] read load for {} tasks",
dom,
&tasks_by_load.len(),
);
trace!("DOM[{:02}] tasks_by_load={:?}", dom, &tasks_by_load);
self.tasks_by_load[dom as usize] = Some(tasks_by_load);
Ok(())
}
// Find the first candidate pid which hasn't already been migrated and
// can run in @pull_dom.
fn find_first_candidate<'d, I>(
tasks_by_load: I,
pull_dom: u32,
skip_kworkers: bool,
) -> Option<(f64, &'d TaskInfo)>
where
I: IntoIterator<Item = (&'d OrderedFloat<f64>, &'d TaskInfo)>,
{
match tasks_by_load
.into_iter()
.skip_while(|(_, task)| {
task.migrated.get()
|| (task.dom_mask & (1 << pull_dom) == 0)
|| (skip_kworkers && task.is_kworker)
})
.next()
{
Some((OrderedFloat(load), task)) => Some((*load, task)),
None => None,
}
}
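// Equivalent sketch: skip_while(pred).next() behaves like find() with the
// predicate negated, e.g.
//
//     tasks_by_load.into_iter().find(|(_, task)| {
//         !task.migrated.get()
//             && (task.dom_mask & (1 << pull_dom) != 0)
//             && !(skip_kworkers && task.is_kworker)
//     })
//
// i.e. the first not-yet-migrated task whose dom_mask allows @pull_dom and
// which isn't an excluded kworker.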
fn pick_victim(
&mut self,
(push_dom, to_push): (u32, f64),
(pull_dom, to_pull): (u32, f64),
) -> Result<Option<(&TaskInfo, f64)>> {
let to_xfer = to_pull.min(to_push) * Self::LOAD_IMBAL_XFER_TARGET_RATIO;
debug!(
"considering dom {}@{:.2} -> {}@{:.2}",
push_dom, to_push, pull_dom, to_pull
);
let calc_new_imbal = |xfer: f64| (to_push - xfer).abs() + (to_pull - xfer).abs();
self.populate_tasks_by_load(push_dom)?;
// We want to pick a task to transfer from push_dom to pull_dom to
// reduce the load imbalance between the two closest to $to_xfer.
// IOW, pick a task which has the closest load value to $to_xfer
// that can be migrated. Find such task by locating the first
// migratable task while scanning left from $to_xfer and the
// counterpart while scanning right and picking the better of the
// two.
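// Worked example (illustrative): with to_push = 4.0 and to_pull = 2.0,
// to_xfer = min(4.0, 2.0) * 0.5 = 1.0. If the nearest migratable loads are
// 0.8 (scanning left) and 1.5 (scanning right), the candidate imbalances are
// |4.0 - 0.8| + |2.0 - 0.8| = 4.4 versus |4.0 - 1.5| + |2.0 - 1.5| = 3.0, so
// the 1.5 task is picked; 3.0 < 6.0 (the current imbalance), so it migrates.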
let (load, task, new_imbal) = match (
Self::find_first_candidate(
self.tasks_by_load[push_dom as usize]
.as_ref()
.unwrap()
.range((Unbounded, Included(&OrderedFloat(to_xfer))))
.rev(),
pull_dom,
self.skip_kworkers,
),
Self::find_first_candidate(
self.tasks_by_load[push_dom as usize]
.as_ref()
.unwrap()
.range((Included(&OrderedFloat(to_xfer)), Unbounded)),
pull_dom,
self.skip_kworkers,
),
) {
(None, None) => return Ok(None),
(Some((load, task)), None) | (None, Some((load, task))) => {
(load, task, calc_new_imbal(load))
}
(Some((load0, task0)), Some((load1, task1))) => {
let (new_imbal0, new_imbal1) = (calc_new_imbal(load0), calc_new_imbal(load1));
if new_imbal0 <= new_imbal1 {
(load0, task0, new_imbal0)
} else {
(load1, task1, new_imbal1)
}
}
};
// If the best candidate can't reduce the imbalance, there's nothing
// to do for this pair.
let old_imbal = to_push + to_pull;
if old_imbal < new_imbal {
debug!(
"skipping pid {}, dom {} -> {} won't improve imbal {:.2} -> {:.2}",
task.pid, push_dom, pull_dom, old_imbal, new_imbal
);
return Ok(None);
}
debug!(
"migrating pid {}, dom {} -> {}, imbal={:.2} -> {:.2}",
task.pid, push_dom, pull_dom, old_imbal, new_imbal,
);
Ok(Some((task, load)))
}
// Actually execute the load balancing. Concretely this writes pid -> dom
// entries into the lb_data map for bpf side to consume.
fn load_balance(&mut self) -> Result<()> {
clear_map(self.skel.maps().lb_data());
debug!("imbal={:?}", &self.imbal);
debug!("doms_to_push={:?}", &self.doms_to_push);
debug!("doms_to_pull={:?}", &self.doms_to_pull);
// Push from the most imbalanced to least.
while let Some((OrderedFloat(mut to_push), push_dom)) = self.doms_to_push.pop_last() {
let push_max = self.dom_loads[push_dom as usize] * Self::LOAD_IMBAL_PUSH_MAX_RATIO;
let mut pushed = 0f64;
// Transfer tasks from push_dom to reduce imbalance.
loop {
let last_pushed = pushed;
// Pull from the most imbalanced to least.
let mut doms_to_pull = BTreeMap::<_, _>::new();
std::mem::swap(&mut self.doms_to_pull, &mut doms_to_pull);
let mut pull_doms = doms_to_pull.into_iter().rev().collect::<Vec<(_, _)>>();
for (to_pull, pull_dom) in pull_doms.iter_mut() {
if let Some((task, load)) =
self.pick_victim((push_dom, to_push), (*pull_dom, f64::from(*to_pull)))?
{
// Execute migration.
task.migrated.set(true);
to_push -= load;
*to_pull -= load;
pushed += load;
// Ask BPF code to execute the migration.
let pid = task.pid;
let cpid = (pid as libc::pid_t).to_ne_bytes();
if let Err(e) = self.skel.maps_mut().lb_data().update(
&cpid,
&pull_dom.to_ne_bytes(),
libbpf_rs::MapFlags::NO_EXIST,
) {
warn!(
"Failed to update lb_data map for pid={} error={:?}",
pid, &e
);
*self.nr_lb_data_errors += 1;
}
// Always break after a successful migration so that
// the pulling domains are always considered in the
// descending imbalance order.
break;
}
}
pull_doms
.into_iter()
.map(|(k, v)| self.doms_to_pull.insert(k, v))
.count();
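// All remaining pull domains are put back into doms_to_pull above (with the
// just-served domain's imbalance reduced accordingly), so the next pass of
// the inner loop re-evaluates them in descending imbalance order.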
// Stop repeating if nothing got transferred or pushed enough.
if pushed == last_pushed || pushed >= push_max {
break;
}
}
}
Ok(())
}
}
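// Rough usage sketch (hypothetical driver code, not part of this commit):
// one balancing pass from the scheduler side is expected to look roughly like
//
//     let mut lb = LoadBalancer::new(&mut skel, top.clone(), dom_group.clone(),
//                                    skip_kworkers, lb_apply_weight,
//                                    &mut nr_lb_data_errors);
//     lb.read_dom_loads()?;
//     lb.calculate_dom_load_balance()?;
//     lb.load_balance()?;
//
// i.e. read per-domain load state, classify push/pull domains, then write the
// chosen pid -> dom migrations into lb_data for the BPF side to act on.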
struct Scheduler<'a> {
skel: BpfSkel<'a>,
struct_ops: Option<libbpf_rs::Link>,