Merge pull request #509 from sched-ext/bpfland-topology

scx_bpfland: topology awareness
This commit is contained in:
Andrea Righi 2024-08-20 14:37:23 +02:00 committed by GitHub
commit 33b6ada98e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 491 additions and 59 deletions

View File

@ -83,6 +83,8 @@ pub struct Cpu {
min_freq: usize,
max_freq: usize,
trans_lat_ns: usize,
l2_id: usize,
l3_id: usize,
llc_id: usize,
}
@ -107,6 +109,16 @@ impl Cpu {
self.trans_lat_ns
}
/// Get the L2 id of this Cpu
pub fn l2_id(&self) -> usize {
self.l2_id
}
/// Get the L3 id of this Cpu
pub fn l3_id(&self) -> usize {
self.l3_id
}
/// Get the LLC id of the this Cpu
pub fn llc_id(&self) -> usize {
self.llc_id
@ -348,8 +360,6 @@ impl TopologyMap {
* Helper functions for creating the Topology *
**********************************************/
const CACHE_LEVEL: usize = 3;
fn read_file_usize(path: &Path) -> Result<usize> {
let val = match std::fs::read_to_string(&path) {
Ok(val) => val,
@ -406,13 +416,18 @@ fn create_insert_cpu(cpu_id: usize, node: &mut Node, online_mask: &Cpumask) -> R
let top_path = cpu_path.join("topology");
let core_id = read_file_usize(&top_path.join("core_id"))?;
// L3 cache ID
// Evaluate L2, L3 and LLC cache IDs.
//
// Use ID 0 if we fail to detect the cache hierarchy. This seems to happen on certain SKUs, so
// if there's no cache information then we have no option but to assume a single unified cache
// per node.
let cache_path = cpu_path.join("cache");
// Use LLC 0 if we fail to detect the cache hierarchy. This seems to
// happen on certain SKUs, so if there's no cache information then
// we have no option but to assume a single unified cache per node.
let llc_id =
read_file_usize(&cache_path.join(format!("index{}", CACHE_LEVEL)).join("id")).unwrap_or(0);
let l2_id =
read_file_usize(&cache_path.join(format!("index{}", 2)).join("id")).unwrap_or(0);
let l3_id =
read_file_usize(&cache_path.join(format!("index{}", 3)).join("id")).unwrap_or(0);
// Assume that the LLC always corresponds to the L3 cache.
let llc_id = l3_id;
// Min and max frequencies. If the kernel is not compiled with
// CONFIG_CPU_FREQ, just assume 0 for both frequencies.
@ -440,6 +455,8 @@ fn create_insert_cpu(cpu_id: usize, node: &mut Node, online_mask: &Cpumask) -> R
min_freq: min_freq,
max_freq: max_freq,
trans_lat_ns: trans_lat_ns,
l2_id: l2_id,
l3_id: l3_id,
llc_id: llc_id,
},
);

View File

@ -38,4 +38,10 @@ struct cpu_arg {
s32 cpu_id;
};
struct domain_arg {
s32 lvl_id;
s32 cpu_id;
s32 sibling_cpu_id;
};
#endif /* __INTF_H */

View File

@ -107,10 +107,10 @@ volatile u64 nr_running, nr_waiting, nr_interactive, nr_online_cpus;
UEI_DEFINE(uei);
/*
* Mask of CPUs that the scheduler can use, until the system becomes saturated,
* Mask of CPUs that the scheduler can use until the system becomes saturated,
* at which point tasks may overflow to other available CPUs.
*/
private(BPFLAND) struct bpf_cpumask __kptr *allowed_cpumask;
private(BPFLAND) struct bpf_cpumask __kptr *primary_cpumask;
/*
* Mask of offline CPUs, used to properly support CPU hotplugging.
@ -122,6 +122,13 @@ private(BPFLAND) struct bpf_cpumask __kptr *offline_cpumask;
*/
static int offline_needed;
/*
* CPU hotplugging generation counter (used to notify the user-space
* counterpart when a CPU hotplug event happened, allowing it to refresh the
* topology information).
*/
volatile u64 cpu_hotplug_cnt;
/*
* Notify the scheduler that we need to drain and re-enqueue the tasks
* dispatched to the offline CPU DSQs.
@ -149,6 +156,30 @@ const volatile bool smt_enabled = true;
*/
static u64 vtime_now;
/*
* Per-CPU context.
*/
struct cpu_ctx {
struct bpf_cpumask __kptr *l2_cpumask;
struct bpf_cpumask __kptr *l3_cpumask;
};
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, u32);
__type(value, struct cpu_ctx);
__uint(max_entries, 1);
} cpu_ctx_stor SEC(".maps");
/*
* Return a CPU context.
*/
struct cpu_ctx *try_lookup_cpu_ctx(s32 cpu)
{
const u32 idx = 0;
return bpf_map_lookup_percpu_elem(&cpu_ctx_stor, &idx, cpu);
}
/*
* Per-task local storage.
*
@ -156,9 +187,11 @@ static u64 vtime_now;
*/
struct task_ctx {
/*
* A temporary cpumask for calculating the allowed CPU mask.
* Temporary cpumask for calculating scheduling domains.
*/
struct bpf_cpumask __kptr *cpumask;
struct bpf_cpumask __kptr *l2_cpumask;
struct bpf_cpumask __kptr *l3_cpumask;
/*
* Voluntary context switches metrics.
@ -431,13 +464,18 @@ static int dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 enq_flags)
static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
{
const struct cpumask *online_cpumask, *idle_smtmask, *idle_cpumask;
struct bpf_cpumask *p_mask, *allowed;
struct bpf_cpumask *primary, *l2_domain, *l3_domain;
struct bpf_cpumask *p_mask, *l2_mask, *l3_mask;
struct task_ctx *tctx;
struct cpu_ctx *cctx;
s32 cpu;
tctx = try_lookup_task_ctx(p);
if (!tctx)
return prev_cpu;
cctx = try_lookup_cpu_ctx(prev_cpu);
if (!cctx)
return prev_cpu;
/*
* For tasks that can run only on a single CPU, we can simply verify if
@ -452,8 +490,8 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
return -ENOENT;
}
allowed = allowed_cpumask;
if (!allowed)
primary = primary_cpumask;
if (!primary)
return -ENOENT;
/*
@ -464,17 +502,65 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
idle_smtmask = scx_bpf_get_idle_smtmask();
idle_cpumask = scx_bpf_get_idle_cpumask();
/*
* Scheduling domains of the previously used CPU.
*/
l2_domain = cctx->l2_cpumask;
if (!l2_domain)
l2_domain = primary;
l3_domain = cctx->l3_cpumask;
if (!l3_domain)
l3_domain = primary;
/*
* Task's scheduling domains.
*/
p_mask = tctx->cpumask;
if (!p_mask) {
scx_bpf_error("cpumask not initialized");
cpu = prev_cpu;
goto out_put_cpumask;
}
l2_mask = tctx->l2_cpumask;
if (!l2_mask) {
scx_bpf_error("l2 cpumask not initialized");
cpu = prev_cpu;
goto out_put_cpumask;
}
l3_mask = tctx->l3_cpumask;
if (!l3_mask) {
scx_bpf_error("l3 cpumask not initialized");
cpu = prev_cpu;
goto out_put_cpumask;
}
/*
* Enable the task to run in the intersection of its permitted CPUs and
* the primary scheduling domain.
* Determine the task's primary domain as the intersection of the
* task's allowed cpumask and the global primary scheduling domain.
*/
bpf_cpumask_and(p_mask, p->cpus_ptr, cast_mask(allowed));
bpf_cpumask_and(p_mask, p->cpus_ptr, cast_mask(primary));
/*
* Determine the L2 cache domain as the intersection of the task's
* primary cpumask and the L2 cache domain mask of the previously used
* CPU (ignore if this cpumask completely overlaps with the task's
* cpumask).
*/
bpf_cpumask_and(l2_mask, cast_mask(p_mask), cast_mask(l2_domain));
if (bpf_cpumask_empty(cast_mask(l2_mask)) ||
bpf_cpumask_equal(cast_mask(l2_mask), cast_mask(p_mask)))
l2_mask = NULL;
/*
* Determine the L3 cache domain as the intersection of the task's
* primary cpumask and the L3 cache domain mask of the previously used
* CPU (ignore if this cpumask completely overlaps with the task's
* cpumask).
*/
bpf_cpumask_and(l3_mask, cast_mask(p_mask), cast_mask(l3_domain));
if (bpf_cpumask_empty(cast_mask(l3_mask)) ||
bpf_cpumask_equal(cast_mask(l3_mask), cast_mask(p_mask)))
l3_mask = NULL;
/*
* Find the best idle CPU, prioritizing full idle cores in SMT systems.
@ -492,7 +578,29 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
}
/*
* Otherwise, search for another usable full-idle core.
* Search for any full-idle CPU in the primary domain that
* shares the same L2 cache.
*/
if (l2_mask) {
cpu = bpf_cpumask_any_and_distribute(cast_mask(l2_mask), idle_smtmask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* Search for any full-idle CPU in the primary domain that
* shares the same L3 cache.
*/
if (l3_mask) {
cpu = bpf_cpumask_any_and_distribute(cast_mask(l3_mask), idle_smtmask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* Search for any other full-idle core in the primary domain.
*/
cpu = bpf_cpumask_any_and_distribute(cast_mask(p_mask), idle_smtmask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
@ -511,8 +619,29 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
}
/*
* If all the previous attempts have failed, try to use any idle CPU in
* the system.
* Search for any idle CPU in the primary domain that shares the same
* L2 cache.
*/
if (l2_mask) {
cpu = bpf_cpumask_any_and_distribute(cast_mask(l2_mask), idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* Search for any idle CPU in the primary domain that shares the same
* L3 cache.
*/
if (l3_mask) {
cpu = bpf_cpumask_any_and_distribute(cast_mask(l3_mask), idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* Search for any idle CPU in the primary domain.
*/
cpu = bpf_cpumask_any_and_distribute(cast_mask(p_mask), idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
@ -520,7 +649,16 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
goto out_put_cpumask;
/*
* If all the previous attempts have failed, dispatch the task to the
* If all the previous attempts have failed, try to use any idle CPU in
* the system.
*/
cpu = bpf_cpumask_any_and_distribute(p->cpus_ptr, idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
/*
* We couldn't find any idle CPU, so simply dispatch the task to the
* first CPU that will become available.
*/
cpu = -ENOENT;
@ -906,6 +1044,7 @@ void BPF_STRUCT_OPS(bpfland_cpu_online, s32 cpu)
set_cpu_state(offline_cpumask, cpu, false);
__sync_fetch_and_add(&nr_online_cpus, 1);
__sync_fetch_and_add(&cpu_hotplug_cnt, 1);
}
void BPF_STRUCT_OPS(bpfland_cpu_offline, s32 cpu)
@ -914,6 +1053,8 @@ void BPF_STRUCT_OPS(bpfland_cpu_offline, s32 cpu)
set_cpu_state(offline_cpumask, cpu, true);
__sync_fetch_and_sub(&nr_online_cpus, 1);
__sync_fetch_and_add(&cpu_hotplug_cnt, 1);
set_offline_needed();
}
@ -927,11 +1068,31 @@ s32 BPF_STRUCT_OPS(bpfland_init_task, struct task_struct *p,
BPF_LOCAL_STORAGE_GET_F_CREATE);
if (!tctx)
return -ENOMEM;
/*
* Create task's primary cpumask.
*/
cpumask = bpf_cpumask_create();
if (!cpumask)
return -ENOMEM;
cpumask = bpf_kptr_xchg(&tctx->cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);
/*
* Create task's L2 cache cpumask.
*/
cpumask = bpf_cpumask_create();
if (!cpumask)
return -ENOMEM;
cpumask = bpf_kptr_xchg(&tctx->l2_cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);
/*
* Create task's L3 cache cpumask.
*/
cpumask = bpf_cpumask_create();
if (!cpumask)
return -ENOMEM;
cpumask = bpf_kptr_xchg(&tctx->l3_cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);
@ -960,7 +1121,7 @@ s32 get_nr_online_cpus(void)
return cpus;
}
static int init_allowed_cpus(void)
static int init_cpumask(struct bpf_cpumask **cpumask)
{
struct bpf_cpumask *mask;
int err = 0;
@ -968,15 +1129,15 @@ static int init_allowed_cpus(void)
/*
* Do nothing if the mask is already initialized.
*/
mask = allowed_cpumask;
mask = *cpumask;
if (mask)
return 0;
/*
* Create the allowed CPU mask.
* Create the CPU mask.
*/
err = calloc_cpumask(&allowed_cpumask);
err = calloc_cpumask(cpumask);
if (!err)
mask = allowed_cpumask;
mask = *cpumask;
if (!mask)
err = -ENOMEM;
@ -984,20 +1145,55 @@ static int init_allowed_cpus(void)
}
SEC("syscall")
int enable_cpu(struct cpu_arg *input)
int enable_sibling_cpu(struct domain_arg *input)
{
struct cpu_ctx *cctx;
struct bpf_cpumask *mask, **pmask;
int err = 0;
cctx = try_lookup_cpu_ctx(input->cpu_id);
if (!cctx)
return -ENOENT;
/* Make sure the target CPU mask is initialized */
switch (input->lvl_id) {
case 2:
pmask = &cctx->l2_cpumask;
break;
case 3:
pmask = &cctx->l3_cpumask;
break;
default:
return -EINVAL;
}
err = init_cpumask(pmask);
if (err)
return err;
bpf_rcu_read_lock();
mask = *pmask;
if (mask)
bpf_cpumask_set_cpu(input->sibling_cpu_id, mask);
bpf_rcu_read_unlock();
return err;
}
SEC("syscall")
int enable_primary_cpu(struct cpu_arg *input)
{
struct bpf_cpumask *mask;
int err = 0;
/* Make sure the allowed CPU mask is initialized */
err = init_allowed_cpus();
/* Make sure the primary CPU mask is initialized */
err = init_cpumask(&primary_cpumask);
if (err)
return err;
/*
* Enable the target CPU in the primary scheduling domain.
*/
bpf_rcu_read_lock();
mask = allowed_cpumask;
mask = primary_cpumask;
if (mask)
bpf_cpumask_set_cpu(input->cpu_id, mask);
bpf_rcu_read_unlock();
@ -1058,7 +1254,11 @@ s32 BPF_STRUCT_OPS_SLEEPABLE(bpfland_init)
return err;
/* Initialize the primary scheduling domain */
return init_allowed_cpus();
err = init_cpumask(&primary_cpumask);
if (err)
return err;
return 0;
}
void BPF_STRUCT_OPS(bpfland_exit, struct scx_exit_info *ei)

View File

@ -10,11 +10,11 @@ pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;
use std::collections::HashMap;
use std::ffi::c_int;
use std::fs::File;
use std::io::Read;
use std::mem::MaybeUninit;
use std::str;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@ -45,6 +45,7 @@ use scx_utils::scx_ops_open;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::UserExitInfo;
use scx_utils::Topology;
const SCHEDULER_NAME: &'static str = "scx_bpfland";
@ -106,9 +107,85 @@ impl CpuMask {
}
}
fn get_primary_cpus(powersave: bool) -> std::io::Result<Vec<usize>> {
let topo = Topology::new().unwrap();
// Iterate over each CPU directory and collect CPU ID and its max frequency.
let mut cpu_freqs = Vec::new();
for core in topo.cores().into_iter() {
for (cpu_id, cpu) in core.cpus() {
cpu_freqs.push((*cpu_id, cpu.max_freq()));
}
}
if cpu_freqs.is_empty() {
return Ok(Vec::new());
}
// Find the smallest maximum frequency.
let min_freq = cpu_freqs.iter().map(|&(_, freq)| freq).min().unwrap();
// Check if all CPUs have the smallest frequency.
let all_have_min_freq = cpu_freqs.iter().all(|&(_, freq)| freq == min_freq);
let selected_cpu_ids: Vec<usize> = if all_have_min_freq {
// If all CPUs have the smallest frequency, return all CPU IDs.
cpu_freqs.into_iter().map(|(cpu_id, _)| cpu_id).collect()
} else if powersave {
// If powersave is true, return the CPUs with the smallest frequency.
cpu_freqs.into_iter()
.filter(|&(_, freq)| freq == min_freq)
.map(|(cpu_id, _)| cpu_id)
.collect()
} else {
// If powersave is false, return the CPUs with the highest frequency.
cpu_freqs.into_iter()
.filter(|&(_, freq)| freq != min_freq)
.map(|(cpu_id, _)| cpu_id)
.collect()
};
Ok(selected_cpu_ids)
}
// Convert an array of CPUs to the corresponding cpumask of any arbitrary size.
fn cpus_to_cpumask(cpus: &Vec<usize>) -> String {
if cpus.is_empty() {
return String::from("0x0");
}
// Determine the maximum CPU ID to create a sufficiently large byte vector.
let max_cpu_id = *cpus.iter().max().unwrap();
// Create a byte vector with enough bytes to cover all CPU IDs.
let mut bitmask = vec![0u8; (max_cpu_id + 1 + 7) / 8];
// Set the appropriate bits for each CPU ID.
for cpu_id in cpus {
let byte_index = cpu_id / 8;
let bit_index = cpu_id % 8;
bitmask[byte_index] |= 1 << bit_index;
}
// Convert the byte vector to a hexadecimal string.
let hex_str: String = bitmask.iter()
.rev()
.map(|byte| format!("{:02x}", byte))
.collect();
format!("0x{}", hex_str)
}
// Custom parser function for cpumask using CpuMask's from_str method
fn parse_cpumask(hex_str: &str) -> Result<CpuMask, std::num::ParseIntError> {
CpuMask::from_str(hex_str)
fn parse_cpumask(cpu_str: &str) -> Result<CpuMask, std::num::ParseIntError> {
if cpu_str == "performance" {
let cpus = get_primary_cpus(false).unwrap();
CpuMask::from_str(&cpus_to_cpumask(&cpus))
} else if cpu_str == "powersave" {
let cpus = get_primary_cpus(true).unwrap();
CpuMask::from_str(&cpus_to_cpumask(&cpus))
} else {
CpuMask::from_str(cpu_str)
}
}
/// scx_bpfland: a vruntime-based sched_ext scheduler that prioritizes interactive workloads.
@ -156,10 +233,22 @@ struct Opts {
/// scheduler will use to dispatch tasks, until the system becomes saturated, at which point
/// tasks may overflow to other available CPUs.
///
/// (empty string = all CPUs are used for initial scheduling)
/// Special values:
/// - "performance" = automatically detect and use the fastest CPUs
/// - "powersave" = automatically detect and use the slowest CPUs
///
/// By default all CPUs are used for the primary scheduling domain.
#[clap(short = 'm', long, default_value = "", value_parser = parse_cpumask)]
primary_domain: CpuMask,
/// Disable L2 cache awareness.
#[clap(long, action = clap::ArgAction::SetTrue)]
disable_l2: bool,
/// Disable L3 cache awareness.
#[clap(long, action = clap::ArgAction::SetTrue)]
disable_l3: bool,
/// Maximum threshold of voluntary context switch per second, used to classify interactive
/// tasks (0 = disable interactive tasks classification).
#[clap(short = 'c', long, default_value = "10")]
@ -238,7 +327,9 @@ fn is_smt_active() -> std::io::Result<i32> {
struct Scheduler<'a> {
skel: BpfSkel<'a>,
struct_ops: Option<libbpf_rs::Link>,
opts: &'a Opts,
metrics: Metrics,
cpu_hotplug_cnt: u64,
}
impl<'a> Scheduler<'a> {
@ -281,8 +372,20 @@ impl<'a> Scheduler<'a> {
// Load the BPF program for validation.
let mut skel = scx_ops_load!(skel, bpfland_ops, uei)?;
// Initialize primary domain CPUs.
Self::init_primary_domain(&mut skel, &opts.primary_domain)?;
// Initialize CPU topology.
let topo = Topology::new().unwrap();
// Initialize the primary scheduling domain (based on the --primary-domain option).
Self::init_primary_domain(&mut skel, &topo, &opts.primary_domain)?;
// Initialize L2 cache domains.
if !opts.disable_l2 {
Self::init_l2_cache_domains(&mut skel, &topo)?;
}
// Initialize L3 cache domains.
if !opts.disable_l3 {
Self::init_l3_cache_domains(&mut skel, &topo)?;
}
// Attach the scheduler.
let struct_ops = Some(scx_ops_attach!(skel, bpfland_ops)?);
@ -298,12 +401,14 @@ impl<'a> Scheduler<'a> {
Ok(Self {
skel,
struct_ops,
opts,
metrics: Metrics::new(),
cpu_hotplug_cnt: 0,
})
}
fn enable_cpu(skel: &mut BpfSkel<'_>, cpu: usize) -> Result<(), u32> {
let prog = &mut skel.progs.enable_cpu;
fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: usize) -> Result<(), u32> {
let prog = &mut skel.progs.enable_primary_cpu;
let mut args = cpu_arg {
cpu_id: cpu as c_int,
};
@ -324,16 +429,13 @@ impl<'a> Scheduler<'a> {
Ok(())
}
fn init_primary_domain(skel: &mut BpfSkel<'_>, primary_domain: &CpuMask) -> Result<()> {
fn init_primary_domain(skel: &mut BpfSkel<'_>, topo: &Topology, primary_domain: &CpuMask) -> Result<()> {
info!("primary CPU domain = {}", primary_domain.to_string());
for cpu in 0..libbpf_rs::num_possible_cpus().unwrap() {
for cpu in 0..topo.nr_cpu_ids() {
if primary_domain.is_cpu_set(cpu) {
if let Err(err) = Self::enable_cpu(skel, cpu) {
warn!(
"Failed to add CPU {} to primary domain: error {}",
cpu, err
);
if let Err(err) = Self::enable_primary_cpu(skel, cpu) {
warn!("failed to add CPU {} to primary domain: error {}", cpu, err);
}
}
}
@ -341,6 +443,120 @@ impl<'a> Scheduler<'a> {
Ok(())
}
fn enable_sibling_cpu(
skel: &mut BpfSkel<'_>,
lvl: usize,
cpu: usize,
sibling_cpu: usize,
) -> Result<(), u32> {
let prog = &mut skel.progs.enable_sibling_cpu;
let mut args = domain_arg {
lvl_id: lvl as c_int,
cpu_id: cpu as c_int,
sibling_cpu_id: sibling_cpu as c_int,
};
let input = ProgramInput {
context_in: Some(unsafe {
std::slice::from_raw_parts_mut(
&mut args as *mut _ as *mut u8,
std::mem::size_of_val(&args),
)
}),
..Default::default()
};
let out = prog.test_run(input).unwrap();
if out.return_value != 0 {
return Err(out.return_value);
}
Ok(())
}
fn init_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
cache_lvl: usize,
enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>
) -> Result<(), std::io::Error> {
// Determine the list of CPU IDs associated to each cache node.
let mut cache_id_map: HashMap<usize, Vec<usize>> = HashMap::new();
for core in topo.cores().into_iter() {
for (cpu_id, cpu) in core.cpus() {
let cache_id = match cache_lvl {
2 => cpu.l2_id(),
3 => cpu.l3_id(),
_ => panic!("invalid cache level {}", cache_lvl),
};
cache_id_map
.entry(cache_id)
.or_insert_with(Vec::new)
.push(*cpu_id);
}
}
// Update the BPF cpumasks for the cache domains.
for (cache_id, cpus) in cache_id_map {
info!(
"L{} cache ID {}: sibling CPUs: {:?}",
cache_lvl, cache_id, cpus
);
for cpu in &cpus {
for sibling_cpu in &cpus {
match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) {
Ok(()) => {},
Err(_) => {
warn!(
"L{} cache ID {}: failed to set CPU {} sibling {}",
cache_lvl, cache_id, *cpu, *sibling_cpu
);
}
}
}
}
}
Ok(())
}
fn init_l2_cache_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, topo, 2, &|skel, lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
})
}
fn init_l3_cache_domains(skel: &mut BpfSkel<'_>, topo: &Topology) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, topo, 3, &|skel, lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
})
}
fn refresh_cache_domains(&mut self) {
// Check if we need to refresh the CPU cache information.
if self.cpu_hotplug_cnt == self.skel.maps.bss_data.cpu_hotplug_cnt {
return;
}
// Re-initialize CPU topology.
let topo = Topology::new().unwrap();
// Re-initialize L2 cache domains.
if !self.opts.disable_l2 {
if let Err(e) = Self::init_l2_cache_domains(&mut self.skel, &topo) {
warn!("failed to initialize L2 cache domains: {}", e);
}
}
// Re-initialize L3 cache domains.
if !self.opts.disable_l3 {
if let Err(e) = Self::init_l3_cache_domains(&mut self.skel, &topo) {
warn!("failed to initialize L3 cache domains: {}", e);
}
}
// Update CPU hotplug generation counter.
self.cpu_hotplug_cnt = self.skel.maps.bss_data.cpu_hotplug_cnt;
}
fn update_stats(&mut self) {
let nr_cpus = self.skel.maps.bss_data.nr_online_cpus;
let nr_running = self.skel.maps.bss_data.nr_running;
@ -352,18 +568,10 @@ impl<'a> Scheduler<'a> {
let nr_shared_dispatches = self.skel.maps.bss_data.nr_shared_dispatches;
// Update Prometheus statistics.
self.metrics
.nr_running
.set(nr_running as f64);
self.metrics
.nr_interactive
.set(nr_interactive as f64);
self.metrics
.nr_waiting
.set(nr_waiting as f64);
self.metrics
.nvcsw_avg_thresh
.set(nvcsw_avg_thresh as f64);
self.metrics.nr_running.set(nr_running as f64);
self.metrics.nr_interactive.set(nr_interactive as f64);
self.metrics.nr_waiting.set(nr_waiting as f64);
self.metrics.nvcsw_avg_thresh.set(nvcsw_avg_thresh as f64);
self.metrics
.nr_direct_dispatches
.set(nr_direct_dispatches as f64);
@ -393,6 +601,7 @@ impl<'a> Scheduler<'a> {
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
while !shutdown.load(Ordering::Relaxed) && !self.exited() {
self.refresh_cache_domains();
self.update_stats();
std::thread::sleep(Duration::from_millis(1000));
}