Merge pull request #74 from sched-ext/scx-rustland-multicore-fixes

scx_rustland: multicore fixes

commit 0ed47cd9a3
Author: Tejun Heo (committed by GitHub)
Date:   2024-01-08 09:21:13 -10:00
3 changed files with 42 additions and 21 deletions


@@ -7,6 +7,8 @@ use crate::bpf_intf;
 use crate::bpf_skel::*;
 use std::ffi::CStr;
+use std::fs::File;
+use std::io::{self, BufRead};
 use anyhow::Context;
 use anyhow::Result;
@@ -200,6 +202,13 @@ impl<'a> BpfScheduler<'a> {
         let skel_builder = BpfSkelBuilder::default();
         let mut skel = skel_builder.open().context("Failed to open BPF program")?;
+        // Initialize online CPUs counter.
+        //
+        // We should probably refresh this counter during normal execution to support CPU
+        // hotplugging, but for now let's keep it simple and set this only at initialization.
+        let nr_cpus_online = Self::count_cpus()?;
+        skel.rodata_mut().num_possible_cpus = nr_cpus_online;
+
         // Set scheduler options (defined in the BPF part).
         skel.bss_mut().usersched_pid = std::process::id();
         skel.rodata_mut().slice_ns = slice_us * 1000;
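
The write to skel.rodata_mut() works because it happens between opening and loading the BPF skeleton: const volatile globals live in the object's read-only data section, which can only be set while the skeleton is open but not yet loaded; after load the value is fixed for both the verifier and the running program. That is also why the comment above defers CPU hotplug support: refreshing the value at runtime would require a different mechanism than rodata.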
@@ -226,6 +235,30 @@ impl<'a> BpfScheduler<'a> {
         }
     }
 
+    // Return the amount of available CPUs in the system (according to /proc/stat).
+    fn count_cpus() -> io::Result<i32> {
+        let file = File::open("/proc/stat")?;
+        let reader = io::BufReader::new(file);
+        let mut cpu_count = -1;
+        for line in reader.lines() {
+            let line = line?;
+            if line.starts_with("cpu") {
+                cpu_count += 1;
+            } else {
+                break;
+            }
+        }
+        Ok(cpu_count)
+    }
+
+    // Return the amount of available CPUs in the system (as detected at initialization).
+    #[allow(dead_code)]
+    pub fn get_nr_cpus(&self) -> i32 {
+        self.skel.rodata().num_possible_cpus
+    }
+
     // Override the default scheduler time slice (in us).
     #[allow(dead_code)]
     pub fn set_effective_slice_us(&mut self, slice_us: u64) {
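
count_cpus() starts the counter at -1 because /proc/stat opens with an aggregate "cpu" summary line before the per-CPU "cpu0", "cpu1", ... lines, and the per-CPU lines are contiguous, so the loop can stop at the first entry (e.g. "intr") that does not start with "cpu". A minimal, self-contained sketch of the same parsing against canned input; count_cpus_from and the sample string are illustrative, not part of the commit:

use std::io::{self, BufRead};

// Count per-CPU lines in /proc/stat-style input: start at -1 so the
// aggregate "cpu" summary line is not counted, and stop at the first
// line that does not start with "cpu".
fn count_cpus_from(reader: impl BufRead) -> io::Result<i32> {
    let mut cpu_count = -1;
    for line in reader.lines() {
        let line = line?;
        if line.starts_with("cpu") {
            cpu_count += 1;
        } else {
            break;
        }
    }
    Ok(cpu_count)
}

fn main() -> io::Result<()> {
    let sample = "cpu  10 0 20 300\ncpu0 5 0 10 150\ncpu1 5 0 10 150\nintr 42\n";
    // Two per-CPU lines ("cpu0" and "cpu1") => 2.
    assert_eq!(count_cpus_from(io::BufReader::new(sample.as_bytes()))?, 2);
    Ok(())
}

One caveat, acknowledged by the in-tree comment: this counts CPUs that were online at startup, unlike the libbpf_rs::num_possible_cpus() call it replaces on the main.rs side below.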


@@ -44,7 +44,7 @@ char _license[] SEC("license") = "GPL";
 #define MAX_CPUS 1024
 
 /* !0 for veristat, set during init */
-const volatile u32 num_possible_cpus = 8;
+const volatile s32 num_possible_cpus = 8;
 
 /*
  * Exit info (passed to the user-space counterpart).
@@ -323,7 +323,7 @@ static s32 get_task_cpu(struct task_struct *p, s32 cpu)
	 * Return -ENOENT if no CPU is available.
	 */
	cpu = bpf_cpumask_any_distribute(p->cpus_ptr);
-	return cpu < num_possible_cpus ? : -ENOENT;
+	return cpu < num_possible_cpus ? cpu : -ENOENT;
 }
 
 /*

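These two one-line changes are the BPF-side fixes. Declaring num_possible_cpus as s32 matches the i32 value that user space now writes via skel.rodata_mut().num_possible_cpus and keeps the comparison with the signed cpu id free of implicit sign conversion. The old return statement used the GNU C binary conditional operator, a ?: b, which evaluates to the value of a itself when a is non-zero: cpu < num_possible_cpus ?: -ENOENT therefore returned 1 (the result of the comparison) for every valid CPU, steering everything to CPU 1 instead of the CPU picked by bpf_cpumask_any_distribute(). Spelling out the middle operand returns the intended cpu.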

@@ -164,7 +164,6 @@ struct Scheduler<'a> {
     task_pool: TaskTree,   // tasks ordered by vruntime
     task_map: TaskInfoMap, // map pids to the corresponding task information
     min_vruntime: u64,     // Keep track of the minimum vruntime across all tasks
-    nr_cpus_online: i32,   // Amount of the available CPUs in the system
     slice_ns: u64,         // Default time slice (in ns)
 }
@@ -186,19 +185,12 @@ impl<'a> Scheduler<'a> {
         // Initialize global minimum vruntime.
         let min_vruntime: u64 = 0;
 
-        // Initialize online CPUs counter.
-        //
-        // We should probably refresh this counter during the normal execution to support cpu
-        // hotplugging, but for now let's keep it simple and set this only at initialization).
-        let nr_cpus_online = libbpf_rs::num_possible_cpus().unwrap() as i32;
-
         // Return scheduler object.
         Ok(Self {
             bpf,
             task_pool,
             task_map,
             min_vruntime,
-            nr_cpus_online,
             slice_ns,
         })
     }
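
The counter and its comment move essentially verbatim into BpfScheduler::init() in the first file, with one semantic change: the count now comes from /proc/stat (CPUs online at startup) rather than libbpf_rs::num_possible_cpus() (CPUs the kernel could ever bring up). Every consumer below now reads the single authoritative value through self.bpf.get_nr_cpus(), so the user-space and BPF sides can no longer disagree about how many CPUs exist.
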
@@ -207,7 +199,7 @@ impl<'a> Scheduler<'a> {
     fn get_idle_cpus(&self) -> Vec<i32> {
         let mut idle_cpus = Vec::new();
 
-        for cpu in 0..self.nr_cpus_online {
+        for cpu in 0..self.bpf.get_nr_cpus() {
             let pid = self.bpf.get_cpu_pid(cpu);
             if pid == 0 {
                 idle_cpus.push(cpu);
@@ -228,10 +220,6 @@ impl<'a> Scheduler<'a> {
         min_vruntime: u64,
         slice_ns: u64,
     ) {
-        // Allow to scale the maximum time slice by a factor of 10 to increase the range of allowed
-        // time delta and give a better chance to prioritize tasks with higher weight.
-        let max_slice_ns = slice_ns * 10;
-
         // Evaluate last time slot used by the task, scaled by its priority (weight).
         //
         // NOTE: make sure to handle the case where the current sum_exec_runtime is less than the
@@ -251,15 +239,15 @@ impl<'a> Scheduler<'a> {
         // Make sure that the updated vruntime is in the range:
         //
-        //   (min_vruntime, min_vruntime + max_slice_ns]
+        //   (min_vruntime, min_vruntime + slice_ns]
         //
         // In this way we ensure that global vruntime is always progressing during each scheduler
         // run, preventing excessive starvation of the other tasks sitting in the self.task_pool
         // tree.
         //
-        // Moreover, limiting the accounted time slice to max_slice_ns, allows to prevent starving
-        // the current task for too long in the scheduler task pool.
-        task_info.vruntime = min_vruntime + slice.clamp(1, max_slice_ns);
+        // Moreover, limiting the accounted time slice to slice_ns helps prevent starving the
+        // current task for too long in the scheduler task pool.
+        task_info.vruntime = min_vruntime + slice.clamp(1, slice_ns);
 
         // Update total task cputime.
         task_info.sum_exec_runtime = sum_exec_runtime;
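
With max_slice_ns gone, a task's vruntime can advance by at most one default slice per update instead of ten, keeping waiting tasks closer to min_vruntime. A minimal sketch of the updated rule; update_vruntime, the weight parameter, and the default weight of 100 are simplifying assumptions, and the real code additionally handles the sum_exec_runtime anomalies its comments describe:

// Sketch: advance a task's vruntime by the time it actually ran, scaled
// inversely by its weight (CFS-style virtual time), clamping the advance
// to (0, slice_ns] so global vruntime always progresses and no task is
// pushed more than one slice ahead of min_vruntime.
fn update_vruntime(min_vruntime: u64, used_ns: u64, weight: u64, slice_ns: u64) -> u64 {
    let slice = used_ns * 100 / weight.max(1); // 100 = assumed default weight
    min_vruntime + slice.clamp(1, slice_ns)
}
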
@@ -327,7 +315,7 @@ impl<'a> Scheduler<'a> {
         let nr_queued = *self.bpf.nr_queued_mut();
         let nr_scheduled = *self.bpf.nr_scheduled_mut();
         let nr_waiting = nr_queued + nr_scheduled;
-        let nr_cpus = self.nr_cpus_online as u64;
+        let nr_cpus = self.bpf.get_nr_cpus() as u64;
 
         // Scale time slice, but never scale below 1 ms.
         let scaling = nr_waiting / nr_cpus + 1;
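
The CPU count feeds the slice scaling directly, so an over-count (possible rather than online CPUs) would make the scheduler hand out longer slices than the actual load per online CPU justifies. A sketch of the scaling the comment describes; scale_slice_us and the exact placement of the 1 ms floor are assumptions, since only the scaling line is visible in this hunk:

// Sketch: shrink the effective time slice as the number of waiting tasks
// per CPU grows, but never below 1 ms (per the comment above).
fn scale_slice_us(slice_us: u64, nr_waiting: u64, nr_cpus: u64) -> u64 {
    let scaling = nr_waiting / nr_cpus.max(1) + 1;
    (slice_us / scaling).max(1000) // floor: 1000 us = 1 ms
}
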
@@ -475,7 +463,7 @@ impl<'a> Scheduler<'a> {
             Err(_) => -1,
         };
 
         info!("Running tasks:");
-        for cpu in 0..self.nr_cpus_online {
+        for cpu in 0..self.bpf.get_nr_cpus() {
             let pid = if cpu == sched_cpu {
                 "[self]".to_string()
             } else {