Compare commits

...

8 Commits

Author SHA1 Message Date
I-Hsin Cheng
18277f94b0
Merge d5c28e213e into 7d14df8ca2 2024-11-30 14:46:00 +01:00
Changwoo Min
7d14df8ca2
Merge pull request #1000 from multics69/lavd-load-balancing
scx_lavd: Load balancing across compute domains
2024-11-30 12:10:04 +09:00
Changwoo Min
047e8c81e9 scx_lavd: Perform load balancing at consume_task()
Upon ops.dispatch, perform load balancing based on the prepared plan,
stealing a task from a stealee domain to a stealer domain. To avoid
the thundering herd problem of concurrent stealers, a stealer steals
a task only probabilistically. Also, to minimize the task migration
distance, the stealing probability decreases exponentially with each
hop in distance. Finally, within every stat cycle (50 ms), a stealer
migrates only one task from a stealee, for gradual load balancing.

Signed-off-by: Changwoo Min <changwoo@igalia.com>
2024-11-30 12:09:43 +09:00
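For intuition, the stealing probability in this change composes two gates: an initial go/no-go gate of 1 out of (nr_cpus * LAVD_CPDOM_X_PROB_FT) per dispatch, and a per-hop continuation gate of 1 out of LAVD_CPDOM_X_PROB_FT, so the chance of stealing from a farther stealee decays exponentially with distance. The standalone C sketch below only illustrates that decay; the values of F and nr_cpus are made-up assumptions, not the scheduler's actual constants.

#include <stdio.h>

/*
 * Standalone sketch (not part of the patch): how the stealing probability
 * decays with hop distance. F stands in for LAVD_CPDOM_X_PROB_FT and
 * nr_cpus for the stealer domain's CPU count; both values are assumptions.
 */
int main(void)
{
	const double F = 4.0;           /* assumed probability factor */
	const double nr_cpus = 8.0;     /* assumed CPUs in the stealer domain */
	double p = 1.0 / (nr_cpus * F); /* initial go/no-go gate per dispatch */
	int hop;

	for (hop = 0; hop < 4; hop++) {
		printf("hop %d: effective steal-attempt probability ~ %.5f\n", hop, p);
		p /= F; /* each farther hop is tried only 1 out of F times */
	}
	return 0;
}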
Changwoo Min
4f1ffc1bc6 scx_lavd: Refactor consume_task()
Remove unnecessary variables and arguments and
factor out force_to_steal_task().

Signed-off-by: Changwoo Min <changwoo@igalia.com>
2024-11-30 12:09:43 +09:00
Changwoo Min
7991266773 scx_lavd: Decide load balancing plan across compute domains
The goal of load balancing is to maintain a nearly equal number of
queued tasks per CPU across compute domains. To this end, we first
decide which compute domains are under-utilized (i.e., their queue
length per CPU is below average) and which are over-utilized (i.e.,
their queue length per CPU is above average). We call an
under-utilized domain a stealer domain and an over-utilized domain
a stealee domain.

Signed-off-by: Changwoo Min <changwoo@igalia.com>
2024-11-30 12:09:43 +09:00
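As a rough illustration of the plan described above, the classification boils down to comparing each domain's scaled per-CPU queue length against avg +/- (avg >> LAVD_CPDOM_MIGRATION_SHIFT), i.e., +/- 12.5%. The C sketch below uses made-up queue lengths (already x1000 scaled, as in the patch) purely to show the thresholding; it is not code from the patch.

#include <stdio.h>

/*
 * Standalone sketch (not part of the patch) of the stealer/stealee
 * classification. Queue lengths are made-up example values, already
 * scaled by 1000 as in the patch; the shift of 3 gives +/- 12.5%.
 */
int main(void)
{
	unsigned int nr_q_tasks_per_cpu[] = { 2000, 3500, 3600, 5000 };
	unsigned int nr_cpdoms = 4, avg = 0, i;

	for (i = 0; i < nr_cpdoms; i++)
		avg += nr_q_tasks_per_cpu[i];
	avg /= nr_cpdoms;

	unsigned int delta = avg >> 3; /* LAVD_CPDOM_MIGRATION_SHIFT */
	unsigned int stealer_thr = avg - delta;
	unsigned int stealee_thr = avg + delta;

	for (i = 0; i < nr_cpdoms; i++) {
		const char *role = "neither";
		if (nr_q_tasks_per_cpu[i] < stealer_thr)
			role = "stealer";
		else if (nr_q_tasks_per_cpu[i] > stealee_thr)
			role = "stealee";
		printf("cpdom %u: %u -> %s\n", i, nr_q_tasks_per_cpu[i], role);
	}
	return 0;
}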
Changwoo Min
ed14a4ca91 scx_lavd: Log the number of cross-domain task migrations
Collect and log the number of task migrations across compute domains.

Signed-off-by: Changwoo Min <changwoo@igalia.com>
2024-11-30 12:09:43 +09:00
I Hsin Cheng
d5c28e213e scx_rusty: Perform task migration immediately
According to #611, the load balancer writes a new domain id into
"lb_data" for every load balancing period; however, sometimes those
tasks aren't going to be scheduled in the coming scheduling period,
so no task migration is actually performed.

Utilize BPF_PROG_TYPE_SYSCALL so the load balancer can update the task
context's domain right away and transfer related information between
the push domain and the pull domain immediately.

Signed-off-by: I Hsin Cheng <richard120310@gmail.com>
2024-11-27 18:13:31 +08:00
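This change drives the new SEC("syscall") program from the Rust load balancer via libbpf-rs (ProgramInput / test_run, shown in the diff below). For readers more familiar with C, a roughly equivalent and entirely hypothetical caller using libbpf's bpf_prog_test_run_opts() might look like the sketch below; prog_fd is assumed to come from an already loaded skeleton, and only enqueue_migrate_queue and struct migrate_arg are taken from the patch itself.

#include <stdio.h>
#include <bpf/bpf.h>
#include <bpf/libbpf.h>

/* Mirrors struct migrate_arg added by the patch. */
struct migrate_arg {
	__u64 tptr;
	__u32 new_dom_id;
};

/*
 * Hypothetical C caller for the SEC("syscall") program enqueue_migrate_queue;
 * the scheduler actually drives it from Rust via libbpf-rs' test_run.
 */
static int run_migrate(int prog_fd, __u64 tptr, __u32 new_dom_id)
{
	struct migrate_arg arg = {
		.tptr = tptr,
		.new_dom_id = new_dom_id,
	};
	LIBBPF_OPTS(bpf_test_run_opts, opts,
		.ctx_in = &arg,
		.ctx_size_in = sizeof(arg),
	);
	int err = bpf_prog_test_run_opts(prog_fd, &opts);

	if (err || opts.retval)
		fprintf(stderr, "migration for tptr=%llu failed: err=%d retval=%d\n",
			(unsigned long long)tptr, err, opts.retval);
	return err ? err : (int)opts.retval;
}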
I Hsin Cheng
23a698956b scx_rusty: Temporary fix of duplicate active tptr
Under severe load imbalance scenarios, such as mixtures of CPU-intensive
and I/O-intensive workloads, the same tptr may be written into the
same dom_active_tptrs array more than once.

This leads to load balancer failures: when the tptr's task carries a
large enough load, it tends to be selected repeatedly, so warnings
about the same tptr being set in "lb_data" keep popping up.

Use a workaround for now: keep a HashSet in userspace recording the
currently active tptrs under a domain, and do not generate the same
task repeatedly.

Signed-off-by: I Hsin Cheng <richard120310@gmail.com>
2024-11-27 17:36:55 +08:00
10 changed files with 314 additions and 97 deletions

View File

@ -78,12 +78,14 @@ struct sys_stat {
volatile u32 max_perf_cri; /* maximum performance criticality */
volatile u32 thr_perf_cri; /* performance criticality threshold */
volatile u32 nr_stealee; /* number of stealee compute domains */
volatile u32 nr_violation; /* number of utilization violation */
volatile u32 nr_active; /* number of active cores */
volatile u64 nr_sched; /* total scheduling so far */
volatile u64 nr_perf_cri; /* number of performance-critical tasks scheduled */
volatile u64 nr_lat_cri; /* number of latency-critical tasks scheduled */
volatile u64 nr_x_migration; /* number of cross-domain task migrations */
volatile u64 nr_big; /* scheduled on big core */
volatile u64 nr_pc_on_big; /* performance-critical tasks scheduled on big core */
volatile u64 nr_lc_on_big; /* latency-critical tasks scheduled on big core */

View File

@ -51,6 +51,9 @@ enum consts_internal {
performance mode when cpu util > 40% */
LAVD_CPDOM_STARV_NS = (2 * LAVD_SLICE_MAX_NS_DFL),
LAVD_CPDOM_MIGRATION_SHIFT = 3, /* 1/2**3 = +/- 12.5% */
LAVD_CPDOM_X_PROB_FT = (LAVD_SYS_STAT_INTERVAL_NS /
(2 * LAVD_SLICE_MAX_NS_DFL)), /* roughly twice per interval */
};
/*
@ -58,12 +61,15 @@ enum consts_internal {
* - system > numa node > llc domain > compute domain per core type (P or E)
*/
struct cpdom_ctx {
u64 last_consume_clk; /* when the associated DSQ was consumed */
u64 id; /* id of this compute domain (== dsq_id) */
u64 alt_id; /* id of the closest compute domain of alternative type (== dsq id) */
u8 node_id; /* numa domain id */
u8 is_big; /* is it a big core or little core? */
u8 is_active; /* if this compute domain is active */
u8 is_stealer; /* this domain should steal tasks from others */
u8 is_stealee; /* a stealer domain should steal tasks from this domain */
u16 nr_cpus; /* the number of CPUs in this compute domain */
u32 nr_q_tasks_per_cpu; /* the number of queued tasks per CPU in this domain (x1000) */
u8 nr_neighbors[LAVD_CPDOM_MAX_DIST]; /* number of neighbors per distance */
u64 neighbor_bits[LAVD_CPDOM_MAX_DIST]; /* bitmask of neighbors per distance */
u64 __cpumask[LAVD_CPU_ID_MAX/64]; /* cpumasks belonging to this compute domain */
@ -129,6 +135,7 @@ struct cpu_ctx {
/*
* Information for statistics.
*/
volatile u32 nr_x_migration;
volatile u32 nr_perf_cri;
volatile u32 nr_lat_cri;

View File

@ -1108,7 +1108,7 @@ void BPF_STRUCT_OPS(lavd_enqueue, struct task_struct *p, u64 enq_flags)
}
}
static bool consume_dsq(s32 cpu, u64 dsq_id, u64 now)
static bool consume_dsq(u64 dsq_id)
{
struct cpdom_ctx *cpdomc;
@ -1120,7 +1120,6 @@ static bool consume_dsq(s32 cpu, u64 dsq_id, u64 now)
scx_bpf_error("Failed to lookup cpdom_ctx for %llu", dsq_id);
return false;
}
WRITE_ONCE(cpdomc->last_consume_clk, now);
/*
* Try to consume a task on the associated DSQ.
@ -1130,81 +1129,110 @@ static bool consume_dsq(s32 cpu, u64 dsq_id, u64 now)
return false;
}
static bool consume_starving_task(s32 cpu, struct cpu_ctx *cpuc, u64 now)
static bool try_to_steal_task(struct cpdom_ctx *cpdomc)
{
struct cpdom_ctx *cpdomc;
u64 dsq_id = cpuc->cpdom_poll_pos;
u64 dl;
bool ret = false;
int i;
if (nr_cpdoms == 1)
return false;
bpf_for(i, 0, nr_cpdoms) {
if (i >= LAVD_CPDOM_MAX_NR)
break;
dsq_id = (dsq_id + i) % LAVD_CPDOM_MAX_NR;
if (dsq_id == cpuc->cpdom_id)
continue;
cpdomc = MEMBER_VPTR(cpdom_ctxs, [dsq_id]);
if (!cpdomc) {
scx_bpf_error("Failed to lookup cpdom_ctx for %llu", dsq_id);
goto out;
}
if (cpdomc->is_active) {
dl = READ_ONCE(cpdomc->last_consume_clk) + LAVD_CPDOM_STARV_NS;
if (dl < now) {
ret = consume_dsq(cpu, dsq_id, now);
}
goto out;
}
}
out:
cpuc->cpdom_poll_pos = (dsq_id + 1) % LAVD_CPDOM_MAX_NR;
return ret;
}
static bool consume_task(s32 cpu, struct cpu_ctx *cpuc, u64 now)
{
struct cpdom_ctx *cpdomc, *cpdomc_pick;
u64 dsq_id, nr_nbr;
struct cpdom_ctx *cpdomc_pick;
u64 nr_nbr, dsq_id;
s64 nuance;
/*
* If there is a starving DSQ, try to consume it first.
* If not all CPUs are in use -- i.e., the system is under-utilized --
* there is no point in load balancing. It is better to make an
* effort to increase system utilization.
*/
if (consume_starving_task(cpu, cpuc, now))
return true;
/*
* Try to consume from CPU's associated DSQ.
*/
dsq_id = cpuc->cpdom_id;
if (consume_dsq(cpu, dsq_id, now))
return true;
/*
* If there is no task in the associated DSQ, traverse neighbor
* compute domains in distance order -- task stealing.
*/
cpdomc = MEMBER_VPTR(cpdom_ctxs, [dsq_id]);
if (!cpdomc) {
scx_bpf_error("Failed to lookup cpdom_ctx for %llu", dsq_id);
if (!use_full_cpus())
return false;
}
/*
* Probabilistically make a go or no go decision to avoid the
* thundering herd problem. In other words, one out of nr_cpus
* will try to steal a task at a moment.
*/
if (!prob_x_out_of_y(1, cpdomc->nr_cpus * LAVD_CPDOM_X_PROB_FT))
return false;
/*
* Traverse neighbor compute domains in distance order.
*/
nuance = bpf_get_prandom_u32();
for (int i = 0; i < LAVD_CPDOM_MAX_DIST; i++) {
nr_nbr = min(cpdomc->nr_neighbors[i], LAVD_CPDOM_MAX_NR);
if (nr_nbr == 0)
break;
nuance = bpf_get_prandom_u32();
for (int j = 0; j < LAVD_CPDOM_MAX_NR; j++, nuance = dsq_id + 1) {
/*
* Traverse neighbors at the same distance in arbitrary order.
*/
for (int j = 0; j < LAVD_CPDOM_MAX_NR; j++, nuance++) {
if (j >= nr_nbr)
break;
dsq_id = pick_any_bit(cpdomc->neighbor_bits[i], nuance);
if (dsq_id == -ENOENT)
continue;
cpdomc_pick = MEMBER_VPTR(cpdom_ctxs, [dsq_id]);
if (!cpdomc_pick) {
scx_bpf_error("Failed to lookup cpdom_ctx for %llu", dsq_id);
return false;
}
if (!cpdomc_pick->is_stealee || !cpdomc_pick->is_active)
continue;
/*
* If task stealing is successful, mark the stealer
* and the stealee's job done. By marking done,
* those compute domains would not be involved in
* load balancing until the end of this round,
* so this helps gradual migration. Note that multiple
* stealers can steal tasks from the same stealee.
* However, we don't coordinate concurrent stealing
* because the chance is low and there is no harm
* in slight over-stealing.
*/
if (consume_dsq(dsq_id)) {
WRITE_ONCE(cpdomc_pick->is_stealee, false);
WRITE_ONCE(cpdomc->is_stealer, false);
return true;
}
}
/*
* Now, we need to steal a task from a farther neighbor
* for load balancing. Since task migration from a farther
* neighbor is more expensive (e.g., crossing a NUMA boundary),
* we do this with a lot of hesitation. The chance of farther
* migration decreases exponentially as distance increases,
* which, conversely, increases the chance of closer migration.
*/
if (!prob_x_out_of_y(1, LAVD_CPDOM_X_PROB_FT))
break;
}
return false;
}
static bool force_to_steal_task(struct cpdom_ctx *cpdomc)
{
struct cpdom_ctx *cpdomc_pick;
u64 nr_nbr, dsq_id;
s64 nuance;
/*
* Traverse neighbor compute domains in distance order.
*/
nuance = bpf_get_prandom_u32();
for (int i = 0; i < LAVD_CPDOM_MAX_DIST; i++) {
nr_nbr = min(cpdomc->nr_neighbors[i], LAVD_CPDOM_MAX_NR);
if (nr_nbr == 0)
break;
/*
* Traverse neighbors at the same distance in arbitrary order.
*/
for (int j = 0; j < LAVD_CPDOM_MAX_NR; j++, nuance++) {
if (j >= nr_nbr)
break;
@ -1221,7 +1249,7 @@ static bool consume_task(s32 cpu, struct cpu_ctx *cpuc, u64 now)
if (!cpdomc_pick->is_active)
continue;
if (consume_dsq(cpu, dsq_id, now))
if (consume_dsq(dsq_id))
return true;
}
}
@ -1229,9 +1257,51 @@ static bool consume_task(s32 cpu, struct cpu_ctx *cpuc, u64 now)
return false;
}
static bool consume_task(struct cpu_ctx *cpuc)
{
struct cpdom_ctx *cpdomc;
u64 dsq_id;
dsq_id = cpuc->cpdom_id;
cpdomc = MEMBER_VPTR(cpdom_ctxs, [dsq_id]);
if (!cpdomc) {
scx_bpf_error("Failed to lookup cpdom_ctx for %llu", dsq_id);
return false;
}
/*
* If the current compute domain is a stealer, try to steal
* a task from any of stealee domains probabilistically.
*/
if (cpdomc->is_stealer && try_to_steal_task(cpdomc))
goto x_domain_migration_out;
/*
* Try to consume a task from CPU's associated DSQ.
*/
if (consume_dsq(dsq_id))
return true;
/*
* If there is no task in the associated DSQ, traverse neighbor
* compute domains in distance order -- task stealing.
*/
if (force_to_steal_task(cpdomc))
goto x_domain_migration_out;
return false;
/*
* Task migration across compute domains happens.
* Update the statistics.
*/
x_domain_migration_out:
cpuc->nr_x_migration++;
return true;
}
void BPF_STRUCT_OPS(lavd_dispatch, s32 cpu, struct task_struct *prev)
{
u64 now = bpf_ktime_get_ns();
struct cpu_ctx *cpuc;
struct task_ctx *taskc;
struct bpf_cpumask *active, *ovrflw;
@ -1365,10 +1435,7 @@ consume_out:
/*
* Consume a task if requested.
*/
if (!try_consume)
return;
if (consume_task(cpu, cpuc, now))
if (try_consume && consume_task(cpuc))
return;
/*
@ -1805,8 +1872,6 @@ static s32 init_cpdoms(u64 now)
if (!cpdomc->is_active)
continue;
WRITE_ONCE(cpdomc->last_consume_clk, now);
/*
* Create an associated DSQ on its associated NUMA domain.
*/
@ -2024,6 +2089,7 @@ static s32 init_per_cpu_ctx(u64 now)
}
cpuc->cpdom_id = cpdomc->id;
cpuc->cpdom_alt_id = cpdomc->alt_id;
cpdomc->nr_cpus++;
}
}
}

View File

@ -38,6 +38,8 @@ struct sys_stat_ctx {
u32 nr_sched;
u32 nr_perf_cri;
u32 nr_lat_cri;
u32 nr_x_migration;
u32 nr_stealee;
u32 nr_big;
u32 nr_pc_on_big;
u32 nr_lc_on_big;
@ -62,10 +64,66 @@ static void init_sys_stat_ctx(struct sys_stat_ctx *c)
c->stat_next->last_update_clk = c->now;
}
static void plan_x_cpdom_migration(struct sys_stat_ctx *c)
{
struct cpdom_ctx *cpdomc;
u64 dsq_id;
u32 avg_nr_q_tasks_per_cpu = 0, nr_q_tasks, x_mig_delta;
u32 stealer_threshold, stealee_threshold;
/*
* Calculate the average number of queued tasks per CPU per compute domain.
*/
bpf_for(dsq_id, 0, nr_cpdoms) {
if (dsq_id >= LAVD_CPDOM_MAX_NR)
break;
nr_q_tasks = scx_bpf_dsq_nr_queued(dsq_id);
c->nr_queued_task += nr_q_tasks;
cpdomc = MEMBER_VPTR(cpdom_ctxs, [dsq_id]);
cpdomc->nr_q_tasks_per_cpu = (nr_q_tasks * 1000) / cpdomc->nr_cpus;
avg_nr_q_tasks_per_cpu += cpdomc->nr_q_tasks_per_cpu;
}
avg_nr_q_tasks_per_cpu /= nr_cpdoms;
/*
* Determine stealer and stealee domains.
*
* A stealer domain, whose per-CPU queue length is shorter than
* the average, will steal a task from any stealee domain,
* whose per-CPU queue length is longer than the average.
* Compute domains around the average will not do anything.
*/
x_mig_delta = avg_nr_q_tasks_per_cpu >> LAVD_CPDOM_MIGRATION_SHIFT;
stealer_threshold = avg_nr_q_tasks_per_cpu - x_mig_delta;
stealee_threshold = avg_nr_q_tasks_per_cpu + x_mig_delta;
bpf_for(dsq_id, 0, nr_cpdoms) {
if (dsq_id >= LAVD_CPDOM_MAX_NR)
break;
cpdomc = MEMBER_VPTR(cpdom_ctxs, [dsq_id]);
if (cpdomc->nr_q_tasks_per_cpu < stealer_threshold) {
WRITE_ONCE(cpdomc->is_stealer, true);
WRITE_ONCE(cpdomc->is_stealee, false);
}
else if (cpdomc->nr_q_tasks_per_cpu > stealee_threshold) {
WRITE_ONCE(cpdomc->is_stealer, false);
WRITE_ONCE(cpdomc->is_stealee, true);
c->nr_stealee++;
}
else {
WRITE_ONCE(cpdomc->is_stealer, false);
WRITE_ONCE(cpdomc->is_stealee, false);
}
}
}
static void collect_sys_stat(struct sys_stat_ctx *c)
{
u64 dsq_id;
int cpu, nr;
int cpu;
bpf_for(cpu, 0, nr_cpu_ids) {
struct cpu_ctx *cpuc = get_cpu_ctx_id(cpu);
@ -94,6 +152,9 @@ static void collect_sys_stat(struct sys_stat_ctx *c)
c->nr_lat_cri += cpuc->nr_lat_cri;
cpuc->nr_lat_cri = 0;
c->nr_x_migration += cpuc->nr_x_migration;
cpuc->nr_x_migration = 0;
/*
* Accumulate the task's latency criticality information.
*
@ -169,12 +230,6 @@ static void collect_sys_stat(struct sys_stat_ctx *c)
c->idle_total += cpuc->idle_total;
cpuc->idle_total = 0;
}
bpf_for(dsq_id, 0, LAVD_CPDOM_MAX_NR) {
nr = scx_bpf_dsq_nr_queued(dsq_id);
if (nr > 0)
c->nr_queued_task += nr;
}
}
static void calc_sys_stat(struct sys_stat_ctx *c)
@ -239,6 +294,8 @@ static void update_sys_stat_next(struct sys_stat_ctx *c)
c->stat_cur->thr_perf_cri; /* will be updated later */
}
stat_next->nr_stealee = c->nr_stealee;
stat_next->nr_violation =
calc_avg32(stat_cur->nr_violation, c->nr_violation);
@ -260,6 +317,7 @@ static void update_sys_stat_next(struct sys_stat_ctx *c)
stat_next->nr_sched >>= 1;
stat_next->nr_perf_cri >>= 1;
stat_next->nr_lat_cri >>= 1;
stat_next->nr_x_migration >>= 1;
stat_next->nr_big >>= 1;
stat_next->nr_pc_on_big >>= 1;
stat_next->nr_lc_on_big >>= 1;
@ -272,6 +330,7 @@ static void update_sys_stat_next(struct sys_stat_ctx *c)
stat_next->nr_sched += c->nr_sched;
stat_next->nr_perf_cri += c->nr_perf_cri;
stat_next->nr_lat_cri += c->nr_lat_cri;
stat_next->nr_x_migration += c->nr_x_migration;
stat_next->nr_big += c->nr_big;
stat_next->nr_pc_on_big += c->nr_pc_on_big;
stat_next->nr_lc_on_big += c->nr_lc_on_big;
@ -287,6 +346,7 @@ static void do_update_sys_stat(void)
* Collect and prepare the next version of stat.
*/
init_sys_stat_ctx(&c);
plan_x_cpdom_migration(&c);
collect_sys_stat(&c);
calc_sys_stat(&c);
update_sys_stat_next(&c);

View File

@ -299,3 +299,14 @@ static void set_on_core_type(struct task_ctx *taskc,
WRITE_ONCE(taskc->on_big, on_big);
WRITE_ONCE(taskc->on_little, on_little);
}
static bool prob_x_out_of_y(u32 x, u32 y)
{
/*
 * Return true with probability x out of y: draw r uniformly
 * from [0, y) and test whether it falls below x.
 */
u32 r = bpf_get_prandom_u32() % y;
return r < x;
}

View File

@ -711,6 +711,8 @@ impl<'a> Scheduler<'a> {
let nr_sched = st.nr_sched;
let pc_pc = Self::get_pc(st.nr_perf_cri, nr_sched);
let pc_lc = Self::get_pc(st.nr_lat_cri, nr_sched);
let pc_x_migration = Self::get_pc(st.nr_x_migration, nr_sched);
let nr_stealee = st.nr_stealee;
let nr_big = st.nr_big;
let pc_big = Self::get_pc(nr_big, nr_sched);
let pc_pc_on_big = Self::get_pc(st.nr_pc_on_big, nr_big);
@ -730,6 +732,8 @@ impl<'a> Scheduler<'a> {
nr_sched,
pc_pc,
pc_lc,
pc_x_migration,
nr_stealee,
pc_big,
pc_pc_on_big,
pc_lc_on_big,

View File

@ -37,6 +37,12 @@ pub struct SysStats {
#[stat(desc = "% of latency-critical tasks")]
pub pc_lc: f64,
#[stat(desc = "% of cross domain task migration")]
pub pc_x_migration: f64,
#[stat(desc = "Number of stealee domains")]
pub nr_stealee: u32,
#[stat(desc = "% of tasks scheduled on big cores")]
pub pc_big: f64,
@ -63,13 +69,15 @@ impl SysStats {
pub fn format_header<W: Write>(w: &mut W) -> Result<()> {
writeln!(
w,
"\x1b[93m| {:8} | {:9} | {:9} | {:8} | {:8} | {:8} | {:8} | {:8} | {:8} | {:11} | {:12} | {:12} | {:12} |\x1b[0m",
"\x1b[93m| {:8} | {:9} | {:9} | {:8} | {:8} | {:8} | {:8} | {:8} | {:8} | {:8} | {:8} | {:11} | {:12} | {:12} | {:12} |\x1b[0m",
"MSEQ",
"# Q TASK",
"# ACT CPU",
"# SCHED",
"PERF-CR%",
"LAT-CR%",
"X-MIG%",
"# STLEE",
"BIG%",
"PC/BIG%",
"LC/BIG%",
@ -88,13 +96,15 @@ impl SysStats {
writeln!(
w,
"| {:8} | {:9} | {:9} | {:8} | {:8} | {:8} | {:8} | {:8} | {:8} | {:11} | {:12} | {:12} | {:12} |",
"| {:8} | {:9} | {:9} | {:8} | {:8} | {:8} | {:8} | {:8} | {:8} | {:8} | {:8} | {:11} | {:12} | {:12} | {:12} |",
self.mseq,
self.nr_queued_task,
self.nr_active,
self.nr_sched,
GPoint(self.pc_pc),
GPoint(self.pc_lc),
GPoint(self.pc_x_migration),
self.nr_stealee,
GPoint(self.pc_big),
GPoint(self.pc_pc_on_big),
GPoint(self.pc_lc_on_big),

View File

@ -154,4 +154,9 @@ struct node_ctx {
struct bpf_cpumask __kptr *cpumask;
};
struct migrate_arg {
u64 tptr;
u32 new_dom_id;
};
#endif /* __INTF_H */

View File

@ -335,7 +335,7 @@ static void dom_dcycle_adj(u32 dom_id, u32 weight, u64 now, bool runnable)
}
}
static void dom_dcycle_xfer_task(struct task_struct *p, struct task_ctx *taskc,
static void dom_dcycle_xfer_task(struct task_ctx *taskc,
struct dom_ctx *from_domc,
struct dom_ctx *to_domc, u64 now)
{
@ -454,7 +454,7 @@ int dom_xfer_task(u64 tptr, u32 new_dom_id, u64 now)
if (!from_domc || !to_domc || !taskc)
return 0;
dom_dcycle_xfer_task(p, taskc, from_domc, to_domc, now);
dom_dcycle_xfer_task(taskc, from_domc, to_domc, now);
return 0;
}
@ -1146,18 +1146,10 @@ void BPF_STRUCT_OPS(rusty_enqueue, struct task_struct *p, u64 enq_flags)
}
/*
* Migrate @p to a new domain if requested by userland through lb_data.
* Update @p's domain information.
*/
new_dom = bpf_map_lookup_elem(&lb_data, &tptr);
if (new_dom && *new_dom != taskc->dom_id &&
task_set_domain(taskc, p, *new_dom, false)) {
stat_add(RUSTY_STAT_LOAD_BALANCE, 1);
taskc->dispatch_local = false;
cpu = scx_bpf_pick_any_cpu(cast_mask(p_cpumask), 0);
if (cpu >= 0)
scx_bpf_kick_cpu(cpu, 0);
goto dom_queue;
}
if (!(task_set_domain(taskc, p, taskc->dom_id, true)))
return;
if (taskc->dispatch_local) {
taskc->dispatch_local = false;
@ -1209,6 +1201,34 @@ dom_queue:
}
}
SEC("syscall")
int enqueue_migrate_queue(struct migrate_arg *input)
{
u64 tptr = input->tptr;
u32 new_dom_id = input->new_dom_id;
struct dom_ctx *from_domc;
struct task_ctx *taskc;
taskc = bpf_map_lookup_elem(&task_data, &tptr);
if (!taskc)
return -EINVAL;
if (new_dom_id == taskc->dom_id || new_dom_id == NO_DOM_FOUND)
return -EINVAL;
u64 now = bpf_ktime_get_ns();
dom_xfer_task(tptr, new_dom_id, now);
taskc->dom_id = new_dom_id;
taskc->dispatch_local = false;
stat_add(RUSTY_STAT_LOAD_BALANCE, 1);
return 0;
}
static bool cpumask_intersects_domain(const struct cpumask *cpumask, u32 dom_id)
{
struct dom_ctx *domc;

View File

@ -133,6 +133,7 @@
use core::cmp::Ordering;
use std::cell::Cell;
use std::collections::BTreeMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::fmt;
use std::sync::Arc;
@ -141,6 +142,7 @@ use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use libbpf_rs::MapCore as _;
use libbpf_rs::ProgramInput;
use log::debug;
use log::warn;
use ordered_float::OrderedFloat;
@ -150,6 +152,7 @@ use scx_utils::LoadLedger;
use sorted_vec::SortedVec;
use crate::bpf_intf;
use crate::bpf_intf::migrate_arg;
use crate::bpf_skel::*;
use crate::stats::DomainStats;
use crate::stats::NodeStats;
@ -343,6 +346,7 @@ struct Domain {
queried_tasks: bool,
load: LoadEntity,
tasks: SortedVec<TaskInfo>,
active_tptr_set: HashSet<u64>,
}
impl Domain {
@ -362,6 +366,7 @@ impl Domain {
load_avg,
),
tasks: SortedVec::new(),
active_tptr_set: HashSet::new(),
}
}
@ -680,6 +685,10 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
let tptr = active_tptrs.tptrs[(idx % MAX_TPTRS) as usize];
let key = unsafe { std::mem::transmute::<u64, [u8; 8]>(tptr) };
if dom.active_tptr_set.contains(&tptr) {
continue;
}
if let Some(task_data_elem) = task_data.lookup(&key, libbpf_rs::MapFlags::ANY)? {
let task_ctx =
unsafe { &*(task_data_elem.as_slice().as_ptr() as *const bpf_intf::task_ctx) };
@ -705,6 +714,7 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
};
load *= weight;
dom.active_tptr_set.insert(tptr);
dom.tasks.insert(TaskInfo {
tptr,
load: OrderedFloat(load),
@ -809,6 +819,28 @@ impl<'a, 'b> LoadBalancer<'a, 'b> {
std::mem::swap(&mut push_dom.tasks, &mut SortedVec::from_unsorted(tasks));
push_dom.transfer_load(load, tptr, pull_dom, &mut self.skel);
let prog = &mut self.skel.progs.enqueue_migrate_queue;
let mut args = migrate_arg {
tptr,
new_dom_id: pull_dom.id.try_into().unwrap(),
};
let input = ProgramInput {
context_in: Some(unsafe {
std::slice::from_raw_parts_mut(
&mut args as *mut _ as *mut u8,
std::mem::size_of_val(&args),
)
}),
..Default::default()
};
if let Err(e) = prog.test_run(input) {
warn!(
"Failed to execute task migration immediately for tptr={} error={:?}",
tptr, &e
);
}
Ok(Some(load))
}