scx_rustland_core: drop CPU ownership API

The API for determining which PID is running on a specific CPU is racy
and unnecessary, since this information can be obtained from user
space.

Additionally, it's not reliable for identifying idle CPUs.  Therefore,
it's better to remove this API and, in the future, provide a cpumask
alternative that can export the idle state of the CPUs to user space.

As a consequence, also change scx_rustland to dispatch one task at a
time, instead of dispatching tasks in batches sized to the number of
idle cores (which is usually inaccurate due to the racy nature of the
CPU ownership interface).

Dispatching one task at a time also makes the scheduler more
performant, because vruntime ordering is applied to a larger set of
tasks sitting in the scheduler's queue.

Signed-off-by: Andrea Righi <andrea.righi@linux.dev>
Andrea Righi 2024-08-05 21:11:29 +02:00
parent 9a0e7755df
commit e1f2b3822e
4 changed files with 34 additions and 165 deletions
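
The commit message argues that the PID-per-CPU information can be obtained from user space instead of through a shared BPF map. As a rough illustration (not part of this commit, and inherently a stale snapshot like any sampling approach), user space can recover a CPU-to-PID view by scanning procfs, using the same /proc/<pid>/stat field 39 that the removed get_current_cpu() helper parsed for the scheduler itself:

// Sketch only: derive a CPU -> PID view from procfs, entirely in user space.
// This is not part of scx_rustland_core; it just illustrates that the data the
// removed get_cpu_pid() exposed can be recovered without a shared BPF map.
use std::collections::HashMap;
use std::fs;

fn running_tasks_per_cpu() -> std::io::Result<HashMap<usize, u32>> {
    let mut cpu_to_pid = HashMap::new();

    for entry in fs::read_dir("/proc")? {
        let entry = entry?;
        // Only numeric directories are PIDs.
        let pid: u32 = match entry.file_name().to_string_lossy().parse() {
            Ok(pid) => pid,
            Err(_) => continue,
        };
        // Tasks can exit while we scan, so ignore read errors.
        let stat = match fs::read_to_string(entry.path().join("stat")) {
            Ok(s) => s,
            Err(_) => continue,
        };
        // comm (field 2) may contain spaces, so split after the closing ')'.
        let rest = match stat.rfind(')') {
            Some(idx) => &stat[idx + 1..],
            None => continue,
        };
        // After comm, fields start at field 3 (state), so field 39 (processor,
        // the CPU the task last ran on) is at index 36 of the remainder.
        let fields: Vec<&str> = rest.split_whitespace().collect();
        let state = fields.first().copied().unwrap_or("?");
        let cpu = fields.get(36).and_then(|s| s.parse::<usize>().ok());
        // 'R' covers running and runnable tasks, so this is only an approximation.
        if state == "R" {
            if let Some(cpu) = cpu {
                cpu_to_pid.insert(cpu, pid);
            }
        }
    }
    Ok(cpu_to_pid)
}

fn main() -> std::io::Result<()> {
    for (cpu, pid) in running_tasks_per_cpu()? {
        println!("cpu {:2} pid={}", cpu, pid);
    }
    Ok(())
}

Any such view is advisory only: tasks can migrate or block between the read and the scheduling decision, which is exactly the race the commit message cites as the reason for dropping the in-kernel cpu_map.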

View File

@@ -57,9 +57,6 @@ pub const RL_PREEMPT_CPU: u64 = bpf_intf::RL_PREEMPT_CPU as u64;
 /// objects) and dispatch tasks (in the form of DispatchedTask objects), using respectively the
 /// methods dequeue_task() and dispatch_task().
 ///
-/// The CPU ownership map can be accessed using the method get_cpu_pid(), this also allows to keep
-/// track of the idle and busy CPUs, with the corresponding PIDs associated to them.
-///
 /// BPF counters and statistics can be accessed using the methods nr_*_mut(), in particular
 /// nr_queued_mut() and nr_scheduled_mut() can be updated to notify the BPF component if the
 /// user-space scheduler has some pending work to do or not.
@@ -384,14 +381,6 @@ impl<'cb> BpfScheduler<'cb> {
         unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
     }
 
-    // Get the pid running on a certain CPU, if no tasks are running return 0.
-    #[allow(dead_code)]
-    pub fn get_cpu_pid(&self, cpu: i32) -> u32 {
-        let cpu_map_ptr = self.skel.bss().cpu_map.as_ptr();
-        unsafe { *cpu_map_ptr.offset(cpu as isize) }
-    }
-
     // Receive a task to be scheduled from the BPF dispatcher.
     //
     // NOTE: if task.cpu is negative the task is exiting and it does not require to be scheduled.

View File

@@ -274,39 +274,6 @@ struct {
  */
 #define USERSCHED_TIMER_NS (NSEC_PER_SEC / 10)
 
-/*
- * Map of allocated CPUs.
- */
-volatile u32 cpu_map[MAX_CPUS];
-
-/*
- * Assign a task to a CPU (used in .running() and .stopping()).
- *
- * If pid == 0 the CPU will be considered idle.
- */
-static void set_cpu_owner(u32 cpu, u32 pid)
-{
-	if (cpu >= MAX_CPUS) {
-		scx_bpf_error("Invalid cpu: %d", cpu);
-		return;
-	}
-	cpu_map[cpu] = pid;
-}
-
-/*
- * Get the pid of the task that is currently running on @cpu.
- *
- * Return 0 if the CPU is idle.
- */
-static __maybe_unused u32 get_cpu_owner(u32 cpu)
-{
-	if (cpu >= MAX_CPUS) {
-		scx_bpf_error("Invalid cpu: %d", cpu);
-		return 0;
-	}
-	return cpu_map[cpu];
-}
-
 /*
  * Return true if the target task @p is the user-space scheduler.
  */
@@ -937,10 +904,8 @@ void BPF_STRUCT_OPS(rustland_running, struct task_struct *p)
 	 * Mark the CPU as busy by setting the pid as owner (ignoring the
 	 * user-space scheduler).
 	 */
-	if (!is_usersched_task(p)) {
-		set_cpu_owner(cpu, p->pid);
+	if (!is_usersched_task(p))
 		__sync_fetch_and_add(&nr_running, 1);
-	}
 }
 
 /*
@@ -955,7 +920,6 @@ void BPF_STRUCT_OPS(rustland_stopping, struct task_struct *p, bool runnable)
 	 * Mark the CPU as idle by setting the owner to 0.
 	 */
 	if (!is_usersched_task(p)) {
-		set_cpu_owner(scx_bpf_task_cpu(p), 0);
 		__sync_fetch_and_sub(&nr_running, 1);
 		/*
 		 * Kick the user-space scheduler immediately when a task

View File

@@ -9,7 +9,6 @@ pub mod bpf_intf;
 mod bpf;
 use bpf::*;
 
-use scx_utils::Topology;
 use scx_utils::UserExitInfo;
 
 use std::sync::atomic::AtomicBool;

View File

@@ -301,31 +301,6 @@ impl<'a> Scheduler<'a> {
         })
     }
 
-    // Return the amount of idle cores.
-    //
-    // On SMT systems consider only one CPU for each fully idle core, to avoid disrupting
-    // performnance too much by running multiple tasks in the same core.
-    fn nr_idle_cpus(&mut self) -> usize {
-        let mut idle_cpu_count = 0;
-
-        // Count the number of cores where all the CPUs are idle.
-        for core in self.topo_map.iter() {
-            let mut all_idle = true;
-            for cpu_id in core {
-                if self.bpf.get_cpu_pid(*cpu_id as i32) != 0 {
-                    all_idle = false;
-                    break;
-                }
-            }
-
-            if all_idle {
-                idle_cpu_count += 1;
-            }
-        }
-
-        idle_cpu_count
-    }
-
     // Return current timestamp in ns.
     fn now() -> u64 {
         let ts = SystemTime::now()
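
The removed nr_idle_cpus() inferred idle cores from the racy per-CPU PID map. If the cpumask alternative mentioned in the commit message is added later, the same SMT-aware aggregation could be driven by an exported idle mask instead. A minimal sketch under that assumption, where idle_mask and core_map are hypothetical inputs (a bitmask with one bit per CPU, and a core-to-CPUs mapping analogous to the removed topo_map):

// Sketch only: count fully idle cores from an idle-CPU bitmask.
// `idle_mask` and `core_map` are hypothetical inputs: a bitmask exported by the
// BPF side (bit N set => CPU N is idle) and a core -> CPUs mapping like the
// topo_map used by the removed nr_idle_cpus().
fn nr_idle_cores(idle_mask: &[u64], core_map: &[Vec<usize>]) -> usize {
    let cpu_is_idle = |cpu: usize| -> bool {
        let word = cpu / 64;
        let bit = cpu % 64;
        idle_mask.get(word).map_or(false, |&w| (w >> bit) & 1 == 1)
    };

    // A core counts as idle only if every sibling CPU in it is idle, mirroring
    // the SMT-aware logic of the removed helper.
    core_map
        .iter()
        .filter(|cpus| cpus.iter().all(|&cpu| cpu_is_idle(cpu)))
        .count()
}

fn main() {
    // CPUs 0-3, cores = [[0, 2], [1, 3]]; only CPUs 0 and 2 are idle.
    let idle_mask = [0b0101u64];
    let core_map = vec![vec![0, 2], vec![1, 3]];
    assert_eq!(nr_idle_cores(&idle_mask, &core_map), 1);
    println!("fully idle cores: {}", nr_idle_cores(&idle_mask, &core_map));
}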
@@ -427,57 +402,47 @@ impl<'a> Scheduler<'a> {
         nr_queued + nr_scheduled
     }
 
-    // Return the target time slice, proportionally adjusted based on the total amount of tasks
-    // waiting to be scheduled (more tasks waiting => shorter time slice).
-    // Dispatch tasks from the task pool in order (sending them to the BPF dispatcher).
-    fn dispatch_tasks(&mut self) {
-        // Dispatch only a batch of tasks equal to the amount of idle CPUs in the system.
-        //
-        // This allows to have more tasks sitting in the task pool, reducing the pressure on the
-        // dispatcher queues and giving a chance to higher priority tasks to come in and get
-        // dispatched earlier, mitigating potential priority inversion issues.
-        let nr_tasks = self.nr_idle_cpus().max(1);
-        for _ in 0..nr_tasks {
-            match self.task_pool.pop() {
-                Some(task) => {
-                    // Update global minimum vruntime.
-                    if self.min_vruntime < task.vruntime {
-                        self.min_vruntime = task.vruntime;
-                    }
+    // Dispatch the first task from the task pool (sending them to the BPF dispatcher).
+    fn dispatch_task(&mut self) {
+        match self.task_pool.pop() {
+            Some(task) => {
+                // Update global minimum vruntime.
+                if self.min_vruntime < task.vruntime {
+                    self.min_vruntime = task.vruntime;
+                }
 
                 // Scale time slice based on the amount of tasks that are waiting in the
                 // scheduler's queue and the previously unused time slice budget, but make sure
                 // to assign at least slice_us_min.
                 let slice_ns = (self.slice_ns / (self.nr_tasks_waiting() + 1)).max(self.slice_ns_min);
 
                 // Create a new task to dispatch.
                 let mut dispatched_task = DispatchedTask::new(&task.qtask);
 
                 // Assign the time slice to the task.
                 dispatched_task.set_slice_ns(slice_ns);
 
                 // Dispatch task on the first CPU available if it is classified as
                 // interactive, non-interactive tasks will continue to run on the same CPU.
                 if task.is_interactive {
                     dispatched_task.set_flag(RL_CPU_ANY);
                 }
 
                 // Send task to the BPF dispatcher.
                 match self.bpf.dispatch_task(&dispatched_task) {
                     Ok(_) => {}
                     Err(_) => {
                         /*
                          * Re-add the task to the dispatched list in case of failure and stop
                          * dispatching.
                          */
                         self.task_pool.push(task);
-                        break;
                     }
                 }
             }
-            None => break,
+            None => {}
         }
-        }
 
         // Update nr_scheduled to notify the dispatcher that all the tasks received by the
         // scheduler has been dispatched, so there is no reason to re-activate the scheduler,
         // unless more tasks are queued.
@@ -489,7 +454,7 @@ impl<'a> Scheduler<'a> {
     // and dispatch them to the BPF part via the dispatched list).
     fn schedule(&mut self) {
         self.drain_queued_tasks();
-        self.dispatch_tasks();
+        self.dispatch_task();
 
         // Yield to avoid using too much CPU from the scheduler itself.
         thread::yield_now();
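
The commit message claims that dispatching a single task per scheduling pass improves vruntime ordering, because tasks stay in the user-space pool longer and late-arriving tasks with a smaller vruntime can still be picked first. A toy model of that effect (not the scx_rustland implementation; the task pool layout and vruntime values are made up):

// Toy model only: a vruntime-ordered task pool. Dispatching one task per
// scheduling pass means a newly queued task with a smaller vruntime is
// considered before the rest of the backlog, instead of waiting behind a
// whole batch that was already committed to the BPF dispatcher.
use std::collections::BTreeSet;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Task {
    vruntime: u64, // ordered first, so pop_first() returns the smallest vruntime
    pid: u32,
}

struct TaskPool {
    tasks: BTreeSet<Task>,
}

impl TaskPool {
    fn push(&mut self, task: Task) {
        self.tasks.insert(task);
    }

    // Dispatch a single task: the one with the smallest vruntime right now.
    fn dispatch_one(&mut self) -> Option<Task> {
        self.tasks.pop_first()
    }
}

fn main() {
    let mut pool = TaskPool { tasks: BTreeSet::new() };
    pool.push(Task { vruntime: 300, pid: 1 });
    pool.push(Task { vruntime: 400, pid: 2 });
    pool.push(Task { vruntime: 500, pid: 3 });

    // Dispatch one task, then a latency-sensitive task wakes up with a low vruntime.
    println!("dispatched: {:?}", pool.dispatch_one()); // pid 1
    pool.push(Task { vruntime: 100, pid: 4 });

    // With one-at-a-time dispatching, pid 4 goes next.
    println!("dispatched: {:?}", pool.dispatch_one()); // pid 4
}

In a batched dispatch sized to the (inaccurately counted) idle cores, pids 2 and 3 would already have been handed to the BPF dispatcher before pid 4 woke up, so the new arrival could only run after them.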
@@ -518,37 +483,6 @@ impl<'a> Scheduler<'a> {
         }
     }
 
-    // Get the current CPU where the scheduler is running.
-    fn get_current_cpu() -> io::Result<i32> {
-        // Open /proc/self/stat file
-        let path = Path::new("/proc/self/stat");
-        let mut file = File::open(path)?;
-
-        // Read the content of the file into a String
-        let mut content = String::new();
-        file.read_to_string(&mut content)?;
-
-        // Split the content into fields using whitespace as the delimiter
-        let fields: Vec<&str> = content.split_whitespace().collect();
-
-        // Parse the 39th field as an i32 and return it.
-        if let Some(field) = fields.get(38) {
-            if let Ok(value) = field.parse::<i32>() {
-                Ok(value)
-            } else {
-                Err(io::Error::new(
-                    io::ErrorKind::InvalidData,
-                    "Unable to parse current CPU information as i32",
-                ))
-            }
-        } else {
-            Err(io::Error::new(
-                io::ErrorKind::InvalidData,
-                "Unable to get current CPU information",
-            ))
-        }
-    }
-
     // Print critical user-space scheduler statistics.
     fn print_faults(&mut self) {
         // Get counters of scheduling failures.
@@ -620,23 +554,6 @@ impl<'a> Scheduler<'a> {
         // Show total page faults of the user-space scheduler.
         self.print_faults();
 
-        // Show tasks that are currently running on each core and CPU.
-        let sched_cpu = match Self::get_current_cpu() {
-            Ok(cpu_info) => cpu_info,
-            Err(_) => -1,
-        };
-        info!("Running tasks:");
-        for (core_id, core) in self.topo_map.iter().enumerate() {
-            for cpu_id in core {
-                let pid = if *cpu_id as i32 == sched_cpu {
-                    "[self]".to_string()
-                } else {
-                    self.bpf.get_cpu_pid(*cpu_id as i32).to_string()
-                };
-                info!(" core {:2} cpu {:2} pid={}", core_id, cpu_id, pid);
-            }
-        }
-
         log::logger().flush();
     }