Merge pull request #57 from arighi/scx-rustland-improve-cpu-selection

scx_rustland: improve scheduler cpu selection
This commit is contained in:
Tejun Heo 2023-12-30 21:56:48 +09:00 committed by GitHub
commit 641f9b76e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 28 additions and 35 deletions

View File

@ -440,13 +440,15 @@ void BPF_STRUCT_OPS(rustland_dispatch, s32 cpu, struct task_struct *prev)
if (!p)
continue;
/*
* Check if the scheduler assigned a different CPU to the task
* and migrate, otherwise dispatch on the global DSQ.
* Check whether the scheduler assigned a different CPU to the
* task and migrate (if possible); otherwise, dispatch on the
* global DSQ.
*/
prev_cpu = scx_bpf_task_cpu(p);
dbg_msg("usersched: pid=%d prev_cpu=%d cpu=%d payload=%llu",
task.pid, task.cpu, task.payload);
if (task.cpu != prev_cpu)
if ((task.cpu != prev_cpu) &&
bpf_cpumask_test_cpu(task.cpu, p->cpus_ptr))
dispatch_on_cpu(p, task.cpu, 0);
else
dispatch_global(p, 0);

View File

@ -216,7 +216,7 @@ struct Scheduler<'a> {
task_pool: TaskTree, // tasks ordered by vruntime
task_map: TaskInfoMap, // map pids to the corresponding task information
min_vruntime: u64, // Keep track of the minimum vruntime across all tasks
nr_cpus_online: u64, // Amount of the available CPUs in the system
nr_cpus_online: i32, // Amount of the available CPUs in the system
struct_ops: Option<libbpf_rs::Link>,
}
@ -240,7 +240,7 @@ impl<'a> Scheduler<'a> {
//
// We should probably refresh this counter during the normal execution to support cpu
// hotplugging, but for now let's keep it simple and set this only at initialization).
let nr_cpus_online = libbpf_rs::num_possible_cpus().unwrap() as u64;
let nr_cpus_online = libbpf_rs::num_possible_cpus().unwrap() as i32;
// Set scheduler options (defined in the BPF part).
skel.bss_mut().usersched_pid = pid;
@ -292,7 +292,7 @@ impl<'a> Scheduler<'a> {
}
// Get the pid running on a certain CPU, if no tasks are running return 0
fn get_cpu_pid(&self, cpu: u32) -> u32 {
fn get_cpu_pid(&self, cpu: i32) -> u32 {
let maps = self.skel.maps();
let cpu_map = maps.cpu_map();
@ -307,35 +307,18 @@ impl<'a> Scheduler<'a> {
pid
}
// Return the amount of idle CPUs in the system.
fn get_idle_cpus(&self) -> u32 {
let mut count = 0;
// Return the array of idle CPU ids.
fn get_idle_cpus(&self) -> Vec<i32> {
let mut idle_cpus = Vec::new();
for cpu in 0..self.nr_cpus_online {
let pid = self.get_cpu_pid(cpu as u32);
let pid = self.get_cpu_pid(cpu);
if pid == 0 {
count += 1;
}
}
return count;
}
// Search for an idle CPU in the system.
//
// First check the previously used CPU, that is always the best choice (to mitigate migration
// overhead), otherwise check all the others in order.
//
// If all the CPUs are busy return the previouly used CPU.
fn select_task_cpu(&self, prev_cpu: i32) -> i32 {
if self.get_cpu_pid(prev_cpu as u32) != 0 {
for cpu in 0..self.nr_cpus_online {
let pid = self.get_cpu_pid(cpu as u32);
if pid == 0 {
return cpu as i32;
}
idle_cpus.push(cpu);
}
}
prev_cpu
idle_cpus
}
// Update task's vruntime based on the information collected from the kernel part.
@ -382,9 +365,8 @@ impl<'a> Scheduler<'a> {
sum_exec_runtime: task.sum_exec_runtime,
vruntime: self.min_vruntime,
};
let cpu = self.select_task_cpu(task.cpu);
self.task_map.insert(task.pid, task_info);
self.task_pool.push(task.pid, cpu, self.min_vruntime);
self.task_pool.push(task.pid, task.cpu, self.min_vruntime);
}
}
Ok(None) => {
@ -407,18 +389,27 @@ impl<'a> Scheduler<'a> {
fn dispatch_tasks(&mut self) {
let maps = self.skel.maps();
let dispatched = maps.dispatched();
let idle_cpus = self.get_idle_cpus();
// Dispatch only a batch of tasks equal to the amount of idle CPUs in the system.
//
// This allows to have more tasks sitting in the task pool, reducing the pressure on the
// dispatcher queues and giving a chance to higher priority tasks to come in and get
// dispatched earlier, mitigating potential priority inversion issues.
for _ in 0..self.get_idle_cpus() {
for cpu in &idle_cpus {
match self.task_pool.pop() {
Some(task) => {
Some(mut task) => {
// Update global minimum vruntime.
self.min_vruntime = task.vruntime;
// Select a CPU to dispatch the task.
//
// Use the previously used CPU if idle, that is always the best choice (to
// mitigate migration overhead), otherwise pick the next idle CPU available.
if !idle_cpus.contains(&task.cpu) {
task.cpu = *cpu;
}
// Send task to the dispatcher.
let msg = DispatchedMessage::from_task(&task);
match dispatched.update(&[], msg.as_bytes(), libbpf_rs::MapFlags::ANY) {
@ -478,7 +469,7 @@ impl<'a> Scheduler<'a> {
// Show tasks that are currently running.
info!("Running tasks:");
for cpu in 0..self.nr_cpus_online {
let pid = self.get_cpu_pid(cpu as u32);
let pid = self.get_cpu_pid(cpu);
info!(" cpu={} pid={}", cpu, pid);
}