From 758f762058d653e9721896696a38db86e5cc4104 Mon Sep 17 00:00:00 2001 From: David Vernet Date: Mon, 26 Feb 2024 16:46:59 -0600 Subject: [PATCH] rusty: Move LoadBalancer out of rusty.rs More cleanup of scx_rusty. Let's move the LoadBalancer out of rusty.rs and into its own file. It will soon be extended quite a bit to support multi-NUMA and other multivariate LB cost functions, so it's time to clean things up and split it out. Signed-off-by: David Vernet --- scheds/rust/scx_rusty/src/main.rs | 479 +----------------------------- 1 file changed, 5 insertions(+), 474 deletions(-) diff --git a/scheds/rust/scx_rusty/src/main.rs b/scheds/rust/scx_rusty/src/main.rs index 54340fd..36616cb 100644 --- a/scheds/rust/scx_rusty/src/main.rs +++ b/scheds/rust/scx_rusty/src/main.rs @@ -12,19 +12,18 @@ use domain::DomainGroup; pub mod tuner; use tuner::Tuner; -#[macro_use] -extern crate static_assertions; +pub mod load_balance; +use load_balance::LoadBalancer; -use std::cell::Cell; -use std::collections::BTreeMap; -use std::ops::Bound::Included; -use std::ops::Bound::Unbounded; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use std::time::Instant; +#[macro_use] +extern crate static_assertions; + use ::fb_procfs as procfs; use anyhow::anyhow; use anyhow::bail; @@ -34,19 +33,12 @@ use clap::Parser; use libbpf_rs::skel::OpenSkel as _; use libbpf_rs::skel::Skel as _; use libbpf_rs::skel::SkelBuilder as _; -use log::debug; use log::info; -use log::trace; -use log::warn; -use ordered_float::OrderedFloat; use scx_utils::init_libbpf_logging; -use scx_utils::ravg::ravg_read; use scx_utils::uei_exited; use scx_utils::uei_report; use scx_utils::Topology; -use scx_utils::LoadAggregator; -const RAVG_FRAC_BITS: u32 = bpf_intf::ravg_consts_RAVG_FRAC_BITS; const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize; const MAX_CPUS: usize = bpf_intf::consts_MAX_CPUS as usize; @@ -166,22 +158,6 @@ struct Opts { verbose: u8, } -fn now_monotonic() -> u64 { - let mut time = libc::timespec { - tv_sec: 0, - tv_nsec: 0, - }; - let ret = unsafe { libc::clock_gettime(libc::CLOCK_MONOTONIC, &mut time) }; - assert!(ret == 0); - time.tv_sec as u64 * 1_000_000_000 + time.tv_nsec as u64 -} - -fn clear_map(map: &libbpf_rs::Map) { - for key in map.keys() { - let _ = map.delete(&key); - } -} - fn format_cpumask(cpumask: &[u64], nr_cpus: usize) -> String { cpumask .iter() @@ -206,451 +182,6 @@ pub fn sub_or_zero(curr: &u64, prev: &u64) -> u64 { } } -fn approx_eq(a: f64, b: f64) -> bool { - (a - b).abs() <= 0.0001f64 -} - -#[derive(Debug)] -struct TaskInfo { - pid: i32, - dom_mask: u64, - migrated: Cell, - is_kworker: bool, -} - -struct LoadBalancer<'a, 'b, 'c> { - skel: &'a mut BpfSkel<'b>, - top: Arc, - dom_group: Arc, - skip_kworkers: bool, - - infeas_threshold: f64, - - tasks_by_load: Vec, TaskInfo>>>, - load_avg: f64, - dom_loads: Vec, - - imbal: Vec, - doms_to_push: BTreeMap, u32>, - doms_to_pull: BTreeMap, u32>, - - lb_apply_weight: bool, - nr_lb_data_errors: &'c mut u64, -} - -// Verify that the number of buckets is a factor of the maximum weight to -// ensure that the range of weight can be split evenly amongst every bucket. -const_assert_eq!(bpf_intf::consts_LB_MAX_WEIGHT % bpf_intf::consts_LB_LOAD_BUCKETS, 0); - - -impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> { - // If imbalance gets higher than this ratio, try to balance the loads. - const LOAD_IMBAL_HIGH_RATIO: f64 = 0.05; - - // Aim to transfer this fraction of the imbalance on each round. We want - // to be gradual to avoid unnecessary oscillations. While this can delay - // convergence, greedy execution should be able to bridge the temporary - // gap. - const LOAD_IMBAL_XFER_TARGET_RATIO: f64 = 0.50; - - // Don't push out more than this ratio of load on each round. While this - // overlaps with XFER_TARGET_RATIO, XFER_TARGET_RATIO only defines the - // target and doesn't limit the total load. As long as the transfer - // reduces load imbalance between the two involved domains, it'd happily - // transfer whatever amount that can be transferred. This limit is used - // as the safety cap to avoid draining a given domain too much in a - // single round. - const LOAD_IMBAL_PUSH_MAX_RATIO: f64 = 0.50; - - fn new( - skel: &'a mut BpfSkel<'b>, - top: Arc, - dom_group: Arc, - skip_kworkers: bool, - lb_apply_weight: bool, - nr_lb_data_errors: &'c mut u64, - ) -> Self { - Self { - skel, - skip_kworkers, - - infeas_threshold: bpf_intf::consts_LB_MAX_WEIGHT as f64, - - tasks_by_load: (0..dom_group.nr_doms()).map(|_| None).collect(), - load_avg: 0f64, - dom_loads: vec![0.0; dom_group.nr_doms()], - - imbal: vec![0.0; dom_group.nr_doms()], - doms_to_pull: BTreeMap::new(), - doms_to_push: BTreeMap::new(), - - lb_apply_weight: lb_apply_weight.clone(), - nr_lb_data_errors, - - top, - dom_group, - } - } - - fn bucket_range(&self, bucket: u64) -> (f64, f64) { - const MAX_WEIGHT: u64 = bpf_intf::consts_LB_MAX_WEIGHT as u64; - const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64; - const WEIGHT_PER_BUCKET: u64 = MAX_WEIGHT / NUM_BUCKETS; - - if bucket >= NUM_BUCKETS { - panic!("Invalid bucket {}, max {}", bucket, NUM_BUCKETS); - } - - // w_x = [1 + (10000 * x) / N, 10000 * (x + 1) / N] - let min_w = 1 + (MAX_WEIGHT * bucket) / NUM_BUCKETS; - let max_w = min_w + WEIGHT_PER_BUCKET - 1; - - (min_w as f64, max_w as f64) - } - - fn bucket_weight(&self, bucket: u64) -> usize { - const WEIGHT_PER_BUCKET: f64 = bpf_intf::consts_LB_WEIGHT_PER_BUCKET as f64; - let (min_weight, _) = self.bucket_range(bucket); - - // Use the mid-point of the bucket when determining weight - (min_weight + (WEIGHT_PER_BUCKET / 2.0f64)).floor() as usize - } - - fn read_dom_loads(&mut self) -> Result<()> { - const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64; - let now_mono = now_monotonic(); - let load_half_life = self.skel.rodata().load_half_life; - let maps = self.skel.maps(); - let dom_data = maps.dom_data(); - - let mut aggregator = LoadAggregator::new(self.top.nr_cpus(), !self.lb_apply_weight.clone()); - - // Accumulate dcycle and load across all domains and buckets. If we're - // under-utilized, or there are no infeasible weights, this is - // sufficient to collect all of the data we need for load balancing. - for dom in 0..self.dom_group.nr_doms() { - let dom_key = unsafe { std::mem::transmute::(dom as u32) }; - - if let Some(dom_ctx_map_elem) = dom_data - .lookup(&dom_key, libbpf_rs::MapFlags::ANY) - .context("Failed to lookup dom_ctx")? - { - let dom_ctx = - unsafe { &*(dom_ctx_map_elem.as_slice().as_ptr() as *const bpf_intf::dom_ctx) }; - - for bucket in 0..NUM_BUCKETS { - let bucket_ctx = dom_ctx.buckets[bucket as usize]; - let rd = &bucket_ctx.rd; - let duty_cycle = ravg_read( - rd.val, - rd.val_at, - rd.old, - rd.cur, - now_mono, - load_half_life, - RAVG_FRAC_BITS, - ); - - if approx_eq(0.0, duty_cycle) { - continue; - } - - let weight = self.bucket_weight(bucket); - aggregator.record_dom_load(dom, weight, duty_cycle)?; - } - } - } - - let ledger = aggregator.calculate(); - if !self.lb_apply_weight { - // System is under-utilized, so just use dcycle instead of load. - self.load_avg = ledger.global_dcycle_sum() / self.dom_group.nr_doms() as f64; - self.dom_loads = ledger.dom_dcycle_sums().to_vec(); - } else { - self.load_avg = ledger.global_load_sum() / self.dom_group.nr_doms() as f64; - self.dom_loads = ledger.dom_load_sums().to_vec(); - self.infeas_threshold = ledger.effective_max_weight(); - } - - Ok(()) - } - - /// To balance dom loads, identify doms with lower and higher load than - /// average. - fn calculate_dom_load_balance(&mut self) -> Result<()> { - let mode = if self.lb_apply_weight { - "weighted" - } else { - "dcycle" - }; - - debug!("mode= {} load_avg= {:.2} infeasible_thresh= {:.2}", - mode, self.load_avg, self.infeas_threshold); - for (dom, dom_load) in self.dom_loads.iter().enumerate() { - let imbal = dom_load - self.load_avg; - if imbal.abs() >= self.load_avg * Self::LOAD_IMBAL_HIGH_RATIO { - if imbal > 0f64 { - self.doms_to_push.insert(OrderedFloat(imbal), dom as u32); - } else { - self.doms_to_pull.insert(OrderedFloat(-imbal), dom as u32); - } - self.imbal[dom] = imbal; - } - } - Ok(()) - } - - /// @dom needs to push out tasks to balance loads. Make sure its - /// tasks_by_load is populated so that the victim tasks can be picked. - fn populate_tasks_by_load(&mut self, dom: u32) -> Result<()> { - if self.tasks_by_load[dom as usize].is_some() { - return Ok(()); - } - - // Read active_pids and update write_idx and gen. - // - // XXX - We can't read task_ctx inline because self.skel.bss() - // borrows mutably and thus conflicts with self.skel.maps(). - const MAX_PIDS: u64 = bpf_intf::consts_MAX_DOM_ACTIVE_PIDS as u64; - let active_pids = &mut self.skel.bss_mut().dom_active_pids[dom as usize]; - let mut pids = vec![]; - - let (mut ridx, widx) = (active_pids.read_idx, active_pids.write_idx); - if widx - ridx > MAX_PIDS { - ridx = widx - MAX_PIDS; - } - - for idx in ridx..widx { - let pid = active_pids.pids[(idx % MAX_PIDS) as usize]; - pids.push(pid); - } - - active_pids.read_idx = active_pids.write_idx; - active_pids.gen += 1; - - // Read task_ctx and load. - let load_half_life = self.skel.rodata().load_half_life; - let maps = self.skel.maps(); - let task_data = maps.task_data(); - let now_mono = now_monotonic(); - let mut tasks_by_load = BTreeMap::new(); - - for pid in pids.iter() { - let key = unsafe { std::mem::transmute::(*pid) }; - - if let Some(task_data_elem) = task_data.lookup(&key, libbpf_rs::MapFlags::ANY)? { - let task_ctx = - unsafe { &*(task_data_elem.as_slice().as_ptr() as *const bpf_intf::task_ctx) }; - if task_ctx.dom_id != dom { - continue; - } - - let weight = (task_ctx.weight as f64).min(self.infeas_threshold); - - let rd = &task_ctx.dcyc_rd; - let load = weight - * ravg_read( - rd.val, - rd.val_at, - rd.old, - rd.cur, - now_mono, - load_half_life, - RAVG_FRAC_BITS, - ); - - tasks_by_load.insert( - OrderedFloat(load), - TaskInfo { - pid: *pid, - dom_mask: task_ctx.dom_mask, - migrated: Cell::new(false), - is_kworker: task_ctx.is_kworker, - }, - ); - } - } - - debug!( - "DOM[{:02}] read load for {} tasks", - dom, - &tasks_by_load.len(), - ); - trace!("DOM[{:02}] tasks_by_load={:?}", dom, &tasks_by_load); - - self.tasks_by_load[dom as usize] = Some(tasks_by_load); - Ok(()) - } - - // Find the first candidate pid which hasn't already been migrated and - // can run in @pull_dom. - fn find_first_candidate<'d, I>( - tasks_by_load: I, - pull_dom: u32, - skip_kworkers: bool, - ) -> Option<(f64, &'d TaskInfo)> - where - I: IntoIterator, &'d TaskInfo)>, - { - match tasks_by_load - .into_iter() - .skip_while(|(_, task)| { - task.migrated.get() - || (task.dom_mask & (1 << pull_dom) == 0) - || (skip_kworkers && task.is_kworker) - }) - .next() - { - Some((OrderedFloat(load), task)) => Some((*load, task)), - None => None, - } - } - - fn pick_victim( - &mut self, - (push_dom, to_push): (u32, f64), - (pull_dom, to_pull): (u32, f64), - ) -> Result> { - let to_xfer = to_pull.min(to_push) * Self::LOAD_IMBAL_XFER_TARGET_RATIO; - - debug!( - "considering dom {}@{:.2} -> {}@{:.2}", - push_dom, to_push, pull_dom, to_pull - ); - - let calc_new_imbal = |xfer: f64| (to_push - xfer).abs() + (to_pull - xfer).abs(); - - self.populate_tasks_by_load(push_dom)?; - - // We want to pick a task to transfer from push_dom to pull_dom to - // reduce the load imbalance between the two closest to $to_xfer. - // IOW, pick a task which has the closest load value to $to_xfer - // that can be migrated. Find such task by locating the first - // migratable task while scanning left from $to_xfer and the - // counterpart while scanning right and picking the better of the - // two. - let (load, task, new_imbal) = match ( - Self::find_first_candidate( - self.tasks_by_load[push_dom as usize] - .as_ref() - .unwrap() - .range((Unbounded, Included(&OrderedFloat(to_xfer)))) - .rev(), - pull_dom, - self.skip_kworkers, - ), - Self::find_first_candidate( - self.tasks_by_load[push_dom as usize] - .as_ref() - .unwrap() - .range((Included(&OrderedFloat(to_xfer)), Unbounded)), - pull_dom, - self.skip_kworkers, - ), - ) { - (None, None) => return Ok(None), - (Some((load, task)), None) | (None, Some((load, task))) => { - (load, task, calc_new_imbal(load)) - } - (Some((load0, task0)), Some((load1, task1))) => { - let (new_imbal0, new_imbal1) = (calc_new_imbal(load0), calc_new_imbal(load1)); - if new_imbal0 <= new_imbal1 { - (load0, task0, new_imbal0) - } else { - (load1, task1, new_imbal1) - } - } - }; - - // If the best candidate can't reduce the imbalance, there's nothing - // to do for this pair. - let old_imbal = to_push + to_pull; - if old_imbal < new_imbal { - debug!( - "skipping pid {}, dom {} -> {} won't improve imbal {:.2} -> {:.2}", - task.pid, push_dom, pull_dom, old_imbal, new_imbal - ); - return Ok(None); - } - - debug!( - "migrating pid {}, dom {} -> {}, imbal={:.2} -> {:.2}", - task.pid, push_dom, pull_dom, old_imbal, new_imbal, - ); - - Ok(Some((task, load))) - } - - // Actually execute the load balancing. Concretely this writes pid -> dom - // entries into the lb_data map for bpf side to consume. - fn load_balance(&mut self) -> Result<()> { - clear_map(self.skel.maps().lb_data()); - - debug!("imbal={:?}", &self.imbal); - debug!("doms_to_push={:?}", &self.doms_to_push); - debug!("doms_to_pull={:?}", &self.doms_to_pull); - - // Push from the most imbalanced to least. - while let Some((OrderedFloat(mut to_push), push_dom)) = self.doms_to_push.pop_last() { - let push_max = self.dom_loads[push_dom as usize] * Self::LOAD_IMBAL_PUSH_MAX_RATIO; - let mut pushed = 0f64; - - // Transfer tasks from push_dom to reduce imbalance. - loop { - let last_pushed = pushed; - - // Pull from the most imbalanced to least. - let mut doms_to_pull = BTreeMap::<_, _>::new(); - std::mem::swap(&mut self.doms_to_pull, &mut doms_to_pull); - let mut pull_doms = doms_to_pull.into_iter().rev().collect::>(); - - for (to_pull, pull_dom) in pull_doms.iter_mut() { - if let Some((task, load)) = - self.pick_victim((push_dom, to_push), (*pull_dom, f64::from(*to_pull)))? - { - // Execute migration. - task.migrated.set(true); - to_push -= load; - *to_pull -= load; - pushed += load; - - // Ask BPF code to execute the migration. - let pid = task.pid; - let cpid = (pid as libc::pid_t).to_ne_bytes(); - if let Err(e) = self.skel.maps_mut().lb_data().update( - &cpid, - &pull_dom.to_ne_bytes(), - libbpf_rs::MapFlags::NO_EXIST, - ) { - warn!( - "Failed to update lb_data map for pid={} error={:?}", - pid, &e - ); - *self.nr_lb_data_errors += 1; - } - - // Always break after a successful migration so that - // the pulling domains are always considered in the - // descending imbalance order. - break; - } - } - - pull_doms - .into_iter() - .map(|(k, v)| self.doms_to_pull.insert(k, v)) - .count(); - - // Stop repeating if nothing got transferred or pushed enough. - if pushed == last_pushed || pushed >= push_max { - break; - } - } - } - Ok(()) - } -} - struct Scheduler<'a> { skel: BpfSkel<'a>, struct_ops: Option,