mirror of
https://github.com/JakeHillion/scx.git
synced 2024-11-26 11:30:22 +00:00
rusty: Use a flat list of NumaNodes during LB
As Tejun pointed out in review, the disadvantage of using push/pull/balanced lists is that if the domains inside the nodes are balanced, we won't be able to push load between them. I'd originally done it that way both as an optimization, but also to allow me to iterate over the lists of pushable and pullable domains mutably. That was addressed in the prior commit, but the nodes themselves were still put into 3 buckets. I think this is generally just a cleaner way of doing things, so let's just collapse the nodes into a flat list as well. This prevents us from having to coalesce the lists, std::mem::swap them, etc. Signed-off-by: David Vernet <void@manifault.com>
This commit is contained in:
parent
829b1d3ced
commit
24d798c2ff
@ -507,9 +507,7 @@ pub struct LoadBalancer<'a, 'b> {
|
||||
|
||||
infeas_threshold: f64,
|
||||
|
||||
push_nodes: SortedVec<NumaNode>,
|
||||
pull_nodes: SortedVec<NumaNode>,
|
||||
balanced_nodes: Vec<NumaNode>,
|
||||
nodes: SortedVec<NumaNode>,
|
||||
|
||||
lb_apply_weight: bool,
|
||||
balance_load: bool,
|
||||
@ -534,9 +532,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
|
||||
infeas_threshold: bpf_intf::consts_LB_MAX_WEIGHT as f64,
|
||||
|
||||
push_nodes: SortedVec::new(),
|
||||
pull_nodes: SortedVec::new(),
|
||||
balanced_nodes: Vec::new(),
|
||||
nodes: SortedVec::new(),
|
||||
|
||||
lb_apply_weight: lb_apply_weight.clone(),
|
||||
balance_load,
|
||||
@ -560,16 +556,8 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
}
|
||||
|
||||
pub fn get_stats(&self) -> Vec<NumaStat> {
|
||||
let push_len = self.push_nodes.len();
|
||||
let pull_len = self.pull_nodes.len();
|
||||
|
||||
if push_len > 0 || pull_len > 0 {
|
||||
panic!("Expected only balanced nodes, got {} pushed, {} pulled",
|
||||
push_len, pull_len);
|
||||
}
|
||||
|
||||
let mut numa_stats = Vec::with_capacity(self.dom_group.nr_nodes());
|
||||
for node in self.balanced_nodes.iter() {
|
||||
for node in self.nodes.iter() {
|
||||
numa_stats.push(node.numa_stat());
|
||||
}
|
||||
|
||||
@ -609,27 +597,12 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
}
|
||||
|
||||
for _ in 0..num_numa_nodes {
|
||||
self.insert_node(nodes.pop().unwrap());
|
||||
self.nodes.insert(nodes.pop().unwrap());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_node(&mut self, node: NumaNode) {
|
||||
let state = node.load.state();
|
||||
match state {
|
||||
BalanceState::Balanced => {
|
||||
self.balanced_nodes.push(node);
|
||||
},
|
||||
BalanceState::NeedsPush => {
|
||||
self.push_nodes.insert(node);
|
||||
},
|
||||
_ => {
|
||||
self.pull_nodes.insert(node);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn calculate_load_avgs(&mut self) -> Result<LoadLedger> {
|
||||
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
|
||||
let now_mono = now_monotonic();
|
||||
@ -904,52 +877,50 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
}
|
||||
|
||||
fn balance_between_nodes(&mut self) -> Result<()> {
|
||||
let n_push_nodes = self.push_nodes.len();
|
||||
let n_pull_nodes = self.pull_nodes.len();
|
||||
|
||||
debug!("Node <-> Node LB started ({} pushers -> {} pullers)",
|
||||
n_push_nodes, n_pull_nodes);
|
||||
|
||||
if n_push_nodes == 0 || n_pull_nodes == 0 {
|
||||
if self.nodes.len() < 2 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut push_nodes = std::mem::take(&mut self.push_nodes).into_vec();
|
||||
debug!("Node <-> Node LB started");
|
||||
|
||||
// Push from the most imbalanced to least.
|
||||
for push_node in push_nodes.iter_mut().rev() {
|
||||
let push_cutoff = push_node.load.push_cutoff();
|
||||
let mut pushed = 0f64;
|
||||
let mut pushers = Vec::new();
|
||||
|
||||
if push_node.load.imbal() < 0.0f64 {
|
||||
bail!("Push node {} had imbal {}", push_node.id, push_node.load.imbal());
|
||||
loop {
|
||||
// Push from the busiest node
|
||||
let mut push_node = self.nodes.pop().unwrap();
|
||||
if self.nodes.len() == 0 || push_node.load.state() != BalanceState::NeedsPush {
|
||||
self.nodes.insert(push_node);
|
||||
break;
|
||||
}
|
||||
let push_imbal = push_node.load.imbal();
|
||||
if push_imbal <= 0.0f64 {
|
||||
bail!("Push node {} had imbal {}", push_node.id, push_imbal);
|
||||
}
|
||||
|
||||
// Always try to send load to the nodes that need it most, in
|
||||
// descending order.
|
||||
let mut transfer_occurred = false;
|
||||
let mut pushed = 0f64;
|
||||
loop {
|
||||
let mut transfer_occurred = false;
|
||||
let mut pull_nodes = std::mem::take(&mut self.pull_nodes).into_vec();
|
||||
|
||||
for pull_node in pull_nodes.iter_mut() {
|
||||
if pull_node.load.imbal() >= 0.0f64 {
|
||||
bail!("Pull node {} had imbal {}", pull_node.id, pull_node.load.imbal());
|
||||
}
|
||||
let migrated = self.transfer_between_nodes(push_node, pull_node)?;
|
||||
if migrated > 0.0f64 {
|
||||
// Break after a successful migration so that we can
|
||||
// rebalance the pulling domains before the next
|
||||
// transfer attempt, and ensure that we're trying to
|
||||
// pull from domains in descending-imbalance order.
|
||||
pushed += migrated;
|
||||
transfer_occurred = true;
|
||||
debug!("NODE {} sending {:.06} --> NODE {}", push_node.id, migrated, pull_node.id);
|
||||
break;
|
||||
}
|
||||
// To the least busy node
|
||||
let mut pull_node = self.nodes.remove_index(0);
|
||||
let pull_id = pull_node.id;
|
||||
let pull_imbal = pull_node.load.imbal();
|
||||
if pull_node.load.state() != BalanceState::NeedsPull {
|
||||
self.nodes.insert(pull_node);
|
||||
break;
|
||||
}
|
||||
std::mem::swap(&mut self.pull_nodes, &mut SortedVec::from_unsorted(pull_nodes));
|
||||
|
||||
if !transfer_occurred || pushed >= push_cutoff {
|
||||
if pull_imbal >= 0.0f64 {
|
||||
bail!("Push node {} had imbal {}", pull_node.id, pull_imbal);
|
||||
}
|
||||
let migrated = self.transfer_between_nodes(&mut push_node, &mut pull_node)?;
|
||||
self.nodes.insert(pull_node);
|
||||
if migrated > 0.0f64 {
|
||||
// Break after a successful migration so that we can
|
||||
// rebalance the pulling domains before the next
|
||||
// transfer attempt, and ensure that we're trying to
|
||||
// pull from domains in descending-imbalance order.
|
||||
pushed += migrated;
|
||||
transfer_occurred = true;
|
||||
debug!("NODE {} sending {:.06} --> NODE {}", push_node.id, migrated, pull_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -957,8 +928,20 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
if pushed > 0.0f64 {
|
||||
debug!("NODE {} pushed {:.06} total load", push_node.id, pushed);
|
||||
}
|
||||
pushers.push(push_node);
|
||||
|
||||
if !transfer_occurred || pushed == 0.0f64 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
loop {
|
||||
let push_node = pushers.pop();
|
||||
if push_node.is_none() {
|
||||
break;
|
||||
}
|
||||
self.nodes.insert(push_node.unwrap());
|
||||
}
|
||||
std::mem::swap(&mut self.push_nodes, &mut SortedVec::from_unsorted(push_nodes));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -968,7 +951,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
debug!("Intra node {} LB started)", node.id);
|
||||
debug!("Intra node {} LB started", node.id);
|
||||
|
||||
let mut pushers = Vec::new();
|
||||
|
||||
@ -985,7 +968,6 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
bail!("Node {} push dom {} had imbal {}", node.id, push_dom.id, push_imbal);
|
||||
}
|
||||
let mut load = 0.0f64;
|
||||
let mut did_transfer = false;
|
||||
loop {
|
||||
let mut pull_dom = node.domains.remove_index(0);
|
||||
if pull_dom.load.state() != BalanceState::NeedsPull {
|
||||
@ -997,6 +979,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
bail!("Node {} pull dom {} had imbal {}", node.id, pull_dom.id, pull_imbal);
|
||||
}
|
||||
let xfer = push_dom.xfer_between(&pull_dom);
|
||||
let mut did_transfer = false;
|
||||
if let Some(transferred) = self.try_find_move_task((&mut push_dom, push_imbal),
|
||||
(&mut pull_dom, pull_imbal),
|
||||
xfer)?
|
||||
@ -1017,7 +1000,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
}
|
||||
|
||||
pushers.push(push_dom);
|
||||
if !did_transfer {
|
||||
if load == 0.0f64 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -1050,14 +1033,12 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
|
||||
debug!("Intra node LBs started");
|
||||
|
||||
// Assume all nodes are now balanced.
|
||||
self.balanced_nodes.append(&mut std::mem::take(&mut self.push_nodes).into_vec());
|
||||
self.balanced_nodes.append(&mut std::mem::take(&mut self.pull_nodes).into_vec());
|
||||
|
||||
let mut bal_nodes = std::mem::take(&mut self.balanced_nodes);
|
||||
for node in bal_nodes.iter_mut() {
|
||||
let mut nodes = std::mem::take(&mut self.nodes).into_vec();
|
||||
for node in nodes.iter_mut() {
|
||||
self.balance_within_node(node)?;
|
||||
}
|
||||
std::mem::swap(&mut self.balanced_nodes, &mut bal_nodes);
|
||||
std::mem::swap(&mut self.nodes, &mut SortedVec::from_unsorted(nodes));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user