scx_bpfland: introduce cache awareness

While the system is not saturated the scheduler will use the following
strategy to select the next CPU for a task:
  - pick the same CPU if it's a full-idle SMT core
  - pick any full-idle SMT core in the primary scheduling group that
    shares the same L2 cache
  - pick any full-idle SMT core in the primary scheduling grouop that
    shares the same L3 cache
  - pick the same CPU (ignoring SMT)
  - pick any idle CPU in the primary scheduling group that shares the
    same L2 cache
  - pick any idle CPU in the primary scheduling group that shares the
    same L3 cache
  - pick any idle CPU in the system

While the system is completely saturated (no idle CPUs available), tasks
will be dispatched on the first CPU that becomes available.

Signed-off-by: Andrea Righi <andrea.righi@linux.dev>
This commit is contained in:
Andrea Righi 2024-08-16 12:06:03 +02:00
parent 96050f8bdd
commit 174993f9d2
3 changed files with 403 additions and 43 deletions

View File

@ -38,4 +38,10 @@ struct cpu_arg {
s32 cpu_id;
};
struct domain_arg {
s32 lvl_id;
s32 cpu_id;
s32 sibling_cpu_id;
};
#endif /* __INTF_H */

View File

@ -107,10 +107,10 @@ volatile u64 nr_running, nr_waiting, nr_interactive, nr_online_cpus;
UEI_DEFINE(uei);
/*
* Mask of CPUs that the scheduler can use, until the system becomes saturated,
* Mask of CPUs that the scheduler can use until the system becomes saturated,
* at which point tasks may overflow to other available CPUs.
*/
private(BPFLAND) struct bpf_cpumask __kptr *allowed_cpumask;
private(BPFLAND) struct bpf_cpumask __kptr *primary_cpumask;
/*
* Mask of offline CPUs, used to properly support CPU hotplugging.
@ -122,6 +122,13 @@ private(BPFLAND) struct bpf_cpumask __kptr *offline_cpumask;
*/
static int offline_needed;
/*
* CPU hotplugging generation counter (used to notify the user-space
* counterpart when a CPU hotplug event happened, allowing it to refresh the
* topology information).
*/
volatile u64 cpu_hotplug_cnt;
/*
* Notify the scheduler that we need to drain and re-enqueue the tasks
* dispatched to the offline CPU DSQs.
@ -149,6 +156,30 @@ const volatile bool smt_enabled = true;
*/
static u64 vtime_now;
/*
* Per-CPU context.
*/
struct cpu_ctx {
struct bpf_cpumask __kptr *l2_cpumask;
struct bpf_cpumask __kptr *l3_cpumask;
};
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, u32);
__type(value, struct cpu_ctx);
__uint(max_entries, 1);
} cpu_ctx_stor SEC(".maps");
/*
* Return a CPU context.
*/
struct cpu_ctx *try_lookup_cpu_ctx(s32 cpu)
{
const u32 idx = 0;
return bpf_map_lookup_percpu_elem(&cpu_ctx_stor, &idx, cpu);
}
/*
* Per-task local storage.
*
@ -156,9 +187,11 @@ static u64 vtime_now;
*/
struct task_ctx {
/*
* A temporary cpumask for calculating the allowed CPU mask.
* Temporary cpumask for calculating scheduling domains.
*/
struct bpf_cpumask __kptr *cpumask;
struct bpf_cpumask __kptr *l2_cpumask;
struct bpf_cpumask __kptr *l3_cpumask;
/*
* Voluntary context switches metrics.
@ -431,13 +464,18 @@ static int dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 enq_flags)
static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
{
const struct cpumask *online_cpumask, *idle_smtmask, *idle_cpumask;
struct bpf_cpumask *p_mask, *allowed;
struct bpf_cpumask *primary, *l2_domain, *l3_domain;
struct bpf_cpumask *p_mask, *l2_mask, *l3_mask;
struct task_ctx *tctx;
struct cpu_ctx *cctx;
s32 cpu;
tctx = try_lookup_task_ctx(p);
if (!tctx)
return prev_cpu;
cctx = try_lookup_cpu_ctx(prev_cpu);
if (!cctx)
return prev_cpu;
/*
* For tasks that can run only on a single CPU, we can simply verify if
@ -452,8 +490,8 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
return -ENOENT;
}
allowed = allowed_cpumask;
if (!allowed)
primary = primary_cpumask;
if (!primary)
return -ENOENT;
/*
@ -464,17 +502,65 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
idle_smtmask = scx_bpf_get_idle_smtmask();
idle_cpumask = scx_bpf_get_idle_cpumask();
/*
* Scheduling domains of the previously used CPU.
*/
l2_domain = cctx->l2_cpumask;
if (!l2_domain)
l2_domain = primary;
l3_domain = cctx->l3_cpumask;
if (!l3_domain)
l3_domain = primary;
/*
* Task's scheduling domains.
*/
p_mask = tctx->cpumask;
if (!p_mask) {
scx_bpf_error("cpumask not initialized");
cpu = prev_cpu;
goto out_put_cpumask;
}
l2_mask = tctx->l2_cpumask;
if (!l2_mask) {
scx_bpf_error("l2 cpumask not initialized");
cpu = prev_cpu;
goto out_put_cpumask;
}
l3_mask = tctx->l3_cpumask;
if (!l3_mask) {
scx_bpf_error("l3 cpumask not initialized");
cpu = prev_cpu;
goto out_put_cpumask;
}
/*
* Enable the task to run in the intersection of its permitted CPUs and
* the primary scheduling domain.
* Determine the task's primary domain as the intersection of the
* task's allowed cpumask and the global primary scheduling domain.
*/
bpf_cpumask_and(p_mask, p->cpus_ptr, cast_mask(allowed));
bpf_cpumask_and(p_mask, p->cpus_ptr, cast_mask(primary));
/*
* Determine the L2 cache domain as the intersection of the task's
* primary cpumask and the L2 cache domain mask of the previously used
* CPU (ignore if this cpumask completely overlaps with the task's
* cpumask).
*/
bpf_cpumask_and(l2_mask, cast_mask(p_mask), cast_mask(l2_domain));
if (bpf_cpumask_empty(cast_mask(l2_mask)) ||
bpf_cpumask_equal(cast_mask(l2_mask), cast_mask(p_mask)))
l2_mask = NULL;
/*
* Determine the L3 cache domain as the intersection of the task's
* primary cpumask and the L3 cache domain mask of the previously used
* CPU (ignore if this cpumask completely overlaps with the task's
* cpumask).
*/
bpf_cpumask_and(l3_mask, cast_mask(p_mask), cast_mask(l3_domain));
if (bpf_cpumask_empty(cast_mask(l3_mask)) ||
bpf_cpumask_equal(cast_mask(l3_mask), cast_mask(p_mask)))
l3_mask = NULL;
/*
* Find the best idle CPU, prioritizing full idle cores in SMT systems.
@ -492,7 +578,29 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
}
/*
* Otherwise, search for another usable full-idle core.
* Search for any full-idle CPU in the primary domain that
* shares the same L2 cache.
*/
if (l2_mask) {
cpu = bpf_cpumask_any_and_distribute(cast_mask(l2_mask), idle_smtmask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* Search for any full-idle CPU in the primary domain that
* shares the same L3 cache.
*/
if (l3_mask) {
cpu = bpf_cpumask_any_and_distribute(cast_mask(l3_mask), idle_smtmask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* Search for any other full-idle core in the primary domain.
*/
cpu = bpf_cpumask_any_and_distribute(cast_mask(p_mask), idle_smtmask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
@ -511,8 +619,29 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
}
/*
* If all the previous attempts have failed, try to use any idle CPU in
* the system.
* Search for any idle CPU in the primary domain that shares the same
* L2 cache.
*/
if (l2_mask) {
cpu = bpf_cpumask_any_and_distribute(cast_mask(l2_mask), idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* Search for any idle CPU in the primary domain that shares the same
* L3 cache.
*/
if (l3_mask) {
cpu = bpf_cpumask_any_and_distribute(cast_mask(l3_mask), idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
}
/*
* Search for any idle CPU in the primary domain.
*/
cpu = bpf_cpumask_any_and_distribute(cast_mask(p_mask), idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
@ -520,7 +649,16 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags)
goto out_put_cpumask;
/*
* If all the previous attempts have failed, dispatch the task to the
* If all the previous attempts have failed, try to use any idle CPU in
* the system.
*/
cpu = bpf_cpumask_any_and_distribute(p->cpus_ptr, idle_cpumask);
if (bpf_cpumask_test_cpu(cpu, online_cpumask) &&
scx_bpf_test_and_clear_cpu_idle(cpu))
goto out_put_cpumask;
/*
* We couldn't find any idle CPU, so simply dispatch the task to the
* first CPU that will become available.
*/
cpu = -ENOENT;
@ -906,6 +1044,7 @@ void BPF_STRUCT_OPS(bpfland_cpu_online, s32 cpu)
set_cpu_state(offline_cpumask, cpu, false);
__sync_fetch_and_add(&nr_online_cpus, 1);
__sync_fetch_and_add(&cpu_hotplug_cnt, 1);
}
void BPF_STRUCT_OPS(bpfland_cpu_offline, s32 cpu)
@ -914,6 +1053,8 @@ void BPF_STRUCT_OPS(bpfland_cpu_offline, s32 cpu)
set_cpu_state(offline_cpumask, cpu, true);
__sync_fetch_and_sub(&nr_online_cpus, 1);
__sync_fetch_and_add(&cpu_hotplug_cnt, 1);
set_offline_needed();
}
@ -927,11 +1068,31 @@ s32 BPF_STRUCT_OPS(bpfland_init_task, struct task_struct *p,
BPF_LOCAL_STORAGE_GET_F_CREATE);
if (!tctx)
return -ENOMEM;
/*
* Create task's primary cpumask.
*/
cpumask = bpf_cpumask_create();
if (!cpumask)
return -ENOMEM;
cpumask = bpf_kptr_xchg(&tctx->cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);
/*
* Create task's L2 cache cpumask.
*/
cpumask = bpf_cpumask_create();
if (!cpumask)
return -ENOMEM;
cpumask = bpf_kptr_xchg(&tctx->l2_cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);
/*
* Create task's L3 cache cpumask.
*/
cpumask = bpf_cpumask_create();
if (!cpumask)
return -ENOMEM;
cpumask = bpf_kptr_xchg(&tctx->l3_cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);
@ -960,7 +1121,7 @@ s32 get_nr_online_cpus(void)
return cpus;
}
static int init_allowed_cpus(void)
static int init_cpumask(struct bpf_cpumask **cpumask)
{
struct bpf_cpumask *mask;
int err = 0;
@ -968,15 +1129,15 @@ static int init_allowed_cpus(void)
/*
* Do nothing if the mask is already initialized.
*/
mask = allowed_cpumask;
mask = *cpumask;
if (mask)
return 0;
/*
* Create the allowed CPU mask.
* Create the CPU mask.
*/
err = calloc_cpumask(&allowed_cpumask);
err = calloc_cpumask(cpumask);
if (!err)
mask = allowed_cpumask;
mask = *cpumask;
if (!mask)
err = -ENOMEM;
@ -984,20 +1145,55 @@ static int init_allowed_cpus(void)
}
SEC("syscall")
int enable_cpu(struct cpu_arg *input)
int enable_sibling_cpu(struct domain_arg *input)
{
struct cpu_ctx *cctx;
struct bpf_cpumask *mask, **pmask;
int err = 0;
cctx = try_lookup_cpu_ctx(input->cpu_id);
if (!cctx)
return -ENOENT;
/* Make sure the target CPU mask is initialized */
switch (input->lvl_id) {
case 2:
pmask = &cctx->l2_cpumask;
break;
case 3:
pmask = &cctx->l3_cpumask;
break;
default:
return -EINVAL;
}
err = init_cpumask(pmask);
if (err)
return err;
bpf_rcu_read_lock();
mask = *pmask;
if (mask)
bpf_cpumask_set_cpu(input->sibling_cpu_id, mask);
bpf_rcu_read_unlock();
return err;
}
SEC("syscall")
int enable_primary_cpu(struct cpu_arg *input)
{
struct bpf_cpumask *mask;
int err = 0;
/* Make sure the allowed CPU mask is initialized */
err = init_allowed_cpus();
/* Make sure the primary CPU mask is initialized */
err = init_cpumask(&primary_cpumask);
if (err)
return err;
/*
* Enable the target CPU in the primary scheduling domain.
*/
bpf_rcu_read_lock();
mask = allowed_cpumask;
mask = primary_cpumask;
if (mask)
bpf_cpumask_set_cpu(input->cpu_id, mask);
bpf_rcu_read_unlock();
@ -1058,7 +1254,11 @@ s32 BPF_STRUCT_OPS_SLEEPABLE(bpfland_init)
return err;
/* Initialize the primary scheduling domain */
return init_allowed_cpus();
err = init_cpumask(&primary_cpumask);
if (err)
return err;
return 0;
}
void BPF_STRUCT_OPS(bpfland_exit, struct scx_exit_info *ei)

View File

@ -10,11 +10,14 @@ pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;
use std::collections::HashMap;
use std::ffi::c_int;
use std::fs::File;
use std::io::Read;
use std::mem::MaybeUninit;
use std::path::Path;
use std::str;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
@ -160,6 +163,14 @@ struct Opts {
#[clap(short = 'm', long, default_value = "", value_parser = parse_cpumask)]
primary_domain: CpuMask,
/// Disable L2 cache awareness.
#[clap(long, action = clap::ArgAction::SetTrue)]
disable_l2: bool,
/// Disable L3 cache awareness.
#[clap(long, action = clap::ArgAction::SetTrue)]
disable_l3: bool,
/// Maximum threshold of voluntary context switch per second, used to classify interactive
/// tasks (0 = disable interactive tasks classification).
#[clap(short = 'c', long, default_value = "10")]
@ -238,7 +249,9 @@ fn is_smt_active() -> std::io::Result<i32> {
struct Scheduler<'a> {
skel: BpfSkel<'a>,
struct_ops: Option<libbpf_rs::Link>,
opts: &'a Opts,
metrics: Metrics,
cpu_hotplug_cnt: u64,
}
impl<'a> Scheduler<'a> {
@ -284,6 +297,15 @@ impl<'a> Scheduler<'a> {
// Initialize primary domain CPUs.
Self::init_primary_domain(&mut skel, &opts.primary_domain)?;
// Initialize L2 cache domains.
if !opts.disable_l2 {
Self::init_l2_cache_domains(&mut skel)?;
}
// Initialize L3 cache domains.
if !opts.disable_l3 {
Self::init_l3_cache_domains(&mut skel)?;
}
// Attach the scheduler.
let struct_ops = Some(scx_ops_attach!(skel, bpfland_ops)?);
@ -298,12 +320,14 @@ impl<'a> Scheduler<'a> {
Ok(Self {
skel,
struct_ops,
opts,
metrics: Metrics::new(),
cpu_hotplug_cnt: 0,
})
}
fn enable_cpu(skel: &mut BpfSkel<'_>, cpu: usize) -> Result<(), u32> {
let prog = &mut skel.progs.enable_cpu;
fn enable_primary_cpu(skel: &mut BpfSkel<'_>, cpu: usize) -> Result<(), u32> {
let prog = &mut skel.progs.enable_primary_cpu;
let mut args = cpu_arg {
cpu_id: cpu as c_int,
};
@ -329,11 +353,8 @@ impl<'a> Scheduler<'a> {
for cpu in 0..libbpf_rs::num_possible_cpus().unwrap() {
if primary_domain.is_cpu_set(cpu) {
if let Err(err) = Self::enable_cpu(skel, cpu) {
warn!(
"Failed to add CPU {} to primary domain: error {}",
cpu, err
);
if let Err(err) = Self::enable_primary_cpu(skel, cpu) {
warn!("failed to add CPU {} to primary domain: error {}", cpu, err);
}
}
}
@ -341,6 +362,146 @@ impl<'a> Scheduler<'a> {
Ok(())
}
fn read_cache_id(path: &Path) -> Result<String, std::io::Error> {
if !path.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"file not found",
));
}
std::fs::read_to_string(path).map(|content| content.trim().to_string())
}
fn read_cpu_ids(sysfs_path: &str) -> Result<Vec<usize>, std::io::Error> {
let mut cpu_ids = Vec::new();
for entry in std::fs::read_dir(sysfs_path)? {
let entry = entry?;
if entry.path().is_dir() {
if let Some(cpu_id_str) = entry.path().file_name().and_then(|name| name.to_str()) {
if let Some(cpu_id_str) = cpu_id_str.strip_prefix("cpu") {
if let Ok(cpu_id) = usize::from_str(cpu_id_str) {
cpu_ids.push(cpu_id);
}
}
}
}
}
Ok(cpu_ids)
}
fn enable_sibling_cpu(
skel: &mut BpfSkel<'_>,
lvl: usize,
cpu: usize,
sibling_cpu: usize,
) -> Result<(), u32> {
let prog = &mut skel.progs.enable_sibling_cpu;
let mut args = domain_arg {
lvl_id: lvl as c_int,
cpu_id: cpu as c_int,
sibling_cpu_id: sibling_cpu as c_int,
};
let input = ProgramInput {
context_in: Some(unsafe {
std::slice::from_raw_parts_mut(
&mut args as *mut _ as *mut u8,
std::mem::size_of_val(&args),
)
}),
..Default::default()
};
let out = prog.test_run(input).unwrap();
if out.return_value != 0 {
return Err(out.return_value);
}
Ok(())
}
fn init_cache_domains(
skel: &mut BpfSkel<'_>,
cache_lvl: usize,
enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>
) -> Result<(), std::io::Error> {
let sysfs_path = "/sys/devices/system/cpu/";
let mut cache_id_map: HashMap<String, Vec<usize>> = HashMap::new();
for cpu_id in Self::read_cpu_ids(sysfs_path)? {
let cache_path = Path::new(sysfs_path).join(format!("cpu{}/cache/index{}", cpu_id, cache_lvl));
if !cache_path.exists() {
continue;
}
match Self::read_cache_id(&cache_path.join("id")) {
Ok(cache_id) => {
cache_id_map
.entry(cache_id)
.or_insert_with(Vec::new)
.push(cpu_id);
}
Err(_) => {
warn!("failed to read cache ID for CPU {}", cpu_id);
}
}
}
for (cache_id, cpus) in cache_id_map {
for cpu in &cpus {
for sibling_cpu in &cpus {
info!(
"L{} cache ID {}: sibling CPUs: {}, {}",
cache_lvl, cache_id, cpu, sibling_cpu
);
match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) {
Ok(()) => {},
Err(_) => {
warn!(
"L{} cache ID {}: failed to set CPU {} sibling {}",
cache_lvl, cache_id, *cpu, *sibling_cpu
);
}
}
}
}
}
Ok(())
}
fn init_l2_cache_domains(skel: &mut BpfSkel<'_>) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, 2, &|skel, lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
})
}
fn init_l3_cache_domains(skel: &mut BpfSkel<'_>) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, 3, &|skel, lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, lvl, cpu, sibling_cpu)
})
}
fn refresh_cache_domains(&mut self) {
// Check if we need to refresh the CPU cache information.
if self.cpu_hotplug_cnt == self.skel.maps.bss_data.cpu_hotplug_cnt {
return;
}
// Re-initialize L2 cache domains.
if !self.opts.disable_l2 {
if let Err(e) = Self::init_l2_cache_domains(&mut self.skel) {
warn!("failed to initialize L2 cache domains: {}", e);
}
}
// Re-initialize L3 cache domains.
if !self.opts.disable_l3 {
if let Err(e) = Self::init_l3_cache_domains(&mut self.skel) {
warn!("failed to initialize L3 cache domains: {}", e);
}
}
// Update CPU hotplug generation counter.
self.cpu_hotplug_cnt = self.skel.maps.bss_data.cpu_hotplug_cnt;
}
fn update_stats(&mut self) {
let nr_cpus = self.skel.maps.bss_data.nr_online_cpus;
let nr_running = self.skel.maps.bss_data.nr_running;
@ -352,18 +513,10 @@ impl<'a> Scheduler<'a> {
let nr_shared_dispatches = self.skel.maps.bss_data.nr_shared_dispatches;
// Update Prometheus statistics.
self.metrics
.nr_running
.set(nr_running as f64);
self.metrics
.nr_interactive
.set(nr_interactive as f64);
self.metrics
.nr_waiting
.set(nr_waiting as f64);
self.metrics
.nvcsw_avg_thresh
.set(nvcsw_avg_thresh as f64);
self.metrics.nr_running.set(nr_running as f64);
self.metrics.nr_interactive.set(nr_interactive as f64);
self.metrics.nr_waiting.set(nr_waiting as f64);
self.metrics.nvcsw_avg_thresh.set(nvcsw_avg_thresh as f64);
self.metrics
.nr_direct_dispatches
.set(nr_direct_dispatches as f64);
@ -393,6 +546,7 @@ impl<'a> Scheduler<'a> {
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
while !shutdown.load(Ordering::Relaxed) && !self.exited() {
self.refresh_cache_domains();
self.update_stats();
std::thread::sleep(Duration::from_millis(1000));
}