Merge pull request #129 from sched-ext/infeasible_weights

Implement solution to infeasible weights problem
This commit is contained in:
David Vernet 2024-02-09 16:23:56 -06:00 committed by GitHub
commit 1c00de9402
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 488 additions and 168 deletions

View File

@ -34,7 +34,7 @@ struct ravg_data {
/*
* Accumulated value of the current period. Input value is 48bits and we
* normalize half-life to 16bit, so it should fit in an u64.
* normalize half-life to 16bit, so it should fit in a u64.
*/
u64 cur;
};

View File

@ -19,6 +19,7 @@ log = "0.4.17"
ordered-float = "3.4.0"
scx_utils = { path = "../../../rust/scx_utils", version = "0.6" }
simplelog = "0.12.0"
static_assertions = "1.1.0"
[build-dependencies]
scx_utils = { path = "../../../rust/scx_utils", version = "0.6" }

View File

@ -26,6 +26,12 @@ enum consts {
MAX_DOMS = 64, /* limited to avoid complex bitmask ops */
CACHELINE_SIZE = 64,
LB_DEFAULT_WEIGHT = 100,
LB_MIN_WEIGHT = 1,
LB_MAX_WEIGHT = 10000,
LB_LOAD_BUCKETS = 100, /* Must be a factor of LB_MAX_WEIGHT */
LB_WEIGHT_PER_BUCKET = LB_MAX_WEIGHT / LB_LOAD_BUCKETS,
/*
* When userspace load balancer is trying to determine the tasks to push
* out from an overloaded domain, it looks at the the following number
@ -84,14 +90,18 @@ struct task_ctx {
struct ravg_data dcyc_rd;
};
struct bucket_ctx {
u64 dcycle;
struct ravg_data rd;
};
struct dom_ctx {
u64 vtime_now;
struct bpf_cpumask __kptr *cpumask;
struct bpf_cpumask __kptr *direct_greedy_cpumask;
u64 load;
struct ravg_data load_rd;
u64 dbg_load_printed_at;
u64 dbg_dcycle_printed_at;
struct bucket_ctx buckets[LB_LOAD_BUCKETS];
};
#endif /* __INTF_H */

View File

@ -104,9 +104,9 @@ struct {
__uint(type, BPF_MAP_TYPE_ARRAY);
__type(key, u32);
__type(value, struct lock_wrapper);
__uint(max_entries, MAX_DOMS);
__uint(max_entries, MAX_DOMS * LB_LOAD_BUCKETS);
__uint(map_flags, 0);
} dom_load_locks SEC(".maps");
} dom_dcycle_locks SEC(".maps");
struct dom_active_pids {
u64 gen;
@ -119,128 +119,6 @@ struct dom_active_pids dom_active_pids[MAX_DOMS];
const u64 ravg_1 = 1 << RAVG_FRAC_BITS;
static void dom_load_adj(u32 dom_id, s64 adj, u64 now)
{
struct dom_ctx *domc;
struct lock_wrapper *lockw;
domc = bpf_map_lookup_elem(&dom_data, &dom_id);
lockw = bpf_map_lookup_elem(&dom_load_locks, &dom_id);
if (!domc || !lockw) {
scx_bpf_error("dom_ctx / lock lookup failed");
return;
}
bpf_spin_lock(&lockw->lock);
domc->load += adj;
ravg_accumulate(&domc->load_rd, domc->load, now, load_half_life);
bpf_spin_unlock(&lockw->lock);
if (adj < 0 && (s64)domc->load < 0)
scx_bpf_error("cpu%d dom%u load underflow (load=%lld adj=%lld)",
bpf_get_smp_processor_id(), dom_id, domc->load, adj);
if (debug >=2 &&
(!domc->dbg_load_printed_at || now - domc->dbg_load_printed_at >= 1000000000)) {
bpf_printk("LOAD ADJ dom=%u adj=%lld load=%llu",
dom_id,
adj,
ravg_read(&domc->load_rd, now, load_half_life) >> RAVG_FRAC_BITS);
domc->dbg_load_printed_at = now;
}
}
static void dom_load_xfer_task(struct task_struct *p, struct task_ctx *taskc,
u32 from_dom_id, u32 to_dom_id, u64 now)
{
struct dom_ctx *from_domc, *to_domc;
struct lock_wrapper *from_lockw, *to_lockw;
struct ravg_data task_load_rd;
u64 from_load[2], to_load[2], task_load;
from_domc = bpf_map_lookup_elem(&dom_data, &from_dom_id);
from_lockw = bpf_map_lookup_elem(&dom_load_locks, &from_dom_id);
to_domc = bpf_map_lookup_elem(&dom_data, &to_dom_id);
to_lockw = bpf_map_lookup_elem(&dom_load_locks, &to_dom_id);
if (!from_domc || !from_lockw || !to_domc || !to_lockw) {
scx_bpf_error("dom_ctx / lock lookup failed");
return;
}
/*
* @p is moving from @from_dom_id to @to_dom_id. Its load contribution
* should be moved together. We only track duty cycle for tasks. Scale
* it by weight to get load_rd.
*/
ravg_accumulate(&taskc->dcyc_rd, taskc->runnable, now, load_half_life);
task_load_rd = taskc->dcyc_rd;
ravg_scale(&task_load_rd, p->scx.weight, 0);
if (debug >= 2)
task_load = ravg_read(&task_load_rd, now, load_half_life);
/* transfer out of @from_dom_id */
bpf_spin_lock(&from_lockw->lock);
if (taskc->runnable)
from_domc->load -= p->scx.weight;
if (debug >= 2)
from_load[0] = ravg_read(&from_domc->load_rd, now, load_half_life);
ravg_transfer(&from_domc->load_rd, from_domc->load,
&task_load_rd, taskc->runnable, load_half_life, false);
if (debug >= 2)
from_load[1] = ravg_read(&from_domc->load_rd, now, load_half_life);
bpf_spin_unlock(&from_lockw->lock);
/* transfer into @to_dom_id */
bpf_spin_lock(&to_lockw->lock);
if (taskc->runnable)
to_domc->load += p->scx.weight;
if (debug >= 2)
to_load[0] = ravg_read(&to_domc->load_rd, now, load_half_life);
ravg_transfer(&to_domc->load_rd, to_domc->load,
&task_load_rd, taskc->runnable, load_half_life, true);
if (debug >= 2)
to_load[1] = ravg_read(&to_domc->load_rd, now, load_half_life);
bpf_spin_unlock(&to_lockw->lock);
if (debug >= 2)
bpf_printk("XFER dom%u->%u task=%lu from=%lu->%lu to=%lu->%lu",
from_dom_id, to_dom_id,
task_load >> RAVG_FRAC_BITS,
from_load[0] >> RAVG_FRAC_BITS,
from_load[1] >> RAVG_FRAC_BITS,
to_load[0] >> RAVG_FRAC_BITS,
to_load[1] >> RAVG_FRAC_BITS);
}
/*
* Statistics
*/
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u64));
__uint(max_entries, RUSTY_NR_STATS);
} stats SEC(".maps");
static inline void stat_add(enum stat_idx idx, u64 addend)
{
u32 idx_v = idx;
u64 *cnt_p = bpf_map_lookup_elem(&stats, &idx_v);
if (cnt_p)
(*cnt_p) += addend;
}
/* Map pid -> task_ctx */
struct {
__uint(type, BPF_MAP_TYPE_HASH);
@ -263,6 +141,183 @@ struct task_ctx *lookup_task_ctx(struct task_struct *p)
}
}
static inline u32 weight_to_bucket_idx(u32 weight)
{
/* Weight is calculated linearly, and is within range of [1, 10000] */
return weight * LB_LOAD_BUCKETS / LB_MAX_WEIGHT;
}
static void task_load_adj(struct task_struct *p, struct task_ctx *taskc,
u64 now, bool runnable)
{
taskc->runnable = runnable;
ravg_accumulate(&taskc->dcyc_rd, taskc->runnable, now, load_half_life);
}
static struct bucket_ctx *lookup_dom_bucket(struct dom_ctx *dom_ctx,
u32 weight, u32 *bucket_id)
{
u32 idx = weight_to_bucket_idx(weight);
struct bucket_ctx *bucket;
*bucket_id = idx;
bucket = MEMBER_VPTR(dom_ctx->buckets, [idx]);
if (bucket)
return bucket;
scx_bpf_error("Failed to lookup dom bucket");
return NULL;
}
static struct lock_wrapper *lookup_dom_lock(u32 dom_id, u32 weight)
{
u32 idx = dom_id * LB_LOAD_BUCKETS + weight_to_bucket_idx(weight);
struct lock_wrapper *lockw;
lockw = bpf_map_lookup_elem(&dom_dcycle_locks, &idx);
if (lockw)
return lockw;
scx_bpf_error("Failed to lookup dom lock");
return NULL;
}
static void dom_dcycle_adj(u32 dom_id, u32 weight, u64 now, bool runnable)
{
struct dom_ctx *domc;
struct bucket_ctx *bucket;
struct lock_wrapper *lockw;
s64 adj = runnable ? 1 : -1;
u32 bucket_idx = 0;
domc = bpf_map_lookup_elem(&dom_data, &dom_id);
if (!domc) {
scx_bpf_error("Failed to lookup dom_ctx");
return;
}
bucket = lookup_dom_bucket(domc, weight, &bucket_idx);
lockw = lookup_dom_lock(dom_id, weight);
if (!bucket || !lockw)
return;
bpf_spin_lock(&lockw->lock);
bucket->dcycle += adj;
ravg_accumulate(&bucket->rd, bucket->dcycle, now, load_half_life);
bpf_spin_unlock(&lockw->lock);
if (adj < 0 && (s64)bucket->dcycle < 0)
scx_bpf_error("cpu%d dom%u bucket%u load underflow (dcycle=%lld adj=%lld)",
bpf_get_smp_processor_id(), dom_id, bucket_idx,
bucket->dcycle, adj);
if (debug >=2 &&
(!domc->dbg_dcycle_printed_at || now - domc->dbg_dcycle_printed_at >= 1000000000)) {
bpf_printk("DCYCLE ADJ dom=%u bucket=%u adj=%lld dcycle=%u avg_dcycle=%llu",
dom_id, bucket_idx, adj, bucket->dcycle,
ravg_read(&bucket->rd, now, load_half_life) >> RAVG_FRAC_BITS);
domc->dbg_dcycle_printed_at = now;
}
}
static void dom_load_xfer_task(struct task_struct *p, struct task_ctx *taskc,
u32 from_dom_id, u32 to_dom_id, u64 now)
{
struct bucket_ctx *from_bucket, *to_bucket;
u32 idx = 0, weight = taskc->weight;
struct dom_ctx *from_domc, *to_domc;
struct lock_wrapper *from_lockw, *to_lockw;
struct ravg_data task_dcyc_rd;
u64 from_dcycle[2], to_dcycle[2], task_dcycle;
from_domc = bpf_map_lookup_elem(&dom_data, &from_dom_id);
from_lockw = lookup_dom_lock(from_dom_id, weight);
to_domc = bpf_map_lookup_elem(&dom_data, &to_dom_id);
to_lockw = lookup_dom_lock(to_dom_id, weight);
if (!from_domc || !from_lockw || !to_domc || !to_lockw) {
scx_bpf_error("dom_ctx / lock lookup failed");
return;
}
from_bucket = lookup_dom_bucket(from_domc, weight, &idx);
to_bucket = lookup_dom_bucket(to_domc, weight, &idx);
if (!from_bucket || !to_bucket)
return;
/*
* @p is moving from @from_dom_id to @to_dom_id. Its duty cycle
* contribution in the relevant bucket of @from_dom_id should be moved
* together to the corresponding bucket in @to_dom_id. We only track
* duty cycle from BPF. Load is computed in user space when performing
* load balancing.
*/
ravg_accumulate(&taskc->dcyc_rd, taskc->runnable, now, load_half_life);
task_dcyc_rd = taskc->dcyc_rd;
if (debug >= 2)
task_dcycle = ravg_read(&task_dcyc_rd, now, load_half_life);
/* transfer out of @from_dom_id */
bpf_spin_lock(&from_lockw->lock);
if (taskc->runnable)
from_bucket->dcycle--;
if (debug >= 2)
from_dcycle[0] = ravg_read(&from_bucket->rd, now, load_half_life);
ravg_transfer(&from_bucket->rd, from_bucket->dcycle,
&task_dcyc_rd, taskc->runnable, load_half_life, false);
if (debug >= 2)
from_dcycle[1] = ravg_read(&from_bucket->rd, now, load_half_life);
bpf_spin_unlock(&from_lockw->lock);
/* transfer into @to_dom_id */
bpf_spin_lock(&to_lockw->lock);
if (taskc->runnable)
to_bucket->dcycle++;
if (debug >= 2)
to_dcycle[0] = ravg_read(&to_bucket->rd, now, load_half_life);
ravg_transfer(&to_bucket->rd, to_bucket->dcycle,
&task_dcyc_rd, taskc->runnable, load_half_life, true);
if (debug >= 2)
to_dcycle[1] = ravg_read(&to_bucket->rd, now, load_half_life);
bpf_spin_unlock(&to_lockw->lock);
if (debug >= 2)
bpf_printk("XFER dom%u->%u task=%lu from=%lu->%lu to=%lu->%lu",
from_dom_id, to_dom_id,
task_dcycle >> RAVG_FRAC_BITS,
from_dcycle[0] >> RAVG_FRAC_BITS,
from_dcycle[1] >> RAVG_FRAC_BITS,
to_dcycle[0] >> RAVG_FRAC_BITS,
to_dcycle[1] >> RAVG_FRAC_BITS);
}
/*
* Statistics
*/
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(key_size, sizeof(u32));
__uint(value_size, sizeof(u64));
__uint(max_entries, RUSTY_NR_STATS);
} stats SEC(".maps");
static inline void stat_add(enum stat_idx idx, u64 addend)
{
u32 idx_v = idx;
u64 *cnt_p = bpf_map_lookup_elem(&stats, &idx_v);
if (cnt_p)
(*cnt_p) += addend;
}
/*
* This is populated from userspace to indicate which pids should be reassigned
* to new doms.
@ -788,11 +843,10 @@ void BPF_STRUCT_OPS(rusty_runnable, struct task_struct *p, u64 enq_flags)
if (!(taskc = lookup_task_ctx(p)))
return;
taskc->runnable = true;
taskc->is_kworker = p->flags & PF_WQ_WORKER;
ravg_accumulate(&taskc->dcyc_rd, taskc->runnable, now, load_half_life);
dom_load_adj(taskc->dom_id, p->scx.weight, now);
task_load_adj(p, taskc, now, true);
dom_dcycle_adj(taskc->dom_id, taskc->weight, now, true);
}
void BPF_STRUCT_OPS(rusty_running, struct task_struct *p)
@ -875,19 +929,22 @@ void BPF_STRUCT_OPS(rusty_quiescent, struct task_struct *p, u64 deq_flags)
if (!(taskc = lookup_task_ctx(p)))
return;
taskc->runnable = false;
ravg_accumulate(&taskc->dcyc_rd, taskc->runnable, now, load_half_life);
dom_load_adj(taskc->dom_id, -(s64)p->scx.weight, now);
task_load_adj(p, taskc, now, false);
dom_dcycle_adj(taskc->dom_id, taskc->weight, now, false);
}
void BPF_STRUCT_OPS(rusty_set_weight, struct task_struct *p, u32 weight)
{
struct task_ctx *taskc;
u64 now = bpf_ktime_get_ns();
if (!(taskc = lookup_task_ctx(p)))
return;
if (debug >= 2)
bpf_printk("%s[%d]: SET_WEIGHT %u -> %u", p->comm, p->pid,
taskc->weight, weight);
taskc->weight = weight;
}
@ -908,7 +965,7 @@ static u32 task_pick_domain(struct task_ctx *taskc, struct task_struct *p,
if (cpumask_intersects_domain(cpumask, dom)) {
taskc->dom_mask |= 1LLU << dom;
/*
* AsThe starting point is round-robin'd and the first
* The starting point is round-robin'd and the first
* match should be spread across all the domains.
*/
if (first_dom == MAX_DOMS)
@ -970,6 +1027,9 @@ s32 BPF_STRUCT_OPS(rusty_init_task, struct task_struct *p,
return ret;
}
if (debug >= 2)
bpf_printk("%s[%d]: INIT (weight %u))", p->comm, p->pid, p->scx.weight);
/*
* Read the entry from the map immediately so we can add the cpumask
* with bpf_kptr_xchg().
@ -1019,26 +1079,25 @@ void BPF_STRUCT_OPS(rusty_exit_task, struct task_struct *p,
static s32 create_dom(u32 dom_id)
{
struct dom_ctx domc_init = {}, *domc;
struct dom_ctx *domc;
struct bpf_cpumask *cpumask;
u32 cpu;
s32 ret;
if (dom_id >= MAX_DOMS) {
scx_bpf_error("Max dom ID %u exceeded (%u)", MAX_DOMS, dom_id);
return -EINVAL;
}
ret = scx_bpf_create_dsq(dom_id, -1);
if (ret < 0) {
scx_bpf_error("Failed to create dsq %u (%d)", dom_id, ret);
return ret;
}
ret = bpf_map_update_elem(&dom_data, &dom_id, &domc_init, 0);
if (ret) {
scx_bpf_error("Failed to add dom_ctx entry %u (%d)", dom_id, ret);
return ret;
}
domc = bpf_map_lookup_elem(&dom_data, &dom_id);
if (!domc) {
/* Should never happen, we just inserted it above. */
/* Should never happen, it's created statically at load time. */
scx_bpf_error("No dom%u", dom_id);
return -ENOENT;
}

View File

@ -6,6 +6,9 @@ mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
#[macro_use]
extern crate static_assertions;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
@ -409,6 +412,7 @@ struct Tuner {
kick_greedy_under: f64,
proc_reader: procfs::ProcReader,
prev_cpu_stats: BTreeMap<u32, procfs::CpuStat>,
lb_apply_weight: bool,
dom_utils: Vec<f64>,
}
@ -425,6 +429,7 @@ impl Tuner {
proc_reader,
prev_cpu_stats,
dom_utils: vec![0.0; top.nr_doms],
lb_apply_weight: false,
top,
})
}
@ -435,10 +440,10 @@ impl Tuner {
.read_stat()?
.cpus_map
.ok_or_else(|| anyhow!("Expected cpus_map to exist"))?;
let ti = &mut skel.bss_mut().tune_input;
let mut dom_nr_cpus = vec![0; self.top.nr_doms];
let mut dom_util_sum = vec![0.0; self.top.nr_doms];
let mut avg_util = 0.0f64;
for cpu in 0..self.top.nr_cpus {
let cpu32 = cpu as u32;
// None domain indicates the CPU was offline during
@ -449,11 +454,16 @@ impl Tuner {
curr_cpu_stats.get(&cpu32),
self.prev_cpu_stats.get(&cpu32),
) {
let util = calc_util(curr, prev)?;
dom_nr_cpus[dom] += 1;
dom_util_sum[dom] += calc_util(curr, prev)?;
dom_util_sum[dom] += util;
avg_util += util;
}
}
avg_util /= self.top.nr_cpus as f64;
self.lb_apply_weight = approx_ge(avg_util, 0.99999);
let ti = &mut skel.bss_mut().tune_input;
for dom in 0..self.top.nr_doms {
// Calculate the domain avg util. If there are no active CPUs,
// it doesn't really matter. Go with 0.0 as that's less likely
@ -496,6 +506,14 @@ impl Tuner {
}
}
fn approx_eq(a: f64, b: f64) -> bool {
(a - b).abs() <= 0.0001f64
}
fn approx_ge(a: f64, b: f64) -> bool {
a > b || approx_eq(a, b)
}
#[derive(Debug)]
struct TaskInfo {
pid: i32,
@ -509,6 +527,9 @@ struct LoadBalancer<'a, 'b, 'c> {
top: Arc<Topology>,
skip_kworkers: bool,
lb_apply_weight: bool,
infeas_threshold: f64,
tasks_by_load: Vec<Option<BTreeMap<OrderedFloat<f64>, TaskInfo>>>,
load_avg: f64,
dom_loads: Vec<f64>,
@ -520,9 +541,14 @@ struct LoadBalancer<'a, 'b, 'c> {
nr_lb_data_errors: &'c mut u64,
}
// Verify that the number of buckets is a factor of the maximum weight to
// ensure that the range of weight can be split evenly amongst every bucket.
const_assert_eq!(bpf_intf::consts_LB_MAX_WEIGHT % bpf_intf::consts_LB_LOAD_BUCKETS, 0);
impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
// If imbalance gets higher than this ratio, try to balance the loads.
const LOAD_IMBAL_HIGH_RATIO: f64 = 0.10;
const LOAD_IMBAL_HIGH_RATIO: f64 = 0.05;
// Aim to transfer this fraction of the imbalance on each round. We want
// to be gradual to avoid unnecessary oscillations. While this can delay
@ -543,12 +569,16 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
skel: &'a mut BpfSkel<'b>,
top: Arc<Topology>,
skip_kworkers: bool,
lb_apply_weight: &bool,
nr_lb_data_errors: &'c mut u64,
) -> Self {
Self {
skel,
skip_kworkers,
lb_apply_weight: lb_apply_weight.clone(),
infeas_threshold: bpf_intf::consts_LB_MAX_WEIGHT as f64,
tasks_by_load: (0..top.nr_doms).map(|_| None).collect(),
load_avg: 0f64,
dom_loads: vec![0.0; top.nr_doms],
@ -563,25 +593,181 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
}
}
fn bucket_range(&self, bucket: u64) -> (f64, f64) {
const MAX_WEIGHT: u64 = bpf_intf::consts_LB_MAX_WEIGHT as u64;
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
const WEIGHT_PER_BUCKET: u64 = MAX_WEIGHT / NUM_BUCKETS;
if bucket >= NUM_BUCKETS {
panic!("Invalid bucket {}, max {}", bucket, NUM_BUCKETS);
}
// w_x = [1 + (10000 * x) / N, 10000 * (x + 1) / N]
let min_w = 1 + (MAX_WEIGHT * bucket) / NUM_BUCKETS;
let max_w = min_w + WEIGHT_PER_BUCKET - 1;
(min_w as f64, max_w as f64)
}
fn bucket_weight(&self, bucket: u64) -> f64 {
const WEIGHT_PER_BUCKET: f64 = bpf_intf::consts_LB_WEIGHT_PER_BUCKET as f64;
let (min_weight, _) = self.bucket_range(bucket);
// Use the mid-point of the bucket when determining weight
min_weight + (WEIGHT_PER_BUCKET / 2.0f64)
}
fn apply_infeas_threshold(&mut self,
doms_dcycles_buckets: &[f64],
infeas_thrsh: f64) {
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
self.infeas_threshold = infeas_thrsh;
let mut global_load_sum = 0.0f64;
for dom in 0..self.top.nr_doms {
let dom_offset = (dom as u64) * NUM_BUCKETS;
let mut dom_load_sum = 0.0f64;
for i in 0..NUM_BUCKETS {
let weight = self.bucket_weight(i).min(self.infeas_threshold);
let dcycle = doms_dcycles_buckets[(dom_offset + i) as usize];
dom_load_sum += dcycle * weight;
}
self.dom_loads[dom] = dom_load_sum;
global_load_sum += dom_load_sum;
}
self.load_avg = global_load_sum / self.top.nr_doms as f64;
}
fn adjust_infeas_weights(&mut self,
bucket_dcycles: &[f64],
doms_dcycle_buckets: &[f64],
global_load_sum: f64) -> Result<()> {
// At this point we have the following data points:
//
// P : The number of cores on the system
// L : The total load sum of the system before any adjustments for
// infeasibility
// Lf: The load sum of all feasible tasks
// D : The total sum of duty cycles across all domains in the system
// Di: The duty cycle sum of all infeasible tasks
//
// We need to find a weight lambda_x such that every infeasible task in
// the system will be granted a CPU allocation equal to their duty
// cycle, and all the remaining compute capacity in the system will be
// divided fairly amongst the feasible tasks according to their load.
// Our goal is to find a value lambda_x such that every infeasible task
// is allocated its duty cycle, and the remaining compute capacity is
// shared fairly amongst the feasible tasks on the system.
//
// If L' is the load sum on the system after clamping all weights
// w_x > lambda_x to lambda_x, then lambda_x can be defined as follows:
//
// lambda_x = L' / P
//
// => L' = lambda_x * Di + Lf
// => lambda_x * P' = lambda_x * Di + Lf
// => lambda_x (P' - D_I) = Lf
// => lambda_x = Lf / (P' - Di)
//
// Thus, need to iterate over different values of x (i.e. over buckets)
// until we find a lambda_x such that:
//
// w_x >= lambda_x >= w_x+1
//
// Once we find a lambda_x, we need to:
//
// 1. Adjust the maximum weights of any w_x > lambda_x -> lambda_x
// 2. Subtract (w_i - lambda_x) from the load sums that the buckets were
// contributing to
// 3. Re-calculate the per-domain load, and the global load average.
//
// Note that we should always find a lambda_x at this point, as we
// verified in the caller that there is at least one infeasible bucket
// in the system.
//
// All of this is described and proven in detail in the following pdf:
//
// https://drive.google.com/file/d/1fAoWUlmW-HTp6akuATVpMxpUpvWcGSAv
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
let p = self.top.nr_cpus as f64;
let mut curr_dcycle_sum = 0.0f64;
let mut curr_load_sum = global_load_sum;
let mut lambda_x = curr_load_sum / p;
for bucket in (0..NUM_BUCKETS).filter(|bucket| !approx_eq(bucket_dcycles[*bucket as usize], 0f64)).rev() {
let weight = self.bucket_weight(bucket);
let dcycles = bucket_dcycles[bucket as usize];
if approx_ge(lambda_x, weight) {
self.apply_infeas_threshold(doms_dcycle_buckets, lambda_x);
return Ok(());
}
curr_dcycle_sum += dcycles;
curr_load_sum -= weight * dcycles;
lambda_x = curr_load_sum / (p - curr_dcycle_sum);
}
// We can fail to find an infeasible weight if the host is
// under-utilized. In this case, just fall back to using weights. If
// this is happening due to a stale system-wide util value due to the
// tuner not having run recently enough, it is a condition that should
// self-correct soon. If it is the result of the user configuring us to
// use weights even when the system is under-utilized, they were warned
// when the scheduler was launched.
self.load_avg = global_load_sum / self.top.nr_doms as f64;
Ok(())
}
fn read_dom_loads(&mut self) -> Result<()> {
let now_mono = now_monotonic();
let load_half_life = self.skel.rodata().load_half_life;
let maps = self.skel.maps();
let dom_data = maps.dom_data();
let mut load_sum = 0.0f64;
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
for i in 0..self.top.nr_doms {
let key = unsafe { std::mem::transmute::<u32, [u8; 4]>(i as u32) };
// Sum of dcycle and load for each bucket, aggregated across domains.
let mut global_bucket_dcycle = vec![0.0f64; NUM_BUCKETS as usize];
// Global dcycle and load sums.
let mut global_dcycle_sum = 0.0f64;
let mut global_load_sum = 0.0f64;
// dcycle values stored in every bucket. Recorded here so we don't have
// to do another ravg read later when testing and adjusting for
// infeasibility.
let mut doms_dcycle_buckets = vec![0.064; NUM_BUCKETS as usize * self.top.nr_doms];
// Sum of dcycle for each domain. Used if we're going to do load
// balancing based on just dcycle to avoid having to do two iterations.
let mut doms_dcycle_sums = vec![0.064; self.top.nr_doms];
// Track maximum weight so we can test for infeasibility below.
let mut max_weight = 0.0f64;
// Accumulate dcycle and load across all domains and buckets. If we're
// under-utilized, or there are no infeasible weights, this is
// sufficient to collect all of the data we need for load balancing.
for dom in 0..self.top.nr_doms {
let dom_key = unsafe { std::mem::transmute::<u32, [u8; 4]>(dom as u32) };
let dom_offset = dom as u64 * NUM_BUCKETS;
let mut dom_dcycle_sum = 0.0f64;
let mut dom_load_sum = 0.0f64;
if let Some(dom_ctx_map_elem) = dom_data
.lookup(&key, libbpf_rs::MapFlags::ANY)
.lookup(&dom_key, libbpf_rs::MapFlags::ANY)
.context("Failed to lookup dom_ctx")?
{
let dom_ctx =
unsafe { &*(dom_ctx_map_elem.as_slice().as_ptr() as *const bpf_intf::dom_ctx) };
let rd = &dom_ctx.load_rd;
self.dom_loads[i] = ravg_read(
for bucket in 0..NUM_BUCKETS {
let bucket_ctx = dom_ctx.buckets[bucket as usize];
let rd = &bucket_ctx.rd;
let duty_cycle = ravg_read(
rd.val,
rd.val_at,
rd.old,
@ -591,18 +777,79 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
RAVG_FRAC_BITS,
);
load_sum += self.dom_loads[i];
if approx_eq(0.0, duty_cycle) {
continue;
}
dom_dcycle_sum += duty_cycle;
global_bucket_dcycle[bucket as usize] += duty_cycle;
doms_dcycle_buckets[(dom_offset + bucket) as usize] = duty_cycle;
let weight = self.bucket_weight(bucket);
let load = weight * duty_cycle;
dom_load_sum += load;
if weight > max_weight {
max_weight = weight;
}
}
self.load_avg = load_sum / self.top.nr_doms as f64;
global_dcycle_sum += dom_dcycle_sum;
doms_dcycle_sums[dom] = dom_dcycle_sum;
global_load_sum += dom_load_sum;
self.dom_loads[dom] = dom_load_sum;
}
}
if !self.lb_apply_weight {
// System is under-utilized, so just use dcycle instead of load.
self.load_avg = global_dcycle_sum / self.top.nr_doms as f64;
self.dom_loads = doms_dcycle_sums;
return Ok(());
}
// If the sum of duty cycle on the system is >= P, any weight w_x of a
// task that exceeds L / P is guaranteed to be infeasible. Furthermore,
// if any weight w_x == L / P then we know that task t_x can get its
// full duty cycle, as:
//
// c_x = P * (w_x * d_x) / L
// = P * (L/P * d_x) / L
// = d_x / L / L
// = d_x
//
// If there is no bucket whose weight exceeds L / P that has a nonzero
// duty cycle, then all weights are feasible and we can use the data we
// collected above without having to adjust for infeasibility.
// Otherwise, we have at least one infeasible weight.
//
// See the function header for adjust_infeas_weights() for a more
// comprehensive description of the algorithm for adjusting for
// infeasible weights.
let infeasible_thresh = global_load_sum / self.top.nr_cpus as f64;
if approx_ge(max_weight, infeasible_thresh) {
debug!("max_weight={} infeasible_threshold= {}",
max_weight, infeasible_thresh);
return self.adjust_infeas_weights(&global_bucket_dcycle,
&doms_dcycle_buckets,
global_load_sum);
}
self.load_avg = global_load_sum / self.top.nr_doms as f64;
Ok(())
}
/// To balance dom loads, identify doms with lower and higher load than
/// average.
fn calculate_dom_load_balance(&mut self) -> Result<()> {
let mode = if self.lb_apply_weight {
"weighted"
} else {
"dcycle"
};
debug!("mode= {} load_avg= {:.2} infeasible_thresh= {:.2}",
mode, self.load_avg, self.infeas_threshold);
for (dom, dom_load) in self.dom_loads.iter().enumerate() {
let imbal = dom_load - self.load_avg;
if imbal.abs() >= self.load_avg * Self::LOAD_IMBAL_HIGH_RATIO {
@ -658,13 +905,14 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
if let Some(task_data_elem) = task_data.lookup(&key, libbpf_rs::MapFlags::ANY)? {
let task_ctx =
unsafe { &*(task_data_elem.as_slice().as_ptr() as *const bpf_intf::task_ctx) };
if task_ctx.dom_id != dom {
continue;
}
let weight = (task_ctx.weight as f64).min(self.infeas_threshold);
let rd = &task_ctx.dcyc_rd;
let load = task_ctx.weight as f64
let load = weight
* ravg_read(
rd.val,
rd.val_at,
@ -815,7 +1063,7 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
loop {
let last_pushed = pushed;
// Pull from the most imbalaned to least.
// Pull from the most imbalanced to least.
let mut doms_to_pull = BTreeMap::<_, _>::new();
std::mem::swap(&mut self.doms_to_pull, &mut doms_to_pull);
let mut pull_doms = doms_to_pull.into_iter().rev().collect::<Vec<(_, _)>>();
@ -1136,7 +1384,7 @@ impl<'a> Scheduler<'a> {
}
}
fn lb_step(&mut self) -> Result<()> {
fn lb_step(&mut self, lb_apply_weight: bool) -> Result<()> {
let started_at = Instant::now();
let bpf_stats = self.read_bpf_stats()?;
let cpu_busy = self.get_cpu_busy()?;
@ -1145,6 +1393,7 @@ impl<'a> Scheduler<'a> {
&mut self.skel,
self.top.clone(),
self.balanced_kworkers,
&lb_apply_weight,
&mut self.nr_lb_data_errors,
);
@ -1187,9 +1436,10 @@ impl<'a> Scheduler<'a> {
next_tune_at = now + self.tune_interval;
}
}
let lb_apply_weight = self.tuner.lb_apply_weight;
if now >= next_sched_at {
self.lb_step()?;
self.lb_step(lb_apply_weight)?;
next_sched_at += self.sched_interval;
if next_sched_at < now {
next_sched_at = now + self.sched_interval;