scx_rustland: use built-in nvcsw metrics

Use the nvcsw metric from the scx_rustland_core backend, intead of
retrieving this metric in user-space via procfs.

Signed-off-by: Andrea Righi <andrea.righi@linux.dev>
This commit is contained in:
Andrea Righi 2024-10-12 00:49:15 +02:00
parent 97629178e2
commit 0b2de2c10c

View File

@ -121,57 +121,12 @@ const NSEC_PER_SEC: u64 = 1_000_000_000;
// context switches per second.
const RL_MAX_VTIME_FACTOR: u64 = 4;
#[derive(Debug, Clone)]
struct TaskStat {
pid: i32,
nvcsw: u64,
}
fn parse_proc_pid_stat(pid: i32) -> std::io::Result<TaskStat> {
let path = format!("/proc/{}/status", pid);
let content = std::fs::read_to_string(&path)?;
let mut nvcsw = 0;
for line in content.lines() {
if line.starts_with("Name:") {
let name = line.split_whitespace().nth(1).unwrap_or("");
// Prioritize kthreads by assigning max nvcsw rate.
if name.starts_with('[') && name.ends_with(']') {
nvcsw = u64::MAX;
break;
}
}
if line.starts_with("voluntary_ctxt_switches:") {
nvcsw = line
.split_whitespace()
.nth(1)
.unwrap_or("0")
.parse()
.unwrap_or(0);
}
}
Ok(TaskStat { pid, nvcsw })
}
fn get_all_pids() -> std::io::Result<Vec<i32>> {
let mut pids = Vec::new();
for entry in std::fs::read_dir("/proc")? {
if let Ok(entry) = entry {
let file_name = entry.file_name();
if let Ok(pid) = file_name.to_string_lossy().parse::<i32>() {
pids.push(pid);
}
}
}
Ok(pids)
}
// Basic item stored in the task information map.
#[derive(Debug)]
struct TaskInfo {
nvcsw: u64,
nvcsw_ts: u64,
avg_nvcsw: u64,
sum_exec_runtime: u64, // total cpu time used by the task
vruntime: u64, // total vruntime of the task
}
@ -264,8 +219,6 @@ struct Scheduler<'a> {
stats_server: StatsServer<(), Metrics>, // statistics
task_pool: TaskTree, // tasks ordered by vruntime
task_map: TaskInfoMap, // map pids to the corresponding task information
proc_stats: HashMap<i32, u64>, // Task statistics from procfs
interactive_pids: HashMap<i32, u64>, // Map of task PIDs to their avg_nvcsw metric
min_vruntime: u64, // Keep track of the minimum vruntime across all tasks
init_page_faults: u64, // Initial page faults counter
slice_ns: u64, // Default time slice (in ns)
@ -286,8 +239,6 @@ impl<'a> Scheduler<'a> {
stats_server,
task_pool: TaskTree::new(),
task_map: TaskInfoMap::new(),
proc_stats: HashMap::new(),
interactive_pids: HashMap::new(),
min_vruntime: 0,
init_page_faults: 0,
slice_ns: opts.slice_us * NSEC_PER_USEC,
@ -328,16 +279,16 @@ impl<'a> Scheduler<'a> {
ts.as_nanos() as u64
}
fn calc_avg(old_val: u64, new_val: u64) -> u64 {
(old_val - (old_val >> 2)) + (new_val >> 2)
}
// Update task's vruntime based on the information collected from the kernel and return to the
// caller the evaluated task's vruntime.
//
// This method implements the main task ordering logic of the scheduler.
fn update_enqueued(&mut self, task: &QueuedTask) -> u64 {
// Determine if a task is new or old, based on their current runtime and previous runtime
// counters.
fn is_new_task(curr_runtime: u64, prev_runtime: u64) -> bool {
curr_runtime < prev_runtime || prev_runtime == 0
}
let now = Self::now();
// Get task information if the task is already stored in the task map,
// otherwise create a new entry for it.
@ -346,28 +297,38 @@ impl<'a> Scheduler<'a> {
.tasks
.entry(task.pid)
.or_insert_with_key(|&_pid| TaskInfo {
sum_exec_runtime: 0,
nvcsw: task.nvcsw,
nvcsw_ts: now,
avg_nvcsw: 0,
sum_exec_runtime: task.sum_exec_runtime,
vruntime: self.min_vruntime,
});
// Evaluate used task time slice.
let slice = if is_new_task(task.sum_exec_runtime, task_info.sum_exec_runtime) {
task.sum_exec_runtime
} else {
task.sum_exec_runtime - task_info.sum_exec_runtime
}
.min(self.slice_ns);
// Refresh voluntary context switches metrics.
let delta_t = now - task_info.nvcsw_ts;
if delta_t >= NSEC_PER_SEC {
let delta_nvcsw = task.nvcsw - task_info.nvcsw;
let avg_nvcsw = (delta_nvcsw * NSEC_PER_SEC / delta_t).min(1000);
// Update task's vruntime re-aligning it to min_vruntime, to avoid
// over-prioritizing tasks with a mostly sleepy behavior.
if task_info.vruntime < self.min_vruntime {
task_info.vruntime = self.min_vruntime;
task_info.nvcsw = task.nvcsw;
task_info.nvcsw_ts = now;
task_info.avg_nvcsw = Self::calc_avg(task_info.avg_nvcsw, avg_nvcsw);
}
task_info.vruntime += slice * 100 / task.weight;
// Evaluate used task time slice.
let slice = task.sum_exec_runtime.saturating_sub(task_info.sum_exec_runtime).min(self.slice_ns);
// Update total task cputime.
task_info.sum_exec_runtime = task.sum_exec_runtime;
// Update task's vruntime re-aligning it to min_vruntime, to avoid
// over-prioritizing tasks with a mostly sleepy behavior.
let min_vruntime = self.min_vruntime - self.slice_ns * task.weight / 100;
if task_info.vruntime < min_vruntime {
task_info.vruntime = min_vruntime;
}
task_info.vruntime += slice * 100 / task.weight;
// Evaluate the target vruntime for the task.
let mut vruntime = task_info.vruntime;
@ -377,7 +338,7 @@ impl<'a> Scheduler<'a> {
// This essentially creates multiple priority lanes (up to RL_MAX_VTIME_FACTOR + 1), where
// tasks with a higher rate of average context are scheduled before others. Tasks with the
// same average context switch rate are still ordered by their weighted used time slice.
let avg_nvcsw = self.interactive_pids.get(&task.pid).copied().unwrap_or(0);
let avg_nvcsw = task_info.avg_nvcsw;
vruntime = vruntime / (avg_nvcsw.min(RL_MAX_VTIME_FACTOR) + 1) + slice * 100 / task.weight;
// Make sure to not de-prioritize tasks too much to prevent starvation.
@ -505,56 +466,8 @@ impl<'a> Scheduler<'a> {
}
}
fn sync_interactive_tasks(&mut self, stats: &[TaskStat]) {
for i in 0..stats.len() {
let stat = &stats[i];
self.interactive_pids.insert(stat.pid, stat.nvcsw);
}
}
fn update_interactive_stats(&mut self) -> std::io::Result<Vec<TaskStat>> {
let mut new_stats = Vec::new();
for pid in get_all_pids()? {
if let Ok(stat) = parse_proc_pid_stat(pid) {
// Retrieve the previous nvcsw value, or 0 if not present.
let prev_nvcsw = self.proc_stats.get(&stat.pid).copied().unwrap_or(0);
// Update the proc_stats entry with the new nvcsw.
self.proc_stats.insert(stat.pid, stat.nvcsw);
// Skip the first time that we see the task or if the task has no voluntary context
// switches at all.
if prev_nvcsw > 0 {
// Add the task entry with the delta nvcsw.
let delta_nvcsw = stat.nvcsw.saturating_sub(prev_nvcsw);
new_stats.push(TaskStat {
pid: stat.pid,
nvcsw: delta_nvcsw,
});
}
}
}
Ok(new_stats)
}
fn refresh_interactive_tasks(&mut self) -> std::io::Result<()> {
let current_stats = match self.update_interactive_stats() {
Ok(stats) => stats,
Err(e) => {
warn!("Failed to update stats: {}", e);
return Err(e);
}
};
self.sync_interactive_tasks(&current_stats);
Ok(())
}
fn run(&mut self) -> Result<UserExitInfo> {
let (res_ch, req_ch) = self.stats_server.channels();
let mut prev_ts = Self::now();
while !self.bpf.exited() {
// Call the main scheduler body.
@ -565,13 +478,6 @@ impl<'a> Scheduler<'a> {
Ok(()) => res_ch.send(self.get_metrics())?,
Err(_) => {}
}
// Refresh tasks statistics.
let now = Self::now();
if now - prev_ts > NSEC_PER_SEC {
self.refresh_interactive_tasks().unwrap();
prev_ts = now;
}
}
self.bpf.shutdown_and_report()