Merge pull request #468 from sched-ext/rustland-refactoring

scx_rustland refactoring
Andrea Righi 2024-08-07 11:38:21 +02:00 committed by GitHub
commit 9d808ae206
GPG Key ID: B5690EEEBB952194
11 changed files with 597 additions and 587 deletions

View File

@ -2,7 +2,7 @@
name = "scx_rustland_core"
version = "1.0.1"
edition = "2021"
authors = ["Andrea Righi <andrea.righi@canonical.com>"]
authors = ["Andrea Righi <andrea.righi@linux.dev>"]
license = "GPL-2.0-only"
repository = "https://github.com/sched-ext/scx"
description = "Framework to implement sched_ext schedulers running in user space"

View File

@ -1,4 +1,4 @@
// Copyright (c) Andrea Righi <andrea.righi@canonical.com>
// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
@ -6,6 +6,9 @@
use crate::bpf_intf;
use crate::bpf_skel::*;
use std::fs::File;
use std::io::Read;
use anyhow::Context;
use anyhow::Result;
@ -21,7 +24,6 @@ use libc::{pthread_self, pthread_setschedparam, sched_param};
use libc::timespec;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
@ -56,9 +58,6 @@ pub const RL_PREEMPT_CPU: u64 = bpf_intf::RL_PREEMPT_CPU as u64;
/// objects) and dispatch tasks (in the form of DispatchedTask objects), using respectively the
/// methods dequeue_task() and dispatch_task().
///
/// The CPU ownership map can be accessed using the method get_cpu_pid(); this also makes it
/// possible to keep track of the idle and busy CPUs, with the corresponding PIDs associated to them.
///
/// BPF counters and statistics can be accessed using the methods nr_*_mut(), in particular
/// nr_queued_mut() and nr_scheduled_mut() can be updated to notify the BPF component if the
/// user-space scheduler has some pending work to do or not.
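// Illustrative sketch (not part of this diff): a minimal dequeue/dispatch loop built on the
// methods described above. The Result/Option shapes below are simplified assumptions; see the
// scx_rlfifo and scx_rustland call sites further down for real usage.
fn schedule_once(bpf: &mut BpfScheduler<'_>) {
    loop {
        match bpf.dequeue_task() {
            // A runnable task was received: dispatch it on the first CPU available.
            Ok(Some(task)) if task.cpu >= 0 => {
                let mut dispatched = DispatchedTask::new(&task);
                dispatched.set_flag(RL_CPU_ANY);
                let _ = bpf.dispatch_task(&dispatched);
            }
            // A negative CPU means the task is exiting and doesn't need to be scheduled.
            Ok(Some(_)) => {}
            // Queue drained: tell the BPF component there is no pending work left.
            Ok(None) => {
                *bpf.nr_queued_mut() = 0;
                *bpf.nr_scheduled_mut() = 0;
                break;
            }
            Err(_) => break,
        }
    }
}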
@ -73,7 +72,6 @@ pub struct QueuedTask {
pub pid: i32, // pid that uniquely identifies a task
pub cpu: i32, // CPU where the task is running (-1 = exiting)
pub sum_exec_runtime: u64, // Total cpu time
pub nvcsw: u64, // Voluntary context switches
pub weight: u64, // Task static priority
cpumask_cnt: u64, // cpumask generation counter (private)
}
@ -152,7 +150,6 @@ impl EnqueuedMessage {
cpu: self.inner.cpu,
cpumask_cnt: self.inner.cpumask_cnt,
sum_exec_runtime: self.inner.sum_exec_runtime,
nvcsw: self.inner.nvcsw,
weight: self.inner.weight,
}
}
@ -180,20 +177,29 @@ static mut BUF: AlignedBuffer = AlignedBuffer([0; BUFSIZE]);
// ring buffer.
const LIBBPF_STOP: i32 = -255;
fn is_smt_active() -> std::io::Result<bool> {
let mut file = File::open("/sys/devices/system/cpu/smt/active")?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let smt_active: i32 = contents.trim().parse().unwrap_or(0);
Ok(smt_active == 1)
}
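// Descriptive note (not part of this diff): on an SMT-capable host
// /sys/devices/system/cpu/smt/active contains "1", so is_smt_active() returns Ok(true);
// the result is copied into the BPF read-only data (smt_enabled) during init() below.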
impl<'cb> BpfScheduler<'cb> {
pub fn init(
slice_us: u64,
nr_cpus_online: i32,
partial: bool,
exit_dump_len: u32,
partial: bool,
slice_us: u64,
full_user: bool,
low_power: bool,
fifo_sched: bool,
verbose: bool,
debug: bool,
) -> Result<Self> {
// Open the BPF prog first for verification.
let skel_builder = BpfSkelBuilder::default();
init_libbpf_logging(None);
let mut skel_builder = BpfSkelBuilder::default();
skel_builder.obj_builder.debug(verbose);
let mut skel = scx_ops_open!(skel_builder, rustland)?;
// Lock all the memory to prevent page faults that could trigger potential deadlocks during
@ -235,11 +241,8 @@ impl<'cb> BpfScheduler<'cb> {
LIBBPF_STOP
}
// Initialize online CPUs counter.
//
// NOTE: we should probably refresh this counter during the normal execution to support cpu
// hotplugging, but for now let's keep it simple and set this only at initialization).
skel.rodata_mut().num_possible_cpus = nr_cpus_online;
// Check host topology to determine if we need to enable SMT capabilities.
skel.rodata_mut().smt_enabled = is_smt_active()?;
// Set scheduler options (defined in the BPF part).
if partial {
@ -249,10 +252,9 @@ impl<'cb> BpfScheduler<'cb> {
skel.bss_mut().usersched_pid = std::process::id();
skel.rodata_mut().slice_ns = slice_us * 1000;
skel.rodata_mut().debug = debug;
skel.rodata_mut().full_user = full_user;
skel.rodata_mut().low_power = low_power;
skel.rodata_mut().fifo_sched = fifo_sched;
skel.rodata_mut().debug = debug;
// Attach BPF scheduler.
let mut skel = scx_ops_load!(skel, rustland, uei)?;
@ -302,6 +304,12 @@ impl<'cb> BpfScheduler<'cb> {
}
}
// Counter of the online CPUs.
#[allow(dead_code)]
pub fn nr_online_cpus_mut(&mut self) -> &mut u64 {
&mut self.skel.bss_mut().nr_online_cpus
}
// Counter of currently running tasks.
#[allow(dead_code)]
pub fn nr_running_mut(&mut self) -> &mut u64 {
@ -378,14 +386,6 @@ impl<'cb> BpfScheduler<'cb> {
unsafe { pthread_setschedparam(pthread_self(), SCHED_EXT, &param as *const sched_param) }
}
// Get the pid running on a certain CPU, if no tasks are running return 0.
#[allow(dead_code)]
pub fn get_cpu_pid(&self, cpu: i32) -> u32 {
let cpu_map_ptr = self.skel.bss().cpu_map.as_ptr();
unsafe { *cpu_map_ptr.offset(cpu as isize) }
}
// Receive a task to be scheduled from the BPF dispatcher.
//
// NOTE: if task.cpu is negative, the task is exiting and does not need to be scheduled.

View File

@ -65,7 +65,6 @@ struct queued_task_ctx {
s32 cpu; /* CPU where the task is running (-1 = exiting) */
u64 cpumask_cnt; /* cpumask generation counter */
u64 sum_exec_runtime; /* Total cpu time */
u64 nvcsw; /* Voluntary context switches */
u64 weight; /* Task static priority */
};

View File

@ -1,4 +1,4 @@
/* Copyright (c) Andrea Righi <andrea.righi@canonical.com> */
/* Copyright (c) Andrea Righi <andrea.righi@linux.dev> */
/*
* scx_rustland_core: BPF backend for schedulers running in user-space.
*
@ -42,9 +42,6 @@ UEI_DEFINE(uei);
*/
#define SHARED_DSQ MAX_CPUS
/* !0 for veristat, set during init */
const volatile s32 num_possible_cpus = 8;
/*
* Scheduler attributes and statistics.
*/
@ -72,7 +69,7 @@ volatile u64 nr_scheduled;
/*
* Amount of currently running tasks.
*/
volatile u64 nr_running;
volatile u64 nr_running, nr_online_cpus;
/* Dispatch statistics */
volatile u64 nr_user_dispatches, nr_kernel_dispatches,
@ -84,6 +81,12 @@ volatile u64 nr_failed_dispatches, nr_sched_congested;
/* Report additional debugging information */
const volatile bool debug;
/* Allow to use bpf_printk() only when @debug is set */
#define dbg_msg(_fmt, ...) do { \
if (debug) \
bpf_printk(_fmt, ##__VA_ARGS__); \
} while(0)
/*
* Enable/disable full user-space mode.
*
@ -106,23 +109,75 @@ const volatile bool full_user;
const volatile bool low_power;
/*
* Automatically switch to simple FIFO scheduling during periods of system
* underutilization to minimize unnecessary scheduling overhead.
*
* 'fifo_sched' can be used by the user-space scheduler to enable/disable this
* behavior.
*
* 'is_fifo_enabled' indicates whether the scheduling has switched to FIFO mode
* or regular scheduling mode.
* Whether the CPUs in the system have SMT enabled.
*/
const volatile bool fifo_sched;
static bool is_fifo_enabled;
const volatile bool smt_enabled = true;
/* Allow to use bpf_printk() only when @debug is set */
#define dbg_msg(_fmt, ...) do { \
if (debug) \
bpf_printk(_fmt, ##__VA_ARGS__); \
} while(0)
/*
* Mask of offline CPUs, used to properly support CPU hotplugging.
*/
private(BPFLAND) struct bpf_cpumask __kptr *offline_cpumask;
/*
* Set the state of a CPU in a cpumask.
*/
static bool set_cpu_state(struct bpf_cpumask *cpumask, s32 cpu, bool state)
{
if (!cpumask)
return false;
if (state)
return bpf_cpumask_test_and_set_cpu(cpu, cpumask);
else
return bpf_cpumask_test_and_clear_cpu(cpu, cpumask);
}
/*
* Access a cpumask in read-only mode (typically to check bits).
*/
static const struct cpumask *cast_mask(struct bpf_cpumask *mask)
{
return (const struct cpumask *)mask;
}
/*
* Allocate/re-allocate a new cpumask.
*/
static int calloc_cpumask(struct bpf_cpumask **p_cpumask)
{
struct bpf_cpumask *cpumask;
cpumask = bpf_cpumask_create();
if (!cpumask)
return -ENOMEM;
cpumask = bpf_kptr_xchg(p_cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);
return 0;
}
/*
* Determine when we need to drain tasks dispatched to CPUs that went offline.
*/
static int offline_needed;
/*
* Notify the scheduler that we need to drain and re-enqueue the tasks
* dispatched to the offline CPU DSQs.
*/
static void set_offline_needed(void)
{
__sync_fetch_and_or(&offline_needed, 1);
}
/*
* Check and clear the state of the offline CPUs re-enqueuing.
*/
static bool test_and_clear_offline_needed(void)
{
return __sync_fetch_and_and(&offline_needed, 0) == 1;
}
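/*
 * Illustrative flow (not part of this diff): if CPU 3 goes offline while a
 * task is still sitting in its per-CPU DSQ, .cpu_offline() marks the CPU in
 * offline_cpumask and calls set_offline_needed(); on a later .dispatch() an
 * online CPU sees test_and_clear_offline_needed() return true, consumes the
 * stranded task from CPU 3's DSQ and re-arms the flag in case more tasks are
 * still queued there.
 */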
/*
* Maximum amount of tasks queued between kernel and user-space at a certain
@ -214,45 +269,11 @@ struct {
} usersched_timer SEC(".maps");
/*
* Time period of the scheduler heartbeat, used to periodically kick the
* scheduler and check if we need to switch to FIFO mode or regular
* scheduling (default 100ms).
* Time period of the scheduler heartbeat, used to periodically kick the
* user-space scheduler and check if there is any pending activity.
*/
#define USERSCHED_TIMER_NS (NSEC_PER_SEC / 10)
/*
* Map of allocated CPUs.
*/
volatile u32 cpu_map[MAX_CPUS];
/*
* Assign a task to a CPU (used in .running() and .stopping()).
*
* If pid == 0 the CPU will be considered idle.
*/
static void set_cpu_owner(u32 cpu, u32 pid)
{
if (cpu >= MAX_CPUS) {
scx_bpf_error("Invalid cpu: %d", cpu);
return;
}
cpu_map[cpu] = pid;
}
/*
* Get the pid of the task that is currently running on @cpu.
*
* Return 0 if the CPU is idle.
*/
static __maybe_unused u32 get_cpu_owner(u32 cpu)
{
if (cpu >= MAX_CPUS) {
scx_bpf_error("Invalid cpu: %d", cpu);
return 0;
}
return cpu_map[cpu];
}
/*
* Return true if the target task @p is the user-space scheduler.
*/
@ -356,7 +377,7 @@ dispatch_task(struct task_struct *p, u64 dsq_id,
u64 slice = task_slice_ns ? : slice_ns;
u64 curr_cpumask_cnt;
bool force_shared = false;
s32 cpu;
s32 cpu = scx_bpf_task_cpu(p);
switch (dsq_id) {
case SHARED_DSQ:
@ -433,61 +454,49 @@ dispatch_task(struct task_struct *p, u64 dsq_id,
dbg_msg("dispatch: pid=%d (%s) dsq=%llu enq_flags=%llx slice=%llu",
p->pid, p->comm, dsq_id, enq_flags, slice);
/*
* Wake up the target CPU (only if idle and if we are bouncing
* to a different CPU).
*/
if (cpu != bpf_get_smp_processor_id())
scx_bpf_kick_cpu(cpu, SCX_KICK_IDLE);
break;
}
/*
* Wake up the target CPU (only if idle and if we are bouncing
* to a different CPU).
*/
if (cpu != bpf_get_smp_processor_id())
scx_bpf_kick_cpu(cpu, SCX_KICK_IDLE);
}
/*
* Dispatch the user-space scheduler.
*/
static void dispatch_user_scheduler(void)
static bool dispatch_user_scheduler(void)
{
struct task_struct *p;
if (!test_and_clear_usersched_needed())
return;
return false;
p = bpf_task_from_pid(usersched_pid);
if (!p) {
scx_bpf_error("Failed to find usersched task %d", usersched_pid);
return;
return false;
}
/*
* Dispatch the scheduler on the first CPU available, likely the
* current one.
*/
dispatch_task(p, SHARED_DSQ, 0, 0, SCX_ENQ_PREEMPT);
dispatch_task(p, SHARED_DSQ, 0, 0, 0);
bpf_task_release(p);
}
/*
* Directly dispatch a task to its local CPU, bypassing the user-space
* scheduler.
*/
static void
dispatch_direct_local(struct task_struct *p, u64 slice_ns, u64 enq_flags)
{
scx_bpf_dispatch(p, SCX_DSQ_LOCAL, slice_ns, enq_flags);
dbg_msg("dispatch: pid=%d (%s) dsq=SCX_DSQ_LOCAL enq_flags=%llx slice=%llu direct",
p->pid, p->comm, enq_flags, slice_ns);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
return true;
}
/*
* Directly dispatch a task to a target CPU, bypassing the user-space
* scheduler.
*/
static int
dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 slice_ns, u64 enq_flags)
static int dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 enq_flags)
{
struct bpf_cpumask *offline;
u64 dsq_id = cpu_to_dsq(cpu);
if (!bpf_cpumask_test_cpu(cpu, p->cpus_ptr))
@ -497,10 +506,27 @@ dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 slice_ns, u64 enq_flags)
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
/*
* We know that the CPU is idle here, because it has been assigned in
* select_cpu(), so we don't need to use SCX_KICK_IDLE.
* If the CPU has gone offline, notify that the task needs to be
* consumed from another CPU.
*/
scx_bpf_kick_cpu(cpu, 0);
offline = offline_cpumask;
if (!offline)
return 0;
if (bpf_cpumask_test_cpu(cpu, cast_mask(offline))) {
set_offline_needed();
return 0;
}
/*
* Wake up the target CPU to make sure that the task is consumed as
* soon as possible.
*
* Note: the target CPU must be activated, because the task has been
* dispatched to a DSQ that only the target CPU can consume. If we do
* not kick the CPU, and the CPU is idle, the task can stall in the DSQ
* indefinitely.
*/
scx_bpf_kick_cpu(cpu, SCX_KICK_IDLE);
dbg_msg("dispatch: pid=%d (%s) dsq=%llu enq_flags=%llx slice=%llu direct",
p->pid, p->comm, dsq_id, enq_flags, slice_ns);
@ -508,6 +534,96 @@ dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 slice_ns, u64 enq_flags)
return 0;
}
/*
* Find an idle CPU in the system for the task.
*
* NOTE: the idle CPU selection doesn't need to be formally perfect: it is
* totally fine to accept racy conditions and occasionally pick CPUs that are
* not idle or even offline. The logic is designed to tolerate these mistakes
* in favor of a faster response and reduced scheduling overhead.
*/
static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
{
const struct cpumask *online_cpumask, *idle_smtmask, *idle_cpumask;
s32 cpu;
/*
* For tasks that can run only on a single CPU, we can simply verify if
* their only allowed CPU is idle.
*/
if (p->nr_cpus_allowed == 1) {
if (scx_bpf_test_and_clear_cpu_idle(prev_cpu))
return prev_cpu;
return -ENOENT;
}
/*
* Acquire the CPU masks to determine the online and idle CPUs in the
* system.
*/
online_cpumask = scx_bpf_get_online_cpumask();
idle_smtmask = scx_bpf_get_idle_smtmask();
idle_cpumask = scx_bpf_get_idle_cpumask();
/*
* Find the best idle CPU, prioritizing full idle cores in SMT systems.
*/
if (smt_enabled) {
/*
* If the task can still run on the previously used CPU and
* it's a full-idle core, keep using it.
*/
if (bpf_cpumask_test_cpu(prev_cpu, p->cpus_ptr) &&
bpf_cpumask_test_cpu(prev_cpu, idle_smtmask) &&
scx_bpf_test_and_clear_cpu_idle(prev_cpu)) {
cpu = prev_cpu;
goto out_put_cpumask;
}
/*
* Otherwise, search for another usable full-idle core.
*/
cpu = bpf_cpumask_any_and_distribute(p->cpus_ptr, idle_smtmask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* If a full-idle core can't be found (or if this is not an SMT system)
* try to re-use the same CPU, even if it's not in a full-idle core.
*/
if (bpf_cpumask_test_cpu(prev_cpu, p->cpus_ptr) &&
scx_bpf_test_and_clear_cpu_idle(prev_cpu)) {
cpu = prev_cpu;
goto out_put_cpumask;
}
/*
* If all the previous attempts have failed, try to use any idle CPU in
* the system.
*/
cpu = bpf_cpumask_any_and_distribute(p->cpus_ptr, idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
/*
* If all the previous attempts have failed, dispatch the task to the
* first CPU that will become available.
*/
cpu = -ENOENT;
out_put_cpumask:
scx_bpf_put_cpumask(idle_cpumask);
scx_bpf_put_cpumask(idle_smtmask);
scx_bpf_put_cpumask(online_cpumask);
return cpu;
}
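/*
 * In short, the selection order above is (illustrative recap, not part of
 * this diff): 1) prev_cpu if it sits on a full-idle SMT core, 2) any other
 * full-idle core, 3) prev_cpu if it is at least idle, 4) any idle CPU, and
 * 5) -ENOENT, in which case the caller simply queues the task and lets the
 * first CPU that becomes available pick it up.
 */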
/*
* Select the target CPU where a task can be executed.
*
@ -523,8 +639,7 @@ dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 slice_ns, u64 enq_flags)
s32 BPF_STRUCT_OPS(rustland_select_cpu, struct task_struct *p, s32 prev_cpu,
u64 wake_flags)
{
s32 cpu = prev_cpu;
bool do_direct = false;
s32 cpu;
/*
* When full_user is enabled, the user-space scheduler is responsible
@ -532,56 +647,13 @@ s32 BPF_STRUCT_OPS(rustland_select_cpu, struct task_struct *p, s32 prev_cpu,
* possibly its own idle tracking mechanism.
*/
if (full_user)
return prev_cpu;
cpu = pick_idle_cpu(p, prev_cpu, wake_flags);
if (cpu >= 0 && !dispatch_direct_cpu(p, cpu, 0))
return cpu;
/*
* If the previously used CPU is still available, keep using it to take
* advantage of the cached working set.
*/
if (bpf_cpumask_test_cpu(cpu, p->cpus_ptr) &&
scx_bpf_test_and_clear_cpu_idle(cpu)) {
do_direct = true;
goto out;
}
/*
* No need to check for other CPUs if the task can only run on one.
*/
if (p->nr_cpus_allowed == 1)
return cpu;
/*
* Try to migrate to a fully idle core, if present.
*/
cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, SCX_PICK_IDLE_CORE);
if (cpu >= 0) {
do_direct = true;
goto out;
}
/*
* Check for any idle CPU.
*/
cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, 0);
if (cpu >= 0) {
do_direct = true;
goto out;
}
/*
* Assign the previously used CPU if all the CPUs are busy.
*/
cpu = prev_cpu;
out:
/*
* If FIFO mode is completely disabled, allow to dispatch directly
* here, otherwise dispatch directly only if the scheduler is currently
* operating in FIFO mode.
*/
if ((!fifo_sched || is_fifo_enabled) && do_direct)
dispatch_direct_cpu(p, cpu, slice_ns, 0);
return cpu;
return prev_cpu;
}
/*
@ -608,7 +680,6 @@ static void get_task_info(struct queued_task_ctx *task,
return;
task->cpumask_cnt = tctx->cpumask_cnt;
task->sum_exec_runtime = p->se.sum_exec_runtime;
task->nvcsw = p->nvcsw;
task->weight = p->scx.weight;
task->cpu = scx_bpf_task_cpu(p);
}
@ -629,6 +700,7 @@ static void sched_congested(struct task_struct *p)
*/
void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
{
s32 cpu = scx_bpf_task_cpu(p);
struct queued_task_ctx *task;
/*
@ -639,31 +711,17 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
return;
/*
* Always dispatch per-CPU kthreads to the local CPU DSQ, bypassing the
* user-space scheduler.
* Always dispatch per-CPU kthreads directly on their target CPU.
*
* In this way we can prioritize critical kernel threads that may
* This makes it possible to prioritize critical kernel threads that may
* potentially slow down the entire system if they are blocked for too
* long (i.e., ksoftirqd/N, rcuop/N, etc.).
* long (i.e., ksoftirqd/N, rcuop/N, etc.), but it could also cause
* interactivity problems or unfairness if there are too many softirqs
* being scheduled (e.g., in the presence of high RX network traffic).
*/
if (is_kthread(p) && p->nr_cpus_allowed == 1) {
dispatch_direct_local(p, slice_ns, enq_flags);
return;
}
/*
* Check if we can dispatch the task directly, bypassing the user-space
* scheduler.
*/
if (!full_user && is_fifo_enabled) {
if (!dispatch_direct_cpu(p, scx_bpf_task_cpu(p), slice_ns, enq_flags))
if (!full_user && is_kthread(p) && p->nr_cpus_allowed == 1)
if (!dispatch_direct_cpu(p, cpu, enq_flags))
return;
/*
* Use the local DSQ if the target CPU is not valid anymore.
*/
dispatch_direct_local(p, slice_ns, enq_flags);
return;
}
/*
* Add tasks to the @queued list, they will be processed by the
@ -735,6 +793,50 @@ static long handle_dispatched_task(struct bpf_dynptr *dynptr, void *context)
return !scx_bpf_dispatch_nr_slots();
}
/*
* Consume tasks dispatched to CPUs that have gone offline.
*
* These tasks will be consumed on other active CPUs to prevent indefinite
* stalling.
*
* Return true if one task is consumed, false otherwise.
*/
static bool consume_offline_cpus(s32 cpu)
{
u64 nr_cpu_ids = scx_bpf_nr_cpu_ids();
struct bpf_cpumask *offline;
bool ret = false;
if (!test_and_clear_offline_needed())
return false;
offline = offline_cpumask;
if (!offline)
return false;
/*
* Cycle through all the CPUs and evenly consume tasks from the DSQs of
* those that are offline.
*/
bpf_repeat(nr_cpu_ids - 1) {
cpu = (cpu + 1) % nr_cpu_ids;
if (!bpf_cpumask_test_cpu(cpu, cast_mask(offline)))
continue;
/*
* This CPU is offline, if a task has been dispatched there
* consume it immediately on the current CPU.
*/
if (scx_bpf_consume(cpu_to_dsq(cpu))) {
set_offline_needed();
ret = true;
break;
}
}
return ret;
}
/*
* Dispatch tasks that are ready to run.
*
@ -748,28 +850,38 @@ static long handle_dispatched_task(struct bpf_dynptr *dynptr, void *context)
void BPF_STRUCT_OPS(rustland_dispatch, s32 cpu, struct task_struct *prev)
{
/*
* Check if the user-space scheduler needs to run, and in that case try
* to dispatch it immediately.
* Also try to steal tasks directly dispatched to CPUs that have gone
* offline (this helps prevent indefinite task stalls).
*/
dispatch_user_scheduler();
if (consume_offline_cpus(cpu))
return;
/*
* First check if the user-space scheduler needs to run, and in that
* case try to dispatch it immediately.
*/
if (dispatch_user_scheduler())
return;
/*
* Consume a task from the per-CPU DSQ.
*/
if (scx_bpf_consume(cpu_to_dsq(cpu)))
return;
/*
* Consume all tasks from the @dispatched list and immediately try to
* dispatch them on their target CPU selected by the user-space
* scheduler (at this point the proper ordering has been already
* determined by the scheduler).
* determined so we can simply dispatch them preserving the same
* order).
*/
bpf_user_ringbuf_drain(&dispatched, handle_dispatched_task, NULL, 0);
/* Consume first task both from the shared DSQ and the per-CPU DSQ */
/*
* Consume the first task from the shared DSQ.
*/
scx_bpf_consume(SHARED_DSQ);
if (scx_bpf_consume(cpu_to_dsq(cpu))) {
/*
* Re-kick the current CPU if there are more tasks in the
* per-CPU DSQ
*/
scx_bpf_kick_cpu(cpu, 0);
}
}
/*
@ -780,14 +892,20 @@ void BPF_STRUCT_OPS(rustland_running, struct task_struct *p)
s32 cpu = scx_bpf_task_cpu(p);
dbg_msg("start: pid=%d (%s) cpu=%ld", p->pid, p->comm, cpu);
/*
* Ensure time slice never exceeds slice_ns when a task is started on a
* CPU.
*/
if (p->scx.slice > slice_ns)
p->scx.slice = slice_ns;
/*
* Mark the CPU as busy by setting the pid as owner (ignoring the
* user-space scheduler).
*/
if (!is_usersched_task(p)) {
set_cpu_owner(cpu, p->pid);
if (!is_usersched_task(p))
__sync_fetch_and_add(&nr_running, 1);
}
}
/*
@ -802,7 +920,6 @@ void BPF_STRUCT_OPS(rustland_stopping, struct task_struct *p, bool runnable)
* Mark the CPU as idle by setting the owner to 0.
*/
if (!is_usersched_task(p)) {
set_cpu_owner(scx_bpf_task_cpu(p), 0);
__sync_fetch_and_sub(&nr_running, 1);
/*
* Kick the user-space scheduler immediately when a task
@ -870,6 +987,23 @@ void BPF_STRUCT_OPS(rustland_cpu_release, s32 cpu,
set_usersched_needed();
}
void BPF_STRUCT_OPS(rustland_cpu_online, s32 cpu)
{
/* Set the CPU state to online */
set_cpu_state(offline_cpumask, cpu, false);
__sync_fetch_and_add(&nr_online_cpus, 1);
}
void BPF_STRUCT_OPS(rustland_cpu_offline, s32 cpu)
{
/* Set the CPU state to offline */
set_cpu_state(offline_cpumask, cpu, true);
__sync_fetch_and_sub(&nr_online_cpus, 1);
set_offline_needed();
}
/*
* A new task @p is being created.
*
@ -921,41 +1055,6 @@ void BPF_STRUCT_OPS(rustland_exit_task, struct task_struct *p,
__sync_fetch_and_add(&nr_queued, 1);
}
/*
* Check whether we can switch to FIFO mode if the system is underutilized.
*/
static bool should_enable_fifo(void)
{
/* Moving average of the tasks that are waiting to be scheduled */
static u64 nr_waiting_avg;
/* Current amount of tasks waiting to be scheduled */
u64 nr_waiting = nr_queued + nr_scheduled;
if (!fifo_sched)
return false;
/*
* Exiting from FIFO mode requires to have almost all the CPUs busy.
*/
if (is_fifo_enabled)
return nr_running < num_possible_cpus - 1;
/*
* We are not in FIFO mode, check for the task waiting to be processed
* by the user-space scheduler.
*
* We want to evaluate a moving average of the waiting tasks to prevent
* bouncing too often between FIFO mode and user-space mode.
*/
nr_waiting_avg = (nr_waiting_avg + nr_waiting) / 2;
/*
* The condition to go back to FIFO mode is to have no tasks (in
* average) that are waiting to be scheduled.
*/
return nr_waiting_avg == 0;
}
/*
* Heartbeat scheduler timer callback.
*
@ -972,9 +1071,6 @@ static int usersched_timer_fn(void *map, int *key, struct bpf_timer *timer)
/* Kick the scheduler */
set_usersched_needed();
/* Update flag that determines if FIFO scheduling needs to be enabled */
is_fifo_enabled = should_enable_fifo();
/* Re-arm the timer */
err = bpf_timer_start(timer, USERSCHED_TIMER_NS, 0);
if (err)
@ -1006,6 +1102,28 @@ static int usersched_timer_init(void)
return err;
}
/*
* Evaluate the amount of online CPUs.
*/
s32 get_nr_online_cpus(void)
{
const struct cpumask *online_cpumask;
u64 nr_cpu_ids = scx_bpf_nr_cpu_ids();
int i, cpus = 0;
online_cpumask = scx_bpf_get_online_cpumask();
bpf_for(i, 0, nr_cpu_ids) {
if (!bpf_cpumask_test_cpu(i, online_cpumask))
continue;
cpus++;
}
scx_bpf_put_cpumask(online_cpumask);
return cpus;
}
/*
* Create a DSQ for each CPU available in the system and a global shared DSQ.
*
@ -1017,11 +1135,15 @@ static int usersched_timer_init(void)
*/
static int dsq_init(void)
{
u64 nr_cpu_ids = scx_bpf_nr_cpu_ids();
int err;
s32 cpu;
/* Initialize amount of online CPUs */
nr_online_cpus = get_nr_online_cpus();
/* Create per-CPU DSQs */
bpf_for(cpu, 0, num_possible_cpus) {
bpf_for(cpu, 0, nr_cpu_ids) {
err = scx_bpf_create_dsq(cpu_to_dsq(cpu), -1);
if (err) {
scx_bpf_error("failed to create pcpu DSQ %d: %d",
@ -1045,11 +1167,20 @@ static int dsq_init(void)
*/
s32 BPF_STRUCT_OPS_SLEEPABLE(rustland_init)
{
struct bpf_cpumask *mask;
int err;
/* Compile-time checks */
BUILD_BUG_ON((MAX_CPUS % 2));
/* Initialize the offline CPU mask */
err = calloc_cpumask(&offline_cpumask);
mask = offline_cpumask;
if (!mask)
err = -ENOMEM;
if (err)
return err;
/* Initialize rustland core */
err = dsq_init();
if (err)
@ -1081,6 +1212,8 @@ SCX_OPS_DEFINE(rustland,
.update_idle = (void *)rustland_update_idle,
.set_cpumask = (void *)rustland_set_cpumask,
.cpu_release = (void *)rustland_cpu_release,
.cpu_online = (void *)rustland_cpu_online,
.cpu_offline = (void *)rustland_cpu_offline,
.init_task = (void *)rustland_init_task,
.exit_task = (void *)rustland_exit_task,
.init = (void *)rustland_init,

View File

@ -1,4 +1,4 @@
// Copyright (c) Andrea Righi <andrea.righi@canonical.com>
// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
// Buddy allocator code imported from https://github.com/jjyr/buddy-alloc
// and distributed under the terms of the MIT license.

View File

@ -1,4 +1,4 @@
// Copyright (c) Andrea Righi <andrea.righi@canonical.com>
// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

View File

@ -609,16 +609,22 @@ static bool consume_offline_cpus(s32 cpu)
* those that are offline.
*/
bpf_repeat(nr_cpu_ids - 1) {
s32 dsq_id;
cpu = (cpu + 1) % nr_cpu_ids;
dsq_id = cpu_to_dsq(cpu);
if (!bpf_cpumask_test_cpu(cpu, cast_mask(offline)))
continue;
if (!scx_bpf_dsq_nr_queued(dsq_id))
continue;
set_offline_needed();
/*
* This CPU is offline, if a task has been dispatched there
* consume it immediately on the current CPU.
*/
if (scx_bpf_consume(cpu_to_dsq(cpu))) {
set_offline_needed();
if (scx_bpf_consume(dsq_id)) {
ret = true;
break;
}

View File

@ -1,7 +1,7 @@
[package]
name = "scx_rlfifo"
version = "1.0.1"
authors = ["Andrea Righi <andrea.righi@canonical.com>", "Canonical"]
authors = ["Andrea Righi <andrea.righi@linux.dev>"]
edition = "2021"
description = "A simple FIFO scheduler in Rust that runs in user-space"
license = "GPL-2.0-only"

View File

@ -1,4 +1,4 @@
// Copyright (c) Andrea Righi <andrea.righi@canonical.com>
// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
@ -9,7 +9,6 @@ pub mod bpf_intf;
mod bpf;
use bpf::*;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use std::sync::atomic::AtomicBool;
@ -26,15 +25,13 @@ struct Scheduler<'a> {
impl<'a> Scheduler<'a> {
fn init() -> Result<Self> {
let topo = Topology::new().expect("Failed to build host topology");
let bpf = BpfScheduler::init(
5000, // slice_ns (default task time slice)
topo.nr_cpu_ids() as i32, // nr_cpus (max CPUs available in the system)
false, // partial (include all tasks if disabled)
0, // exit_dump_len (buffer size of exit info)
false, // partial (include all tasks if false)
5000, // slice_ns (default task time slice)
true, // full_user (schedule all tasks in user-space)
false, // low_power (low power mode)
false, // fifo_sched (enable BPF FIFO scheduling)
false, // verbose (verbose output)
false, // debug (debug mode)
)?;
Ok(Self { bpf })

View File

@ -1,7 +1,7 @@
[package]
name = "scx_rustland"
version = "1.0.1"
authors = ["Andrea Righi <andrea.righi@canonical.com>", "Canonical"]
authors = ["Andrea Righi <andrea.righi@linux.dev>"]
edition = "2021"
description = "A BPF component (dispatcher) that implements the low level sched-ext functionalities and a user-space counterpart (scheduler), written in Rust, that implements the actual scheduling policy. This is used within sched_ext, which is a Linux kernel feature which enables implementing kernel thread schedulers in BPF and dynamically loading them. https://github.com/sched-ext/scx/tree/main"
license = "GPL-2.0-only"

View File

@ -1,4 +1,4 @@
// Copyright (c) Andrea Righi <andrea.righi@canonical.com>
// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
@ -72,44 +72,18 @@ const VERSION: &'static str = env!("CARGO_PKG_VERSION");
///
/// === Troubleshooting ===
///
/// - Adjust the time slice boost parameter (option `-b`) to enhance the responsiveness of
/// low-latency applications (i.e., online gaming, live streaming, video conferencing etc.).
///
/// - Reduce the time slice boost parameter (option `-b`) if you notice poor performance in your
/// CPU-intensive applications or if you experience any stall during your typical workload.
///
/// - Reduce the time slice (option `-s`) if you experience audio issues (i.e., cracking audio or
/// audio packet loss).
///
#[derive(Debug, Parser)]
struct Opts {
/// Scheduling slice duration in microseconds (default is 5ms).
/// Scheduling slice duration in microseconds.
#[clap(short = 's', long, default_value = "5000")]
slice_us: u64,
/// Time slice boost: increasing this value enhances performance of interactive applications
/// (gaming, multimedia, GUIs, etc.), but may lead to decreased responsiveness of other tasks
/// in the system.
///
/// WARNING: setting a large value can make the scheduler quite unpredictable and you may
/// experience temporary system stalls (before hitting the sched-ext watchdog timeout).
///
/// Default time slice boost is 100, which means interactive tasks will get a 100x priority
/// boost to run respect to non-interactive tasks.
///
/// Use "0" to disable time slice boost and fallback to the standard vruntime-based scheduling.
#[clap(short = 'b', long, default_value = "100")]
slice_boost: u64,
/// If specified, disable task preemption.
///
/// Disabling task preemption can help to improve the throughput of CPU-intensive tasks, while
/// still providing a good level of system responsiveness.
///
/// Preemption is enabled by default to provide a higher level of responsiveness to the
/// interactive tasks.
#[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
no_preemption: bool,
/// Scheduling minimum slice duration in microseconds.
#[clap(short = 'S', long, default_value = "500")]
slice_us_min: u64,
/// If specified, all the scheduling events and actions will be processed in user-space,
/// disabling any form of in-kernel optimization.
@ -127,22 +101,8 @@ struct Opts {
#[clap(short = 'l', long, action = clap::ArgAction::SetTrue)]
low_power: bool,
/// By default the scheduler automatically transitions to FIFO mode when the system is
/// underutilized. This allows to reduce unnecessary scheduling overhead and boost performance
/// when the system is not running at full capacity.
///
/// Be aware that FIFO mode can lead to less predictable performance. Therefore, use this
/// option if performance predictability is important, such as when running real-time audio
/// applications or during live streaming. Conversely, avoid using this option when you care
/// about maximizing performance, such as gaming.
///
/// Set this option to disable this automatic transition.
#[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
disable_fifo: bool,
/// If specified, only tasks which have their scheduling policy set to
/// SCHED_EXT using sched_setscheduler(2) are switched. Otherwise, all
/// tasks are switched.
/// If specified, only tasks which have their scheduling policy set to SCHED_EXT using
/// sched_setscheduler(2) are switched. Otherwise, all tasks are switched.
#[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
partial: bool,
@ -150,29 +110,71 @@ struct Opts {
#[clap(long, default_value = "0")]
exit_dump_len: u32,
/// Enable verbose output, including libbpf details.
#[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
verbose: bool,
/// If specified, all the BPF scheduling events will be reported in
/// debugfs (e.g., /sys/kernel/debug/tracing/trace_pipe).
#[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
debug: bool,
/// Print scheduler version and exit.
#[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
#[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
version: bool,
}
// Time constants.
const NSEC_PER_USEC: u64 = 1_000;
const NSEC_PER_MSEC: u64 = 1_000_000;
const NSEC_PER_SEC: u64 = 1_000_000_000;
#[derive(Debug, Clone)]
struct TaskStat {
pid: i32,
comm: String,
nvcsw: u64,
}
fn parse_proc_pid_stat(pid: i32) -> std::io::Result<TaskStat> {
let path = format!("/proc/{}/status", pid);
let content = std::fs::read_to_string(&path)?;
let mut comm = String::new();
let mut nvcsw = 0;
for line in content.lines() {
if line.starts_with("Name:") {
comm = line.split_whitespace().nth(1).unwrap_or("").to_string();
} else if line.starts_with("voluntary_ctxt_switches:") {
nvcsw = line.split_whitespace().nth(1).unwrap_or("0").parse().unwrap_or(0);
}
}
Ok(TaskStat {
pid,
comm,
nvcsw,
})
}
fn get_all_pids() -> std::io::Result<Vec<i32>> {
let mut pids = Vec::new();
for entry in std::fs::read_dir("/proc")? {
if let Ok(entry) = entry {
let file_name = entry.file_name();
if let Ok(pid) = file_name.to_string_lossy().parse::<i32>() {
pids.push(pid);
}
}
}
Ok(pids)
}
// Basic item stored in the task information map.
#[derive(Debug)]
struct TaskInfo {
sum_exec_runtime: u64, // total cpu time used by the task
vruntime: u64, // total vruntime of the task
avg_nvcsw: u64, // average of voluntary context switches
nvcsw: u64, // total amount of voluntary context switches
nvcsw_ts: u64, // timestamp of the previous nvcsw update
}
// Task information map: store total execution time and vruntime of each task in the system.
@ -202,14 +204,18 @@ impl TaskInfoMap {
struct Task {
qtask: QueuedTask, // queued task
vruntime: u64, // total vruntime (that determines the order how tasks are dispatched)
is_interactive: bool, // task can preempt other tasks
timestamp: u64, // task enqueue timestamp
is_interactive: bool, // task is interactive
}
// Make sure tasks are ordered by vruntime, if multiple tasks have the same vruntime order by pid.
// Sort tasks by their interactive status first (interactive tasks are always scheduled before
// regular tasks), then sort them by their vruntime, then by their timestamp and lastly by their
// pid.
impl Ord for Task {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.vruntime
.cmp(&other.vruntime)
other.is_interactive.cmp(&self.is_interactive)
.then_with(|| self.vruntime.cmp(&other.vruntime))
.then_with(|| self.timestamp.cmp(&other.timestamp))
.then_with(|| self.qtask.pid.cmp(&other.qtask.pid))
}
}
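// Ordering example (illustrative, assuming TaskTree::pop() returns the lowest-ordered task):
// given A { is_interactive: true, vruntime: 90 }, B { is_interactive: false, vruntime: 10 }
// and C { is_interactive: false, vruntime: 10, earlier timestamp than B }, the pool yields
// A first (interactive status dominates vruntime), then C (same vruntime as B but enqueued
// earlier), then B.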
@ -260,13 +266,12 @@ struct Scheduler<'a> {
topo_map: TopologyMap, // Host topology
task_pool: TaskTree, // tasks ordered by vruntime
task_map: TaskInfoMap, // map pids to the corresponding task information
proc_stats: HashMap<i32, u64>, // Task statistics from procfs
interactive_pids: Vec<i32>, // List of interactive tasks
min_vruntime: u64, // Keep track of the minimum vruntime across all tasks
max_vruntime: u64, // Keep track of the maximum vruntime across all tasks
slice_ns: u64, // Default time slice (in ns)
slice_boost: u64, // Slice booster
init_page_faults: u64, // Initial page faults counter
no_preemption: bool, // Disable task preemption
full_user: bool, // Run all tasks through the user-space scheduler
slice_ns: u64, // Default time slice (in ns)
slice_ns_min: u64, // Minimum time slice (in ns)
}
impl<'a> Scheduler<'a> {
@ -275,86 +280,33 @@ impl<'a> Scheduler<'a> {
let topo = Topology::new().expect("Failed to build host topology");
let topo_map = TopologyMap::new(&topo).expect("Failed to generate topology map");
// Save the default time slice (in ns) in the scheduler class.
let slice_ns = opts.slice_us * NSEC_PER_USEC;
// Slice booster (0 = disabled).
let slice_boost = opts.slice_boost;
// Disable task preemption.
let no_preemption = opts.no_preemption;
// Run all tasks through the user-space scheduler.
let full_user = opts.full_user;
// Scheduler task pool to sort tasks by vruntime.
let task_pool = TaskTree::new();
// Scheduler task map to store tasks information.
let task_map = TaskInfoMap::new();
// Initialize global minimum and maximum vruntime.
let min_vruntime: u64 = 0;
let max_vruntime: u64 = 0;
// Initialize initial page fault counter.
let init_page_faults: u64 = 0;
// Low-level BPF connector.
let nr_cpus = topo.nr_cpu_ids();
let bpf = BpfScheduler::init(
opts.slice_us,
nr_cpus as i32,
opts.partial,
opts.exit_dump_len,
opts.partial,
opts.slice_us,
opts.full_user,
opts.low_power,
!opts.disable_fifo,
opts.verbose,
opts.debug,
)?;
info!("{} scheduler attached - {} CPUs", SCHEDULER_NAME, nr_cpus);
info!("{} scheduler attached", SCHEDULER_NAME);
// Return scheduler object.
Ok(Self {
bpf,
topo_map,
task_pool,
task_map,
min_vruntime,
max_vruntime,
slice_ns,
slice_boost,
init_page_faults,
no_preemption,
full_user,
task_pool: TaskTree::new(),
task_map: TaskInfoMap::new(),
proc_stats: HashMap::new(),
interactive_pids: Vec::new(),
min_vruntime: 0,
init_page_faults: 0,
slice_ns: opts.slice_us * NSEC_PER_USEC,
slice_ns_min: opts.slice_us_min * NSEC_PER_USEC,
})
}
// Return the amount of idle cores.
//
// On SMT systems consider only one CPU for each fully idle core, to avoid disrupting
// performance too much by running multiple tasks in the same core.
fn nr_idle_cpus(&mut self) -> usize {
let mut idle_cpu_count = 0;
// Count the number of cores where all the CPUs are idle.
for core in self.topo_map.iter() {
let mut all_idle = true;
for cpu_id in core {
if self.bpf.get_cpu_pid(*cpu_id as i32) != 0 {
all_idle = false;
break;
}
}
if all_idle {
idle_cpu_count += 1;
}
}
idle_cpu_count
}
// Return current timestamp in ns.
fn now() -> u64 {
let ts = SystemTime::now()
@ -364,29 +316,16 @@ impl<'a> Scheduler<'a> {
}
// Update task's vruntime based on the information collected from the kernel and return to the
// caller the evaluated weighted time slice along with a flag indicating whether the task is
// interactive or not (interactive tasks are allowed to preempt other tasks).
// caller the evaluated task's vruntime.
//
// This method implements the main task ordering logic of the scheduler.
fn update_enqueued(&mut self, task: &QueuedTask) -> (u64, bool) {
fn update_enqueued(&mut self, task: &QueuedTask) -> u64 {
// Determine if a task is new or old, based on their current runtime and previous runtime
// counters.
//
// NOTE: make sure to handle the case where the current sum_exec_runtime is less then the
// previous sum_exec_runtime. This can happen, for example, when a new task is created via
// execve() (or its variants): the kernel will initialize a new task_struct, resetting
// sum_exec_runtime, while keeping the same PID.
//
// Consequently, the existing task_info slot is reused, containing the total run-time of
// the previous task (likely exceeding the current sum_exec_runtime). In such cases, simply
// use sum_exec_runtime as the time slice of the new task.
fn is_new_task(curr_runtime: u64, prev_runtime: u64) -> bool {
curr_runtime < prev_runtime || prev_runtime == 0
}
// Cache the current timestamp.
let now = Self::now();
// Get task information if the task is already stored in the task map,
// otherwise create a new entry for it.
let task_info = self
@ -396,70 +335,27 @@ impl<'a> Scheduler<'a> {
.or_insert_with_key(|&_pid| TaskInfo {
sum_exec_runtime: 0,
vruntime: self.min_vruntime,
nvcsw: task.nvcsw,
nvcsw_ts: now,
avg_nvcsw: 0,
});
// Evaluate last time slot used by the task.
let mut slice = if is_new_task(task.sum_exec_runtime, task_info.sum_exec_runtime) {
// Evaluate used task time slice.
let slice = if is_new_task(task.sum_exec_runtime, task_info.sum_exec_runtime) {
task.sum_exec_runtime
} else {
task.sum_exec_runtime - task_info.sum_exec_runtime
};
}.min(self.slice_ns);
// Determine if a task is interactive, based on the moving average of voluntary context
// switches over time.
//
// NOTE: we should make this threshold a tunable, but for now let's assume that a moving
// average of 10 voluntary context switch per second is enough to classify the task as
// interactive.
let is_interactive = task_info.avg_nvcsw >= 10;
// Apply the slice boost to interactive tasks.
//
// NOTE: some tasks may have a very high weight, that can potentially disrupt our slice
// boost optimizations, therefore always limit the task priority to a max of 1000.
let weight = if is_interactive {
task.weight.min(1000) * self.slice_boost.max(1)
} else {
task.weight.min(1000)
};
// Scale the time slice by the task's priority (weight).
slice = slice * 100 / weight;
// Make sure that the updated vruntime is in the range:
//
// (min_vruntime, min_vruntime + slice_ns]
//
// In this way we ensure that global vruntime is always progressing during each scheduler
// run, preventing excessive starvation of the other tasks sitting in the self.task_pool
// tree.
//
// Moreover, limiting the accounted time slice to slice_ns, allows to prevent starving the
// current task for too long in the scheduler task pool.
task_info.vruntime = self.min_vruntime + slice.clamp(1, self.slice_ns);
// Update maximum vruntime.
self.max_vruntime = self.max_vruntime.max(task_info.vruntime);
// Update task's vruntime re-aligning it to min_vruntime, to avoid
// over-prioritizing tasks with a mostly sleepy behavior.
if task_info.vruntime < self.min_vruntime {
task_info.vruntime = self.min_vruntime;
}
task_info.vruntime += slice * 100 / task.weight;
// Update total task cputime.
task_info.sum_exec_runtime = task.sum_exec_runtime;
// Refresh voluntary context switches average, counter and timestamp every second.
if now - task_info.nvcsw_ts > NSEC_PER_SEC {
let delta_nvcsw = task.nvcsw - task_info.nvcsw;
let delta_t = (now - task_info.nvcsw_ts).max(1);
let avg_nvcsw = delta_nvcsw * NSEC_PER_SEC / delta_t;
task_info.avg_nvcsw = (task_info.avg_nvcsw + avg_nvcsw) / 2;
task_info.nvcsw = task.nvcsw;
task_info.nvcsw_ts = now;
}
// Return the task vruntime and a flag indicating if the task is interactive.
(task_info.vruntime, is_interactive)
// Return the task vruntime.
task_info.vruntime
}
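// Worked example (illustrative numbers): with the default weight of 100, a task that consumed
// 3 ms of CPU since its last enqueue gets slice = 3_000_000 ns (below slice_ns, so not capped)
// and its vruntime advances by 3_000_000 * 100 / 100 = 3 ms. A weight-1000 task advances by
// only 0.3 ms for the same runtime, while a weight-10 task advances by 30 ms, so higher-weight
// tasks are picked again sooner. A task waking from a long sleep is first re-aligned to
// min_vruntime, so it cannot accumulate an unbounded vruntime credit.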
// Drain all the tasks from the queued list, update their vruntime (Self::update_enqueued()),
@ -475,13 +371,16 @@ impl<'a> Scheduler<'a> {
continue;
}
// Update task information and determine vruntime and interactiveness.
let (vruntime, is_interactive) = self.update_enqueued(&task);
// Update task information and determine vruntime.
let vruntime = self.update_enqueued(&task);
let timestamp = Self::now();
let is_interactive = self.interactive_pids.contains(&task.pid);
// Insert task in the task pool (ordered by vruntime).
self.task_pool.push(Task {
qtask: task,
vruntime,
timestamp,
is_interactive,
});
}
@ -501,96 +400,55 @@ impl<'a> Scheduler<'a> {
}
}
// Return the target time slice, proportionally adjusted based on the total amount of tasks
// waiting to be scheduled (more tasks waiting => shorter time slice).
// Dispatch tasks from the task pool in order (sending them to the BPF dispatcher).
fn dispatch_tasks(&mut self) {
// Dispatch only a batch of tasks equal to the amount of idle CPUs in the system.
//
// This allows to have more tasks sitting in the task pool, reducing the pressure on the
// dispatcher queues and giving a chance to higher priority tasks to come in and get
// dispatched earlier, mitigating potential priority inversion issues.
let delta_slice = self.max_vruntime - self.min_vruntime;
let nr_tasks = if delta_slice <= self.slice_ns {
self.nr_idle_cpus().max(1)
} else {
// Scheduler is getting congested, flush all tasks that are waiting to be scheduled to
// mitigate excessive starvation.
usize::MAX
};
for _ in 0..nr_tasks {
match self.task_pool.pop() {
Some(task) => {
// Determine the task's virtual time slice.
//
// The goal is to evaluate the optimal time slice, considering the vruntime as
// a deadline for the task to complete its work before releasing the CPU.
//
// This is accomplished by calculating the difference between the task's
// vruntime and the global current vruntime and use this value as the task time
// slice.
//
// In this way, tasks that "promise" to release the CPU quickly (based on
// their previous work pattern) get a much higher priority (due to
// vruntime-based scheduling and the additional priority boost for being
// classified as interactive), but they are also given a shorter time slice
// to complete their work and fulfill their promise of rapidity.
//
// At the same time tasks that are more CPU-intensive get de-prioritized, but
// they will also tend to have a longer time slice available, reducing in this
// way the amount of context switches that can negatively affect their
// performance.
//
// In conclusion, latency-sensitive tasks get a high priority and a short time
// slice (and they can preempt other tasks), CPU-intensive tasks get low
// priority and a long time slice.
//
// Moreover, ensure that the time slice is never less than 0.25 ms to prevent
// excessive penalty from assigning time slices that are too short and reduce
// context switch overhead.
let slice_ns =
(task.vruntime - self.min_vruntime).clamp(NSEC_PER_MSEC / 4, self.slice_ns);
// Return the total amount of tasks that are waiting to be scheduled.
fn nr_tasks_waiting(&mut self) -> u64 {
let nr_queued = *self.bpf.nr_queued_mut();
let nr_scheduled = *self.bpf.nr_scheduled_mut();
// Update global minimum vruntime.
nr_queued + nr_scheduled
}
// Dispatch the first task from the task pool (sending them to the BPF dispatcher).
fn dispatch_task(&mut self) {
match self.task_pool.pop() {
Some(task) => {
// Update global minimum vruntime.
if self.min_vruntime < task.vruntime {
self.min_vruntime = task.vruntime;
}
// Create a new task to dispatch.
let mut dispatched_task = DispatchedTask::new(&task.qtask);
// Scale time slice based on the amount of tasks that are waiting in the
// scheduler's queue and the previously unused time slice budget, but make sure
// to assign at least slice_us_min.
let slice_ns = (self.slice_ns / (self.nr_tasks_waiting() + 1)).max(self.slice_ns_min);
dispatched_task.set_slice_ns(slice_ns);
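// Example (illustrative numbers): with the defaults slice_us=5000 and slice_us_min=500,
// 4 waiting tasks yield 5 ms / (4 + 1) = 1 ms per dispatch, and the 0.5 ms floor kicks in
// once ten or more tasks are waiting.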
// Create a new task to dispatch.
let mut dispatched_task = DispatchedTask::new(&task.qtask);
if task.is_interactive {
// Dispatch interactive tasks on the first CPU available.
dispatched_task.set_flag(RL_CPU_ANY);
// Assign the time slice to the task.
dispatched_task.set_slice_ns(slice_ns);
// Interactive tasks can preempt other tasks.
if !self.no_preemption {
dispatched_task.set_flag(RL_PREEMPT_CPU);
}
}
// Dispatch task on the first CPU available if it is classified as
// interactive; non-interactive tasks will continue to run on the same CPU.
if task.is_interactive {
dispatched_task.set_flag(RL_CPU_ANY);
}
// In full-user mode we skip the built-in idle selection logic, so simply
// dispatch all the tasks on the first CPU available.
if self.full_user {
dispatched_task.set_flag(RL_CPU_ANY);
}
// Send task to the BPF dispatcher.
match self.bpf.dispatch_task(&dispatched_task) {
Ok(_) => {}
Err(_) => {
/*
* Re-add the task to the dispatched list in case of failure and stop
* dispatching.
*/
self.task_pool.push(task);
break;
}
// Send task to the BPF dispatcher.
match self.bpf.dispatch_task(&dispatched_task) {
Ok(_) => {}
Err(_) => {
/*
* Re-add the task to the dispatched list in case of failure and stop
* dispatching.
*/
self.task_pool.push(task);
}
}
None => break,
}
None => {}
}
// Update nr_scheduled to notify the dispatcher that all the tasks received by the
// scheduler have been dispatched, so there is no reason to re-activate the scheduler,
// unless more tasks are queued.
@ -602,7 +460,7 @@ impl<'a> Scheduler<'a> {
// and dispatch them to the BPF part via the dispatched list).
fn schedule(&mut self) {
self.drain_queued_tasks();
self.dispatch_tasks();
self.dispatch_task();
// Yield to avoid using too much CPU from the scheduler itself.
thread::yield_now();
@ -631,37 +489,6 @@ impl<'a> Scheduler<'a> {
}
}
// Get the current CPU where the scheduler is running.
fn get_current_cpu() -> io::Result<i32> {
// Open /proc/self/stat file
let path = Path::new("/proc/self/stat");
let mut file = File::open(path)?;
// Read the content of the file into a String
let mut content = String::new();
file.read_to_string(&mut content)?;
// Split the content into fields using whitespace as the delimiter
let fields: Vec<&str> = content.split_whitespace().collect();
// Parse the 39th field as an i32 and return it.
if let Some(field) = fields.get(38) {
if let Ok(value) = field.parse::<i32>() {
Ok(value)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unable to parse current CPU information as i32",
))
}
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"Unable to get current CPU information",
))
}
}
// Print critical user-space scheduler statistics.
fn print_faults(&mut self) {
// Get counters of scheduling failures.
@ -695,13 +522,11 @@ impl<'a> Scheduler<'a> {
// Print internal scheduler statistics (fetched from the BPF part).
fn print_stats(&mut self) {
// Show minimum vruntime (this should be constantly incrementing).
let delta = self.max_vruntime - self.min_vruntime;
// Show online CPUs, minimum vruntime and time slice.
info!(
"min_vruntime={} max_vruntime={} delta={}us slice={}us",
"cpus={} min_vruntime={} slice={}us",
*self.bpf.nr_online_cpus_mut(),
self.min_vruntime,
self.max_vruntime,
delta / NSEC_PER_USEC,
self.slice_ns / NSEC_PER_USEC,
);
@ -735,26 +560,76 @@ impl<'a> Scheduler<'a> {
// Show total page faults of the user-space scheduler.
self.print_faults();
// Show tasks that are currently running on each core and CPU.
let sched_cpu = match Self::get_current_cpu() {
Ok(cpu_info) => cpu_info,
Err(_) => -1,
};
info!("Running tasks:");
for (core_id, core) in self.topo_map.iter().enumerate() {
for cpu_id in core {
let pid = if *cpu_id as i32 == sched_cpu {
"[self]".to_string()
} else {
self.bpf.get_cpu_pid(*cpu_id as i32).to_string()
};
info!(" core {:2} cpu {:2} pid={}", core_id, cpu_id, pid);
log::logger().flush();
}
fn sync_interactive_tasks(&mut self, stats: &[TaskStat]) {
self.interactive_pids.clear();
info!("{:<8} {:>10} {} <-- interactive tasks", "[pid]", "[nvcsw]", "[comm]");
for i in 0..stats.len() {
let stat = &stats[i];
// At least 10 context switches per sec are required to consider the
// task as interactive.
if stat.nvcsw < 10 {
break;
}
self.interactive_pids.push(stat.pid);
info!(
"{:<8} {:>10} {}",
stat.pid, stat.nvcsw, stat.comm
);
}
log::logger().flush();
}
fn update_interactive_stats(&mut self) -> std::io::Result<Vec<TaskStat>> {
let mut new_stats = Vec::new();
for pid in get_all_pids()? {
if let Ok(stat) = parse_proc_pid_stat(pid) {
// Retrieve the previous nvcsw value, or 0 if not present.
let prev_nvcsw = self.proc_stats.get(&stat.pid).copied().unwrap_or_default();
// Update the proc_stats entry with the new nvcsw.
self.proc_stats.insert(stat.pid, stat.nvcsw);
// Skip the first time that we see the task or if the task has no voluntary context
// switches at all.
if prev_nvcsw > 0 {
// Add the task entry with the delta nvcsw.
let delta_nvcsw = stat.nvcsw.saturating_sub(prev_nvcsw);
new_stats.push(TaskStat {
pid: stat.pid,
comm: stat.comm,
nvcsw: delta_nvcsw,
});
}
}
}
// Sort by delta of nvcsw in descending order to ensure we always classify the tasks with
// greater nvcsw as interactive.
new_stats.sort_by(|a, b| b.nvcsw.cmp(&a.nvcsw));
Ok(new_stats)
}
fn refresh_interactive_tasks(&mut self) -> std::io::Result<()> {
let current_stats = match self.update_interactive_stats() {
Ok(stats) => stats,
Err(e) => {
warn!("Failed to update stats: {}", e);
return Err(e);
}
};
self.sync_interactive_tasks(&current_stats);
Ok(())
}
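// Classification example (illustrative): a terminal emulator doing ~200 voluntary context
// switches during the refresh interval sorts near the top and is marked interactive, while a
// compiler job with 2 is not; the loop in sync_interactive_tasks() stops at the first entry
// below the 10-nvcsw threshold because the stats are sorted in descending order.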
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
let mut prev_ts = Self::now();
@ -762,12 +637,12 @@ impl<'a> Scheduler<'a> {
// Call the main scheduler body.
self.schedule();
// Print scheduler statistics every second.
let curr_ts = Self::now();
if curr_ts - prev_ts > NSEC_PER_SEC {
let now = Self::now();
if now - prev_ts > NSEC_PER_SEC {
self.print_stats();
self.refresh_interactive_tasks().unwrap();
prev_ts = curr_ts;
prev_ts = now;
}
}
// Dump scheduler statistics before exiting