scx_rustland: user-space interactive task classifier

We don't need to send the number of voluntary context switches (nvcsw)
from BPF to user-space, as this information is already accessible in
user-space via procfs. Sending this data would only create unnecessary
overhead for schedulers that don't require it, and those that do can
easily retrieve it through procfs.
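
For reference, a minimal sketch of the kind of procfs read this relies
on (the helper name is illustrative; the field is the
voluntary_ctxt_switches line in /proc/<pid>/status):

    // Read the voluntary context switch counter of a task from procfs.
    fn read_nvcsw(pid: i32) -> std::io::Result<u64> {
        let status = std::fs::read_to_string(format!("/proc/{}/status", pid))?;
        for line in status.lines() {
            if let Some(val) = line.strip_prefix("voluntary_ctxt_switches:") {
                return Ok(val.trim().parse().unwrap_or(0));
            }
        }
        Ok(0)
    }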

Therefore, drop this metric from scx_rustland_core and change
scx_rustland to implement an interactive task classifier fully in the
user-space part of the scheduler.

Also drop some options that do not provide any significant benefit
(also in preparation for a bigger refactoring to define a better API
for the user-space framework).
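
The refresh logic in the new user-space classifier boils down to:
sample voluntary_ctxt_switches for all pids once per second, compute
the per-task delta against the previous sample, sort by delta in
descending order and flag everything above the threshold as
interactive. A condensed sketch of that logic (assuming the one-second
refresh period, so the delta is already a per-second rate; names are
illustrative):

    use std::collections::HashMap;

    // prev holds the nvcsw counters sampled on the previous refresh.
    fn classify_interactive(prev: &mut HashMap<i32, u64>,
                            curr: &HashMap<i32, u64>) -> Vec<i32> {
        let mut deltas: Vec<(i32, u64)> = curr
            .iter()
            .filter_map(|(&pid, &nvcsw)| {
                let prev_nvcsw = prev.insert(pid, nvcsw).unwrap_or(0);
                // Skip tasks seen for the first time (no baseline yet).
                (prev_nvcsw > 0).then(|| (pid, nvcsw.saturating_sub(prev_nvcsw)))
            })
            .collect();
        // Greater nvcsw deltas are classified first.
        deltas.sort_by(|a, b| b.1.cmp(&a.1));
        // At least 10 voluntary context switches/sec are required to
        // consider a task interactive.
        deltas.into_iter()
            .take_while(|&(_, delta)| delta >= 10)
            .map(|(pid, _)| pid)
            .collect()
    }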

Signed-off-by: Andrea Righi <andrea.righi@linux.dev>
Andrea Righi 2024-08-04 15:47:06 +02:00
parent a8d14fc0c4
commit d8985306f4
5 changed files with 147 additions and 134 deletions


@@ -20,8 +20,6 @@ use libc::{pthread_self, pthread_setschedparam, sched_param};
#[cfg(target_env = "musl")]
use libc::timespec;
use scx_utils::compat;
use scx_utils::init_libbpf_logging;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
@@ -73,7 +71,6 @@ pub struct QueuedTask {
pub pid: i32, // pid that uniquely identifies a task
pub cpu: i32, // CPU where the task is running (-1 = exiting)
pub sum_exec_runtime: u64, // Total cpu time
pub nvcsw: u64, // Voluntary context switches
pub weight: u64, // Task static priority
cpumask_cnt: u64, // cpumask generation counter (private)
}
@@ -152,7 +149,6 @@ impl EnqueuedMessage {
cpu: self.inner.cpu,
cpumask_cnt: self.inner.cpumask_cnt,
sum_exec_runtime: self.inner.sum_exec_runtime,
nvcsw: self.inner.nvcsw,
weight: self.inner.weight,
}
}
@@ -184,7 +180,6 @@ impl<'cb> BpfScheduler<'cb> {
pub fn init(
slice_us: u64,
nr_cpus_online: i32,
partial: bool,
exit_dump_len: u32,
full_user: bool,
low_power: bool,
@@ -193,7 +188,6 @@ impl<'cb> BpfScheduler<'cb> {
) -> Result<Self> {
// Open the BPF prog first for verification.
let skel_builder = BpfSkelBuilder::default();
init_libbpf_logging(None);
let mut skel = scx_ops_open!(skel_builder, rustland)?;
// Lock all the memory to prevent page faults that could trigger potential deadlocks during
@@ -242,9 +236,6 @@ impl<'cb> BpfScheduler<'cb> {
skel.rodata_mut().num_possible_cpus = nr_cpus_online;
// Set scheduler options (defined in the BPF part).
if partial {
skel.struct_ops.rustland_mut().flags |= *compat::SCX_OPS_SWITCH_PARTIAL;
}
skel.struct_ops.rustland_mut().exit_dump_len = exit_dump_len;
skel.bss_mut().usersched_pid = std::process::id();


@@ -65,7 +65,6 @@ struct queued_task_ctx {
s32 cpu; /* CPU where the task is running (-1 = exiting) */
u64 cpumask_cnt; /* cpumask generation counter */
u64 sum_exec_runtime; /* Total cpu time */
u64 nvcsw; /* Voluntary context switches */
u64 weight; /* Task static priority */
};


@@ -462,7 +462,7 @@ static void dispatch_user_scheduler(void)
* Dispatch the scheduler on the first CPU available, likely the
* current one.
*/
dispatch_task(p, SHARED_DSQ, 0, 0, SCX_ENQ_PREEMPT);
dispatch_task(p, SHARED_DSQ, 0, 0, 0);
bpf_task_release(p);
}
@@ -608,7 +608,6 @@ static void get_task_info(struct queued_task_ctx *task,
return;
task->cpumask_cnt = tctx->cpumask_cnt;
task->sum_exec_runtime = p->se.sum_exec_runtime;
task->nvcsw = p->nvcsw;
task->weight = p->scx.weight;
task->cpu = scx_bpf_task_cpu(p);
}
@@ -761,15 +760,16 @@ void BPF_STRUCT_OPS(rustland_dispatch, s32 cpu, struct task_struct *prev)
*/
bpf_user_ringbuf_drain(&dispatched, handle_dispatched_task, NULL, 0);
/* Consume first task both from the shared DSQ and the per-CPU DSQ */
/*
* First try to consume a task from the per-CPU DSQ.
*/
if (scx_bpf_consume(cpu_to_dsq(cpu)))
return;
/*
* Consume a task from the shared DSQ.
*/
scx_bpf_consume(SHARED_DSQ);
if (scx_bpf_consume(cpu_to_dsq(cpu))) {
/*
* Re-kick the current CPU if there are more tasks in the
* per-CPU DSQ
*/
scx_bpf_kick_cpu(cpu, 0);
}
}
/*


@@ -30,7 +30,6 @@ impl<'a> Scheduler<'a> {
let bpf = BpfScheduler::init(
5000, // slice_ns (default task time slice)
topo.nr_cpu_ids() as i32, // nr_cpus (max CPUs available in the system)
false, // partial (include all tasks if disabled)
0, // exit_dump_len (buffer size of exit info)
true, // full_user (schedule all tasks in user-space)
false, // low_power (low power mode)


@@ -101,16 +101,6 @@ struct Opts {
#[clap(short = 'b', long, default_value = "100")]
slice_boost: u64,
/// If specified, disable task preemption.
///
/// Disabling task preemption can help to improve the throughput of CPU-intensive tasks, while
/// still providing a good level of system responsiveness.
///
/// Preemption is enabled by default to provide a higher level of responsiveness to the
/// interactive tasks.
#[clap(short = 'n', long, action = clap::ArgAction::SetTrue)]
no_preemption: bool,
/// If specified, all the scheduling events and actions will be processed in user-space,
/// disabling any form of in-kernel optimization.
///
@@ -140,12 +130,6 @@ struct Opts {
#[clap(short = 'f', long, action = clap::ArgAction::SetTrue)]
disable_fifo: bool,
/// If specified, only tasks which have their scheduling policy set to
/// SCHED_EXT using sched_setscheduler(2) are switched. Otherwise, all
/// tasks are switched.
#[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
partial: bool,
/// Exit debug dump buffer length. 0 indicates default.
#[clap(long, default_value = "0")]
exit_dump_len: u32,
@@ -165,14 +149,53 @@ const NSEC_PER_USEC: u64 = 1_000;
const NSEC_PER_MSEC: u64 = 1_000_000;
const NSEC_PER_SEC: u64 = 1_000_000_000;
#[derive(Debug, Clone)]
struct TaskStat {
pid: i32,
comm: String,
nvcsw: u64,
}
fn parse_proc_pid_stat(pid: i32) -> std::io::Result<TaskStat> {
let path = format!("/proc/{}/status", pid);
let content = std::fs::read_to_string(&path)?;
let mut comm = String::new();
let mut nvcsw = 0;
for line in content.lines() {
if line.starts_with("Name:") {
comm = line.split_whitespace().nth(1).unwrap_or("").to_string();
} else if line.starts_with("voluntary_ctxt_switches:") {
nvcsw = line.split_whitespace().nth(1).unwrap_or("0").parse().unwrap_or(0);
}
}
Ok(TaskStat {
pid,
comm,
nvcsw,
})
}
fn get_all_pids() -> std::io::Result<Vec<i32>> {
let mut pids = Vec::new();
for entry in std::fs::read_dir("/proc")? {
if let Ok(entry) = entry {
let file_name = entry.file_name();
if let Ok(pid) = file_name.to_string_lossy().parse::<i32>() {
pids.push(pid);
}
}
}
Ok(pids)
}
// Basic item stored in the task information map.
#[derive(Debug)]
struct TaskInfo {
sum_exec_runtime: u64, // total cpu time used by the task
vruntime: u64, // total vruntime of the task
avg_nvcsw: u64, // average of voluntary context switches
nvcsw: u64, // total amount of voluntary context switches
nvcsw_ts: u64, // timestamp of the previous nvcsw update
}
// Task information map: store total execution time and vruntime of each task in the system.
@@ -202,7 +225,6 @@ impl TaskInfoMap {
struct Task {
qtask: QueuedTask, // queued task
vruntime: u64, // total vruntime (that determines the order in which tasks are dispatched)
is_interactive: bool, // task can preempt other tasks
}
// Make sure tasks are ordered by vruntime; if multiple tasks have the same vruntime, order by pid.
@@ -260,13 +282,13 @@ struct Scheduler<'a> {
topo_map: TopologyMap, // Host topology
task_pool: TaskTree, // tasks ordered by vruntime
task_map: TaskInfoMap, // map pids to the corresponding task information
proc_stats: HashMap<i32, u64>, // Task statistics from procfs
interactive_pids: Vec<i32>, // List of interactive tasks
min_vruntime: u64, // Keep track of the minimum vruntime across all tasks
max_vruntime: u64, // Keep track of the maximum vruntime across all tasks
init_page_faults: u64, // Initial page faults counter
slice_ns: u64, // Default time slice (in ns)
slice_boost: u64, // Slice booster
init_page_faults: u64, // Initial page faults counter
no_preemption: bool, // Disable task preemption
full_user: bool, // Run all tasks through the user-space scheduler
}
impl<'a> Scheduler<'a> {
@@ -275,37 +297,11 @@ impl<'a> Scheduler<'a> {
let topo = Topology::new().expect("Failed to build host topology");
let topo_map = TopologyMap::new(&topo).expect("Failed to generate topology map");
// Save the default time slice (in ns) in the scheduler class.
let slice_ns = opts.slice_us * NSEC_PER_USEC;
// Slice booster (0 = disabled).
let slice_boost = opts.slice_boost;
// Disable task preemption.
let no_preemption = opts.no_preemption;
// Run all tasks through the user-space scheduler.
let full_user = opts.full_user;
// Scheduler task pool to sort tasks by vruntime.
let task_pool = TaskTree::new();
// Scheduler task map to store tasks information.
let task_map = TaskInfoMap::new();
// Initialize global minimum and maximum vruntime.
let min_vruntime: u64 = 0;
let max_vruntime: u64 = 0;
// Initialize initial page fault counter.
let init_page_faults: u64 = 0;
// Low-level BPF connector.
let nr_cpus = topo.nr_cpu_ids();
let bpf = BpfScheduler::init(
opts.slice_us,
nr_cpus as i32,
opts.partial,
opts.exit_dump_len,
opts.full_user,
opts.low_power,
@@ -318,15 +314,15 @@ impl<'a> Scheduler<'a> {
Ok(Self {
bpf,
topo_map,
task_pool,
task_map,
min_vruntime,
max_vruntime,
slice_ns,
slice_boost,
init_page_faults,
no_preemption,
full_user,
task_pool: TaskTree::new(),
task_map: TaskInfoMap::new(),
proc_stats: HashMap::new(),
interactive_pids: Vec::new(),
min_vruntime: 0,
max_vruntime: 0,
init_page_faults: 0,
slice_ns: opts.slice_us * NSEC_PER_USEC,
slice_boost: opts.slice_boost,
})
}
@@ -364,11 +360,10 @@ impl<'a> Scheduler<'a> {
}
// Update task's vruntime based on the information collected from the kernel and return to the
// caller the evaluated weighted time slice along with a flag indicating whether the task is
// interactive or not (interactive tasks are allowed to preempt other tasks).
// caller the evaluated weighted time slice.
//
// This method implements the main task ordering logic of the scheduler.
fn update_enqueued(&mut self, task: &QueuedTask) -> (u64, bool) {
fn update_enqueued(&mut self, task: &QueuedTask) -> u64 {
// Determine if a task is new or old, based on their current runtime and previous runtime
// counters.
//
@@ -384,9 +379,6 @@ impl<'a> Scheduler<'a> {
curr_runtime < prev_runtime || prev_runtime == 0
}
// Cache the current timestamp.
let now = Self::now();
// Get task information if the task is already stored in the task map,
// otherwise create a new entry for it.
let task_info = self
@@ -396,9 +388,6 @@ impl<'a> Scheduler<'a> {
.or_insert_with_key(|&_pid| TaskInfo {
sum_exec_runtime: 0,
vruntime: self.min_vruntime,
nvcsw: task.nvcsw,
nvcsw_ts: now,
avg_nvcsw: 0,
});
// Evaluate last time slot used by the task.
@@ -408,19 +397,11 @@ impl<'a> Scheduler<'a> {
task.sum_exec_runtime - task_info.sum_exec_runtime
};
// Determine if a task is interactive, based on the moving average of voluntary context
// switches over time.
//
// NOTE: we should make this threshold a tunable, but for now let's assume that a moving
// average of 10 voluntary context switch per second is enough to classify the task as
// interactive.
let is_interactive = task_info.avg_nvcsw >= 10;
// Apply the slice boost to interactive tasks.
//
// NOTE: some tasks may have a very high weight, that can potentially disrupt our slice
// boost optimizations, therefore always limit the task priority to a max of 1000.
let weight = if is_interactive {
// Some tasks may have a very high weight, that can potentially disrupt our slice boost
// optimizations, therefore always limit the task priority to a max of 1000.
let weight = if self.interactive_pids.contains(&task.pid) {
task.weight.min(1000) * self.slice_boost.max(1)
} else {
task.weight.min(1000)
@@ -447,19 +428,8 @@ impl<'a> Scheduler<'a> {
// Update total task cputime.
task_info.sum_exec_runtime = task.sum_exec_runtime;
// Refresh voluntary context switches average, counter and timestamp every second.
if now - task_info.nvcsw_ts > NSEC_PER_SEC {
let delta_nvcsw = task.nvcsw - task_info.nvcsw;
let delta_t = (now - task_info.nvcsw_ts).max(1);
let avg_nvcsw = delta_nvcsw * NSEC_PER_SEC / delta_t;
task_info.avg_nvcsw = (task_info.avg_nvcsw + avg_nvcsw) / 2;
task_info.nvcsw = task.nvcsw;
task_info.nvcsw_ts = now;
}
// Return the task vruntime and a flag indicating if the task is interactive.
(task_info.vruntime, is_interactive)
// Return the task vruntime.
task_info.vruntime
}
// Drain all the tasks from the queued list, update their vruntime (Self::update_enqueued()),
@@ -475,14 +445,13 @@ impl<'a> Scheduler<'a> {
continue;
}
// Update task information and determine vruntime and interactiveness.
let (vruntime, is_interactive) = self.update_enqueued(&task);
// Update task information and determine vruntime.
let vruntime = self.update_enqueued(&task);
// Insert task in the task pool (ordered by vruntime).
self.task_pool.push(Task {
qtask: task,
vruntime,
is_interactive,
});
}
Ok(None) => {
@@ -557,23 +526,11 @@ impl<'a> Scheduler<'a> {
// Create a new task to dispatch.
let mut dispatched_task = DispatchedTask::new(&task.qtask);
// Assign the time slice to the task.
dispatched_task.set_slice_ns(slice_ns);
if task.is_interactive {
// Dispatch interactive tasks on the first CPU available.
dispatched_task.set_flag(RL_CPU_ANY);
// Interactive tasks can preempt other tasks.
if !self.no_preemption {
dispatched_task.set_flag(RL_PREEMPT_CPU);
}
}
// In full-user mode we skip the built-in idle selection logic, so simply
// dispatch all the tasks on the first CPU available.
if self.full_user {
dispatched_task.set_flag(RL_CPU_ANY);
}
// Dispatch tasks on the first CPU available.
dispatched_task.set_flag(RL_CPU_ANY);
// Send task to the BPF dispatcher.
match self.bpf.dispatch_task(&dispatched_task) {
@@ -755,6 +712,73 @@ impl<'a> Scheduler<'a> {
log::logger().flush();
}
fn sync_interactive_tasks(&mut self, stats: &[TaskStat]) {
self.interactive_pids.clear();
info!("{:<8} {:>10} {} <-- interactive tasks", "[pid]", "[nvcsw]", "[comm]");
for i in 0..stats.len() {
let stat = &stats[i];
// At least 10 context switches per sec are required to consider the
// task as interactive.
if stat.nvcsw < 10 {
break;
}
self.interactive_pids.push(stat.pid);
info!(
"{:<8} {:>10} {}",
stat.pid, stat.nvcsw, stat.comm
);
}
log::logger().flush();
}
fn update_interactive_stats(&mut self) -> std::io::Result<Vec<TaskStat>> {
let mut new_stats = Vec::new();
for pid in get_all_pids()? {
if let Ok(stat) = parse_proc_pid_stat(pid) {
// Retrieve the previous nvcsw value, or 0 if not present.
let prev_nvcsw = self.proc_stats.get(&stat.pid).copied().unwrap_or_default();
// Update the proc_stats entry with the new nvcsw.
self.proc_stats.insert(stat.pid, stat.nvcsw);
// Skip the first time that we see the task or if the task has no voluntary context
// switches at all.
if prev_nvcsw > 0 {
// Add the task entry with the delta nvcsw.
let delta_nvcsw = stat.nvcsw.saturating_sub(prev_nvcsw);
new_stats.push(TaskStat {
pid: stat.pid,
comm: stat.comm,
nvcsw: delta_nvcsw,
});
}
}
}
// Sort by delta of nvcsw in descending order to ensure we always classify the tasks with
// greater nvcsw as interactive.
new_stats.sort_by(|a, b| b.nvcsw.cmp(&a.nvcsw));
Ok(new_stats)
}
fn refresh_interactive_tasks(&mut self) -> std::io::Result<()> {
let current_stats = match self.update_interactive_stats() {
Ok(stats) => stats,
Err(e) => {
warn!("Failed to update stats: {}", e);
return Err(e);
}
};
self.sync_interactive_tasks(&current_stats);
Ok(())
}
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
let mut prev_ts = Self::now();
@@ -762,12 +786,12 @@ impl<'a> Scheduler<'a> {
// Call the main scheduler body.
self.schedule();
// Print scheduler statistics every second.
let curr_ts = Self::now();
if curr_ts - prev_ts > NSEC_PER_SEC {
let now = Self::now();
if now - prev_ts > NSEC_PER_SEC {
self.print_stats();
self.refresh_interactive_tasks().unwrap();
prev_ts = curr_ts;
prev_ts = now;
}
}
// Dump scheduler statistics before exiting