rusty: Fix a few remaining issues

Fixing alignment, moving a couple bail! calls around, and adding a
missing break from move_between_nodes() that lets us bail out of a loop
early.

Signed-off-by: David Vernet <void@manifault.com>
This commit is contained in:
David Vernet 2024-03-11 16:16:22 -05:00
parent 24d798c2ff
commit 03f68092ee
2 changed files with 52 additions and 40 deletions

View File

@ -151,7 +151,6 @@ use scx_utils::Topology;
use scx_utils::LoadLedger;
use scx_utils::LoadAggregator;
use sorted_vec::SortedVec;
use std::vec::Vec;
const RAVG_FRAC_BITS: u32 = bpf_intf::ravg_consts_RAVG_FRAC_BITS;
@ -852,28 +851,48 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
let pull_imbal = pull_node.load.imbal();
let xfer = push_node.xfer_between(&pull_node);
let mut delta = 0.0f64;
let mut push_doms = std::mem::take(&mut push_node.domains).into_vec();
for push_dom in push_doms.iter_mut().rev() {
let mut pull_doms = std::mem::take(&mut pull_node.domains).into_vec();
for pull_dom in pull_doms.iter_mut() {
if let Some(transferred) = self.try_find_move_task((push_dom, push_imbal),
(pull_dom, pull_imbal),
xfer)?
{
delta = transferred;
pull_node.update_load(delta);
if push_imbal <= 0.0f64 || pull_imbal >= 0.0f64 {
bail!("push node {}:{}, pull node {}:{}",
push_node.id, push_imbal, pull_node.id, pull_imbal);
}
let mut pushers = Vec::with_capacity(push_node.domains.len());
let mut pullers = Vec::with_capacity(pull_node.domains.len());
let mut pushed = 0f64;
while push_node.domains.len() > 0 {
// Push from the busiest node
let mut push_dom = push_node.domains.pop().unwrap();
if push_dom.load.state() != BalanceState::NeedsPush {
push_node.domains.insert(push_dom);
break;
}
while pull_node.domains.len() > 0 {
let mut pull_dom = pull_node.domains.remove_index(0);
let transferred = self.try_find_move_task((&mut push_dom, push_imbal),
(&mut pull_dom, pull_imbal),
xfer)?;
pullers.push(pull_dom);
if let Some(transferred) = transferred {
pushed = transferred;
push_node.update_load(-transferred);
pull_node.update_load(transferred);
break;
}
}
std::mem::swap(&mut pull_node.domains, &mut SortedVec::from_unsorted(pull_doms));
if delta > 0.0f64 {
push_node.update_load(-delta);
while pullers.len() > 0 {
pull_node.domains.insert(pullers.pop().unwrap());
}
pushers.push(push_dom);
if pushed > 0.0f64 {
break;
}
}
std::mem::swap(&mut push_node.domains, &mut SortedVec::from_unsorted(push_doms));
while pushers.len() > 0 {
push_node.domains.insert(pushers.pop().unwrap());
}
Ok(delta)
Ok(pushed)
}
fn balance_between_nodes(&mut self) -> Result<()> {
@ -883,64 +902,57 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
debug!("Node <-> Node LB started");
let mut pushers = Vec::new();
let mut pushers = Vec::with_capacity(self.nodes.len());
let mut pullers = Vec::with_capacity(self.nodes.len());
loop {
while self.nodes.len() >= 2 {
// Push from the busiest node
let mut push_node = self.nodes.pop().unwrap();
if self.nodes.len() == 0 || push_node.load.state() != BalanceState::NeedsPush {
if push_node.load.state() != BalanceState::NeedsPush {
self.nodes.insert(push_node);
break;
}
let push_imbal = push_node.load.imbal();
if push_imbal <= 0.0f64 {
bail!("Push node {} had imbal {}", push_node.id, push_imbal);
}
let mut transfer_occurred = false;
let push_cutoff = push_node.load.push_cutoff();
let mut pushed = 0f64;
loop {
while self.nodes.len() > 0 && pushed < push_cutoff {
// To the least busy node
let mut pull_node = self.nodes.remove_index(0);
let pull_id = pull_node.id;
let pull_imbal = pull_node.load.imbal();
if pull_node.load.state() != BalanceState::NeedsPull {
self.nodes.insert(pull_node);
break;
}
if pull_imbal >= 0.0f64 {
bail!("Push node {} had imbal {}", pull_node.id, pull_imbal);
}
let migrated = self.transfer_between_nodes(&mut push_node, &mut pull_node)?;
self.nodes.insert(pull_node);
if migrated > 0.0f64 {
// Break after a successful migration so that we can
// rebalance the pulling domains before the next
// transfer attempt, and ensure that we're trying to
// pull from domains in descending-imbalance order.
pushed += migrated;
transfer_occurred = true;
self.nodes.insert(pull_node);
debug!("NODE {} sending {:.06} --> NODE {}", push_node.id, migrated, pull_id);
break;
} else {
pullers.push(pull_node);
}
}
while pullers.len() > 0 {
self.nodes.insert(pullers.pop().unwrap());
}
if pushed > 0.0f64 {
debug!("NODE {} pushed {:.06} total load", push_node.id, pushed);
}
pushers.push(push_node);
if !transfer_occurred || pushed == 0.0f64 {
if pushed == 0.0f64 {
break;
}
}
loop {
let push_node = pushers.pop();
if push_node.is_none() {
break;
}
self.nodes.insert(push_node.unwrap());
while pushers.len() > 0 {
self.nodes.insert(pushers.pop().unwrap());
}
Ok(())