From 0b2de2c10c93b6a50399cef00f9ffc27b6fbeb7f Mon Sep 17 00:00:00 2001 From: Andrea Righi Date: Sat, 12 Oct 2024 00:49:15 +0200 Subject: [PATCH] scx_rustland: use built-in nvcsw metrics Use the nvcsw metric from the scx_rustland_core backend, intead of retrieving this metric in user-space via procfs. Signed-off-by: Andrea Righi --- scheds/rust/scx_rustland/src/main.rs | 158 ++++++--------------------- 1 file changed, 32 insertions(+), 126 deletions(-) diff --git a/scheds/rust/scx_rustland/src/main.rs b/scheds/rust/scx_rustland/src/main.rs index 9facffe..a12f9ab 100644 --- a/scheds/rust/scx_rustland/src/main.rs +++ b/scheds/rust/scx_rustland/src/main.rs @@ -121,57 +121,12 @@ const NSEC_PER_SEC: u64 = 1_000_000_000; // context switches per second. const RL_MAX_VTIME_FACTOR: u64 = 4; -#[derive(Debug, Clone)] -struct TaskStat { - pid: i32, - nvcsw: u64, -} - -fn parse_proc_pid_stat(pid: i32) -> std::io::Result { - let path = format!("/proc/{}/status", pid); - let content = std::fs::read_to_string(&path)?; - - let mut nvcsw = 0; - - for line in content.lines() { - if line.starts_with("Name:") { - let name = line.split_whitespace().nth(1).unwrap_or(""); - // Prioritize kthreads by assigning max nvcsw rate. - if name.starts_with('[') && name.ends_with(']') { - nvcsw = u64::MAX; - break; - } - } - - if line.starts_with("voluntary_ctxt_switches:") { - nvcsw = line - .split_whitespace() - .nth(1) - .unwrap_or("0") - .parse() - .unwrap_or(0); - } - } - - Ok(TaskStat { pid, nvcsw }) -} - -fn get_all_pids() -> std::io::Result> { - let mut pids = Vec::new(); - for entry in std::fs::read_dir("/proc")? { - if let Ok(entry) = entry { - let file_name = entry.file_name(); - if let Ok(pid) = file_name.to_string_lossy().parse::() { - pids.push(pid); - } - } - } - Ok(pids) -} - // Basic item stored in the task information map. #[derive(Debug)] struct TaskInfo { + nvcsw: u64, + nvcsw_ts: u64, + avg_nvcsw: u64, sum_exec_runtime: u64, // total cpu time used by the task vruntime: u64, // total vruntime of the task } @@ -264,8 +219,6 @@ struct Scheduler<'a> { stats_server: StatsServer<(), Metrics>, // statistics task_pool: TaskTree, // tasks ordered by vruntime task_map: TaskInfoMap, // map pids to the corresponding task information - proc_stats: HashMap, // Task statistics from procfs - interactive_pids: HashMap, // Map of task PIDs to their avg_nvcsw metric min_vruntime: u64, // Keep track of the minimum vruntime across all tasks init_page_faults: u64, // Initial page faults counter slice_ns: u64, // Default time slice (in ns) @@ -286,8 +239,6 @@ impl<'a> Scheduler<'a> { stats_server, task_pool: TaskTree::new(), task_map: TaskInfoMap::new(), - proc_stats: HashMap::new(), - interactive_pids: HashMap::new(), min_vruntime: 0, init_page_faults: 0, slice_ns: opts.slice_us * NSEC_PER_USEC, @@ -328,16 +279,16 @@ impl<'a> Scheduler<'a> { ts.as_nanos() as u64 } + fn calc_avg(old_val: u64, new_val: u64) -> u64 { + (old_val - (old_val >> 2)) + (new_val >> 2) + } + // Update task's vruntime based on the information collected from the kernel and return to the // caller the evaluated task's vruntime. // // This method implements the main task ordering logic of the scheduler. fn update_enqueued(&mut self, task: &QueuedTask) -> u64 { - // Determine if a task is new or old, based on their current runtime and previous runtime - // counters. - fn is_new_task(curr_runtime: u64, prev_runtime: u64) -> bool { - curr_runtime < prev_runtime || prev_runtime == 0 - } + let now = Self::now(); // Get task information if the task is already stored in the task map, // otherwise create a new entry for it. @@ -346,28 +297,38 @@ impl<'a> Scheduler<'a> { .tasks .entry(task.pid) .or_insert_with_key(|&_pid| TaskInfo { - sum_exec_runtime: 0, + nvcsw: task.nvcsw, + nvcsw_ts: now, + avg_nvcsw: 0, + sum_exec_runtime: task.sum_exec_runtime, vruntime: self.min_vruntime, }); - // Evaluate used task time slice. - let slice = if is_new_task(task.sum_exec_runtime, task_info.sum_exec_runtime) { - task.sum_exec_runtime - } else { - task.sum_exec_runtime - task_info.sum_exec_runtime - } - .min(self.slice_ns); + // Refresh voluntary context switches metrics. + let delta_t = now - task_info.nvcsw_ts; + if delta_t >= NSEC_PER_SEC { + let delta_nvcsw = task.nvcsw - task_info.nvcsw; + let avg_nvcsw = (delta_nvcsw * NSEC_PER_SEC / delta_t).min(1000); - // Update task's vruntime re-aligning it to min_vruntime, to avoid - // over-prioritizing tasks with a mostly sleepy behavior. - if task_info.vruntime < self.min_vruntime { - task_info.vruntime = self.min_vruntime; + task_info.nvcsw = task.nvcsw; + task_info.nvcsw_ts = now; + task_info.avg_nvcsw = Self::calc_avg(task_info.avg_nvcsw, avg_nvcsw); } - task_info.vruntime += slice * 100 / task.weight; + + // Evaluate used task time slice. + let slice = task.sum_exec_runtime.saturating_sub(task_info.sum_exec_runtime).min(self.slice_ns); // Update total task cputime. task_info.sum_exec_runtime = task.sum_exec_runtime; + // Update task's vruntime re-aligning it to min_vruntime, to avoid + // over-prioritizing tasks with a mostly sleepy behavior. + let min_vruntime = self.min_vruntime - self.slice_ns * task.weight / 100; + if task_info.vruntime < min_vruntime { + task_info.vruntime = min_vruntime; + } + task_info.vruntime += slice * 100 / task.weight; + // Evaluate the target vruntime for the task. let mut vruntime = task_info.vruntime; @@ -377,7 +338,7 @@ impl<'a> Scheduler<'a> { // This essentially creates multiple priority lanes (up to RL_MAX_VTIME_FACTOR + 1), where // tasks with a higher rate of average context are scheduled before others. Tasks with the // same average context switch rate are still ordered by their weighted used time slice. - let avg_nvcsw = self.interactive_pids.get(&task.pid).copied().unwrap_or(0); + let avg_nvcsw = task_info.avg_nvcsw; vruntime = vruntime / (avg_nvcsw.min(RL_MAX_VTIME_FACTOR) + 1) + slice * 100 / task.weight; // Make sure to not de-prioritize tasks too much to prevent starvation. @@ -505,56 +466,8 @@ impl<'a> Scheduler<'a> { } } - fn sync_interactive_tasks(&mut self, stats: &[TaskStat]) { - for i in 0..stats.len() { - let stat = &stats[i]; - self.interactive_pids.insert(stat.pid, stat.nvcsw); - } - } - - fn update_interactive_stats(&mut self) -> std::io::Result> { - let mut new_stats = Vec::new(); - - for pid in get_all_pids()? { - if let Ok(stat) = parse_proc_pid_stat(pid) { - // Retrieve the previous nvcsw value, or 0 if not present. - let prev_nvcsw = self.proc_stats.get(&stat.pid).copied().unwrap_or(0); - - // Update the proc_stats entry with the new nvcsw. - self.proc_stats.insert(stat.pid, stat.nvcsw); - - // Skip the first time that we see the task or if the task has no voluntary context - // switches at all. - if prev_nvcsw > 0 { - // Add the task entry with the delta nvcsw. - let delta_nvcsw = stat.nvcsw.saturating_sub(prev_nvcsw); - new_stats.push(TaskStat { - pid: stat.pid, - nvcsw: delta_nvcsw, - }); - } - } - } - - Ok(new_stats) - } - - fn refresh_interactive_tasks(&mut self) -> std::io::Result<()> { - let current_stats = match self.update_interactive_stats() { - Ok(stats) => stats, - Err(e) => { - warn!("Failed to update stats: {}", e); - return Err(e); - } - }; - self.sync_interactive_tasks(¤t_stats); - - Ok(()) - } - fn run(&mut self) -> Result { let (res_ch, req_ch) = self.stats_server.channels(); - let mut prev_ts = Self::now(); while !self.bpf.exited() { // Call the main scheduler body. @@ -565,13 +478,6 @@ impl<'a> Scheduler<'a> { Ok(()) => res_ch.send(self.get_metrics())?, Err(_) => {} } - - // Refresh tasks statistics. - let now = Self::now(); - if now - prev_ts > NSEC_PER_SEC { - self.refresh_interactive_tasks().unwrap(); - prev_ts = now; - } } self.bpf.shutdown_and_report()