This commit is contained in:
I-Hsin Cheng 2024-11-30 14:46:00 +01:00 committed by GitHub
commit 18277f94b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 70 additions and 13 deletions

View File

@ -154,4 +154,9 @@ struct node_ctx {
struct bpf_cpumask __kptr *cpumask;
};
struct migrate_arg {
u64 tptr;
u32 new_dom_id;
};
#endif /* __INTF_H */

View File

@ -335,7 +335,7 @@ static void dom_dcycle_adj(u32 dom_id, u32 weight, u64 now, bool runnable)
}
}
static void dom_dcycle_xfer_task(struct task_struct *p, struct task_ctx *taskc,
static void dom_dcycle_xfer_task(struct task_ctx *taskc,
struct dom_ctx *from_domc,
struct dom_ctx *to_domc, u64 now)
{
@ -454,7 +454,7 @@ int dom_xfer_task(u64 tptr, u32 new_dom_id, u64 now)
if (!from_domc || !to_domc || !taskc)
return 0;
dom_dcycle_xfer_task(p, taskc, from_domc, to_domc, now);
dom_dcycle_xfer_task(taskc, from_domc, to_domc, now);
return 0;
}
@ -1146,18 +1146,10 @@ void BPF_STRUCT_OPS(rusty_enqueue, struct task_struct *p, u64 enq_flags)
}
/*
* Migrate @p to a new domain if requested by userland through lb_data.
* Update @p's domain information.
*/
new_dom = bpf_map_lookup_elem(&lb_data, &tptr);
if (new_dom && *new_dom != taskc->dom_id &&
task_set_domain(taskc, p, *new_dom, false)) {
stat_add(RUSTY_STAT_LOAD_BALANCE, 1);
taskc->dispatch_local = false;
cpu = scx_bpf_pick_any_cpu(cast_mask(p_cpumask), 0);
if (cpu >= 0)
scx_bpf_kick_cpu(cpu, 0);
goto dom_queue;
}
if (!(task_set_domain(taskc, p, taskc->dom_id, true)))
return;
if (taskc->dispatch_local) {
taskc->dispatch_local = false;
@ -1209,6 +1201,34 @@ dom_queue:
}
}
SEC("syscall")
int enqueue_migrate_queue(struct migrate_arg *input)
{
u64 tptr = input->tptr;
u32 new_dom_id = input->new_dom_id;
struct dom_ctx *from_domc;
struct task_ctx *taskc;
taskc = bpf_map_lookup_elem(&task_data, &tptr);
if (!taskc)
return -EINVAL;
if (new_dom_id == taskc->dom_id || new_dom_id == NO_DOM_FOUND)
return -EINVAL;
u64 now = bpf_ktime_get_ns();
dom_xfer_task(tptr, new_dom_id, now);
taskc->dom_id = new_dom_id;
taskc->dispatch_local = false;
stat_add(RUSTY_STAT_LOAD_BALANCE, 1);
return 0;
}
static bool cpumask_intersects_domain(const struct cpumask *cpumask, u32 dom_id)
{
struct dom_ctx *domc;

View File

@ -133,6 +133,7 @@
use core::cmp::Ordering;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
@ -141,6 +142,7 @@ use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use libbpf_rs::MapCore as _;
use libbpf_rs::ProgramInput;
use log::debug;
use log::warn;
use ordered_float::OrderedFloat;
@ -150,6 +152,7 @@ use scx_utils::LoadLedger;
use sorted_vec::SortedVec;
use crate::bpf_intf;
use crate::bpf_intf::migrate_arg;
use crate::bpf_skel::*;
use crate::stats::DomainStats;
use crate::stats::NodeStats;
@ -343,6 +346,7 @@ struct Domain {
queried_tasks: bool,
load: LoadEntity,
tasks: SortedVec<TaskInfo>,
active_tptr_set: HashSet<u64>,
}
impl Domain {
@ -362,6 +366,7 @@ impl Domain {
load_avg,
),
tasks: SortedVec::new(),
active_tptr_set: HashSet::new(),
}
}
@ -680,6 +685,10 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
let tptr = active_tptrs.tptrs[(idx % MAX_TPTRS) as usize];
let key = unsafe { std::mem::transmute::<u64, [u8; 8]>(tptr) };
if dom.active_tptr_set.contains(&tptr) {
continue;
}
if let Some(task_data_elem) = task_data.lookup(&key, libbpf_rs::MapFlags::ANY)? {
let task_ctx =
unsafe { &*(task_data_elem.as_slice().as_ptr() as *const bpf_intf::task_ctx) };
@ -705,6 +714,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
};
load *= weight;
dom.active_tptr_set.insert(tptr);
dom.tasks.insert(TaskInfo {
tptr,
load: OrderedFloat(load),
@ -809,6 +819,28 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));
push_dom.transfer_load(load, tptr, pull_dom, &mut self.skel);
let prog = &mut self.skel.progs.enqueue_migrate_queue;
let mut args = migrate_arg {
tptr,
new_dom_id: pull_dom.id.try_into().unwrap(),
};
let input = ProgramInput {
context_in: Some(unsafe {
std::slice::from_raw_parts_mut(
&mut args as *mut _ as *mut u8,
std::mem::size_of_val(&args),
)
}),
..Default::default()
};
if let Err(e) = prog.test_run(input) {
warn!(
"Failed to execute task migration immediately for tptr={} error={:?}",
tptr, &e
);
}
Ok(Some(load))
}