Merge pull request #345 from sched-ext/rustland-prevent-starvation

scx_rustland: prevent starvation
This commit is contained in:
Andrea Righi 2024-06-15 10:06:28 +02:00 committed by GitHub
commit fafbc90fa5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 41 additions and 19 deletions

View File

@ -367,7 +367,6 @@ dispatch_task(struct task_struct *p, u64 dsq_id,
s32 cpu;
switch (dsq_id) {
case SCX_DSQ_LOCAL:
case SHARED_DSQ:
scx_bpf_dispatch(p, dsq_id, slice, enq_flags);
dbg_msg("dispatch: pid=%d (%s) dsq=%llu enq_flags=%llx slice=%llu",
@ -457,10 +456,22 @@ static void dispatch_user_scheduler(void)
* Dispatch the scheduler on the first CPU available, likely the
* current one.
*/
dispatch_task(p, SHARED_DSQ, 0, 0, 0);
dispatch_task(p, SHARED_DSQ, 0, 0, SCX_ENQ_PREEMPT);
bpf_task_release(p);
}
/*
* Directly dispatch a task on a target CPU bypassing the user-space scheduler.
*/
static void
dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 slice_ns, u64 enq_flags)
{
scx_bpf_dispatch(p, cpu_to_dsq(cpu), slice_ns, enq_flags);
scx_bpf_kick_cpu(cpu, __COMPAT_SCX_KICK_IDLE);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
}
/*
* Select the target CPU where a task can be executed.
*
@ -495,14 +506,10 @@ s32 BPF_STRUCT_OPS(rustland_select_cpu, struct task_struct *p, s32 prev_cpu,
/*
* If the previously used CPU is still available, keep using it to take
* advantage of the cached working set.
*
* NOTE: assign a shorter time slice (slice_ns / 4) to a task directly
* dispatched to prevent it from gaining excessive CPU bandwidth.
*/
if (scx_bpf_test_and_clear_cpu_idle(prev_cpu)) {
tctx->allow_migration = false;
dispatch_task(p, SCX_DSQ_LOCAL, 0, slice_ns / 4, 0);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
dispatch_direct_cpu(p, prev_cpu, slice_ns, 0);
return prev_cpu;
}
@ -523,8 +530,7 @@ s32 BPF_STRUCT_OPS(rustland_select_cpu, struct task_struct *p, s32 prev_cpu,
cpu = scx_bpf_select_cpu_dfl(p, prev_cpu, wake_flags, &is_idle);
if (is_idle) {
tctx->allow_migration = false;
dispatch_task(p, SCX_DSQ_LOCAL, 0, slice_ns / 4, 0);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
dispatch_direct_cpu(p, cpu, slice_ns, 0);
return cpu;
}
tctx->allow_migration = true;
@ -595,7 +601,7 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
* long (i.e., ksoftirqd/N, rcuop/N, etc.).
*/
if (is_kthread(p) && p->nr_cpus_allowed == 1) {
dispatch_task(p, SCX_DSQ_LOCAL, 0, slice_ns, enq_flags);
scx_bpf_dispatch(p, SCX_DSQ_LOCAL, slice_ns, enq_flags);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
return;
}
@ -605,12 +611,7 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
* FIFO mode.
*/
if (!full_user && is_fifo_enabled) {
s32 cpu = scx_bpf_task_cpu(p);
scx_bpf_dispatch(p, cpu_to_dsq(cpu), slice_ns, enq_flags);
scx_bpf_kick_cpu(cpu, __COMPAT_SCX_KICK_IDLE);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
dispatch_direct_cpu(p, scx_bpf_task_cpu(p), slice_ns, enq_flags);
return;
}

View File

@ -261,6 +261,7 @@ struct Scheduler<'a> {
task_pool: TaskTree, // tasks ordered by vruntime
task_map: TaskInfoMap, // map pids to the corresponding task information
min_vruntime: u64, // Keep track of the minimum vruntime across all tasks
max_vruntime: u64, // Keep track of the maximum vruntime across all tasks
slice_ns: u64, // Default time slice (in ns)
slice_boost: u64, // Slice booster
init_page_faults: u64, // Initial page faults counter
@ -292,8 +293,9 @@ impl<'a> Scheduler<'a> {
// Scheduler task map to store tasks information.
let task_map = TaskInfoMap::new();
// Initialize global minimum vruntime.
// Initialize global minimum and maximum vruntime.
let min_vruntime: u64 = 0;
let max_vruntime: u64 = 0;
// Initialize initial page fault counter.
let init_page_faults: u64 = 0;
@ -319,6 +321,7 @@ impl<'a> Scheduler<'a> {
task_pool,
task_map,
min_vruntime,
max_vruntime,
slice_ns,
slice_boost,
init_page_faults,
@ -438,6 +441,9 @@ impl<'a> Scheduler<'a> {
// current task for too long in the scheduler task pool.
task_info.vruntime = self.min_vruntime + slice.clamp(1, self.slice_ns);
// Update maximum vruntime.
self.max_vruntime = self.max_vruntime.max(task_info.vruntime);
// Update total task cputime.
task_info.sum_exec_runtime = task.sum_exec_runtime;
@ -504,7 +510,15 @@ impl<'a> Scheduler<'a> {
// This allows to have more tasks sitting in the task pool, reducing the pressure on the
// dispatcher queues and giving a chance to higher priority tasks to come in and get
// dispatched earlier, mitigating potential priority inversion issues.
for _ in 0..self.nr_idle_cpus().max(1) {
let delta_slice = self.max_vruntime - self.min_vruntime;
let nr_tasks = if delta_slice <= self.slice_ns {
self.nr_idle_cpus().max(1)
} else {
// Scheduler is getting congested, flush all tasks that are waiting to be scheduled to
// mitigate excessive starvation.
usize::MAX
};
for _ in 0..nr_tasks {
match self.task_pool.pop() {
Some(task) => {
// Determine the task's virtual time slice.
@ -682,7 +696,14 @@ impl<'a> Scheduler<'a> {
// Print internal scheduler statistics (fetched from the BPF part).
fn print_stats(&mut self) {
// Show minimum vruntime (this should be constantly incrementing).
info!("vruntime={}", self.min_vruntime);
let delta = self.max_vruntime - self.min_vruntime;
info!(
"min_vruntime={} max_vruntime={} delta={}us slice={}us",
self.min_vruntime,
self.max_vruntime,
delta / NSEC_PER_USEC,
self.slice_ns / NSEC_PER_USEC,
);
// Show the total amount of tasks currently monitored by the scheduler.
info!(" tasks={}", self.task_map.tasks.len());