Merge pull request #238 from sched-ext/rustland-reduce-topology-overhead

scx_rustland: reduce overhead by caching host topology
Andrea Righi 2024-04-24 22:24:23 +02:00 committed by GitHub
commit 973aded5a8
3 changed files with 61 additions and 10 deletions
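
This change builds the host topology once at startup and caches it in a flat TopologyMap (a vector of per-core CPU ID vectors), so the scheduler's hot paths iterate plain integers instead of re-walking the nested Topology objects on every scheduling pass. A minimal usage sketch of the new API introduced below, assuming the scx_utils crate from this tree:

    use anyhow::Result;
    use scx_utils::{Topology, TopologyMap};

    fn main() -> Result<()> {
        // Build the nested topology once, then flatten it into a cached map.
        let topo = Topology::new()?;
        let topo_map = TopologyMap::new(topo)?;

        // Hot path: iterate the cached map; no topology re-discovery needed.
        for (core_id, cpus) in topo_map.iter().enumerate() {
            for cpu_id in cpus {
                println!("core={} cpu={}", core_id, cpu_id);
            }
        }
        Ok(())
    }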

View File

@@ -61,6 +61,7 @@ pub use topology::Core;
 pub use topology::Cpu;
 pub use topology::Node;
 pub use topology::Topology;
+pub use topology::TopologyMap;
 
 mod cpumask;
 pub use cpumask::Cpumask;

View File

@@ -75,6 +75,7 @@ use glob::glob;
 use sscanf::sscanf;
 use std::collections::BTreeMap;
 use std::path::Path;
+use std::slice::Iter;
 
 #[derive(Debug, Clone)]
 pub struct Cpu {
@@ -249,6 +250,53 @@ impl Topology {
     }
 }
 
+/// Generate a topology map from a Topology object, represented as an array of arrays.
+///
+/// Each inner array corresponds to a core containing its associated CPU IDs. This map can
+/// facilitate efficient iteration over the host's topology.
+///
+/// # Example
+///
+/// ```
+/// let topo = Topology::new()?;
+/// let topo_map = TopologyMap::new(topo)?;
+///
+/// for (core_id, core) in topo_map.iter().enumerate() {
+///     for cpu in core {
+///         println!("core={} cpu={}", core_id, cpu);
+///     }
+/// }
+/// ```
+#[derive(Debug)]
+pub struct TopologyMap {
+    map: Vec<Vec<usize>>,
+    nr_cpus_possible: usize,
+}
+
+impl TopologyMap {
+    pub fn new(topo: Topology) -> Result<TopologyMap> {
+        let mut map: Vec<Vec<usize>> = Vec::new();
+        for core in topo.cores().into_iter() {
+            let mut cpu_ids: Vec<usize> = Vec::new();
+            for cpu_id in core.span().clone().into_iter() {
+                cpu_ids.push(cpu_id);
+            }
+            map.push(cpu_ids);
+        }
+        let nr_cpus_possible = topo.nr_cpus_possible;
+
+        Ok(TopologyMap { map, nr_cpus_possible, })
+    }
+
+    pub fn nr_cpus_possible(&self) -> usize {
+        self.nr_cpus_possible
+    }
+
+    pub fn iter(&self) -> Iter<Vec<usize>> {
+        self.map.iter()
+    }
+}
+
 /**********************************************
  * Helper functions for creating the Topology *
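
TopologyMap stores only Vec<Vec<usize>>: the per-core CPU IDs are copied out of each core's span once at construction time, so later iteration never touches the Topology accessors or clones a Cpumask. As an illustration, a consumer could derive simple aggregates from the cached map like this (count_cpus is a hypothetical helper, not part of this patch):

    // Hypothetical helper: total CPU count and the widest core in the map.
    fn count_cpus(topo_map: &scx_utils::TopologyMap) -> (usize, usize) {
        let total: usize = topo_map.iter().map(|core| core.len()).sum();
        let widest = topo_map.iter().map(|core| core.len()).max().unwrap_or(0);
        (total, widest)
    }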

View File

@@ -10,6 +10,7 @@ mod bpf;
 use bpf::*;
 
 use scx_utils::Topology;
+use scx_utils::TopologyMap;
 
 use std::thread;
@@ -240,7 +241,7 @@ impl TaskTree {
 // Main scheduler object
 struct Scheduler<'a> {
     bpf: BpfScheduler<'a>,          // BPF connector
-    topo: Topology,                 // Host topology
+    topo_map: TopologyMap,          // Host topology
     task_pool: TaskTree,            // tasks ordered by vruntime
     task_map: TaskInfoMap,          // map pids to the corresponding task information
     min_vruntime: u64,              // Keep track of the minimum vruntime across all tasks
@@ -256,6 +257,7 @@ impl<'a> Scheduler<'a> {
     fn init(opts: &Opts) -> Result<Self> {
         // Initialize core mapping topology.
         let topo = Topology::new().expect("Failed to build host topology");
+        let topo_map = TopologyMap::new(topo).expect("Failed to generate topology map");
 
         // Save the default time slice (in ns) in the scheduler class.
         let slice_ns = opts.slice_us * NSEC_PER_USEC;
@@ -283,7 +285,7 @@ impl<'a> Scheduler<'a> {
         let init_page_faults: u64 = 0;
 
         // Low-level BPF connector.
-        let nr_online_cpus = topo.span().weight();
+        let nr_online_cpus = topo_map.nr_cpus_possible();
         let bpf = BpfScheduler::init(
             opts.slice_us,
             nr_online_cpus as i32,
@@ -297,7 +299,7 @@ impl<'a> Scheduler<'a> {
         // Return scheduler object.
         Ok(Self {
             bpf,
-            topo,
+            topo_map,
             task_pool,
             task_map,
             min_vruntime,
@@ -318,10 +320,10 @@ impl<'a> Scheduler<'a> {
         let mut idle_cpu_count = 0;
 
         // Count the number of cores where all the CPUs are idle.
-        for core in self.topo.cores().iter() {
+        for core in self.topo_map.iter() {
             let mut all_idle = true;
-            for cpu_id in core.span().clone().into_iter() {
-                if self.bpf.get_cpu_pid(cpu_id as i32) != 0 {
+            for cpu_id in core {
+                if self.bpf.get_cpu_pid(*cpu_id as i32) != 0 {
                     all_idle = false;
                     break;
                 }
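
With the cached map, the idle-core scan reduces to two nested loops over integers: a core counts as idle only when get_cpu_pid() reports no task on any of its CPUs. The same pattern, extracted as a standalone sketch where cpu_is_busy stands in for the BPF-side check (hypothetical predicate, for illustration only):

    // Count cores whose CPUs are all idle, given a per-CPU busy predicate.
    fn count_idle_cores(
        topo_map: &scx_utils::TopologyMap,
        cpu_is_busy: impl Fn(usize) -> bool,
    ) -> usize {
        topo_map
            .iter()
            .filter(|core| core.iter().all(|&cpu_id| !cpu_is_busy(cpu_id)))
            .count()
    }
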
@@ -381,7 +383,7 @@ impl<'a> Scheduler<'a> {
         //
         // This allows to survive stress tests that are spawning a massive amount of
         // tasks.
-        self.eff_slice_boost = (self.slice_boost * self.topo.nr_cpus_possible() as u64
+        self.eff_slice_boost = (self.slice_boost * self.topo_map.nr_cpus_possible() as u64
             / self.task_pool.tasks.len().max(1) as u64)
             .max(1);
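
The effective slice boost scales inversely with the number of queued tasks: eff_slice_boost = max(slice_boost * nr_cpus_possible / nr_tasks, 1). For example (illustrative numbers), with slice_boost = 100 on a host with 16 possible CPUs and 800 runnable tasks, the boost drops to 100 * 16 / 800 = 2, so heavily loaded systems fall back to near-uniform time slices instead of starving tasks. The formula as a standalone sketch:

    // eff = max(slice_boost * nr_cpus / max(nr_tasks, 1), 1)
    fn eff_slice_boost(slice_boost: u64, nr_cpus: u64, nr_tasks: u64) -> u64 {
        (slice_boost * nr_cpus / nr_tasks.max(1)).max(1)
    }
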
@@ -706,14 +708,14 @@ impl<'a> Scheduler<'a> {
             Err(_) => -1,
         };
         info!("Running tasks:");
-        for core in self.topo.cores().iter() {
-            for (cpu_id, _) in core.cpus().iter() {
+        for (core_id, core) in self.topo_map.iter().enumerate() {
+            for cpu_id in core {
                 let pid = if *cpu_id as i32 == sched_cpu {
                     "[self]".to_string()
                 } else {
                     self.bpf.get_cpu_pid(*cpu_id as i32).to_string()
                 };
-                info!("  core {:2} cpu {:2} pid={}", core.id(), cpu_id, pid);
+                info!("  core {:2} cpu {:2} pid={}", core_id, cpu_id, pid);
             }
         }
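
The status dump now walks the cached map and prints one line per CPU, tagging the CPU that is running the user-space scheduler itself. Given the format string above, the output would look roughly like this (illustrative values):

    Running tasks:
      core  0 cpu  0 pid=[self]
      core  0 cpu  1 pid=0
      core  1 cpu  2 pid=4321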