Merge pull request #148 from sched-ext/layered_fixes

layered: Fix static configuration, and dispatch for Grouped layers
David Vernet 2024-02-21 16:12:12 -06:00 committed by GitHub
commit ebce76b0cb
2 changed files with 51 additions and 72 deletions


@@ -140,21 +140,34 @@ static struct cpumask *lookup_layer_cpumask(int idx)
}
}
struct layer *lookup_layer(int idx)
{
if (idx < 0 || idx >= nr_layers) {
scx_bpf_error("invalid layer %d", idx);
return NULL;
}
return &layers[idx];
}
static void refresh_cpumasks(int idx)
{
struct layer_cpumask_wrapper *cpumaskw;
struct layer *layer;
int cpu, total = 0;
struct layer *layer = lookup_layer(idx);
if (!__sync_val_compare_and_swap(&layers[idx].refresh_cpus, 1, 0))
if (!layer)
return;
if (!__sync_val_compare_and_swap(&layer->refresh_cpus, 1, 0))
return;
cpumaskw = bpf_map_lookup_elem(&layer_cpumasks, &idx);
bpf_rcu_read_lock();
bpf_for(cpu, 0, nr_possible_cpus) {
u8 *u8_ptr;
if ((u8_ptr = MEMBER_VPTR(layers, [idx].cpus[cpu / 8]))) {
if ((u8_ptr = &layer->cpus[cpu / 8])) {
/*
* XXX - The following test should be outside the loop
* but that makes the verifier think that
@@ -162,6 +175,7 @@ static void refresh_cpumasks(int idx)
*/
barrier_var(cpumaskw);
if (!cpumaskw || !cpumaskw->cpumask) {
bpf_rcu_read_unlock();
scx_bpf_error("can't happen");
return;
}
@@ -176,13 +190,7 @@ static void refresh_cpumasks(int idx)
scx_bpf_error("can't happen");
}
}
// XXX - shouldn't be necessary
layer = MEMBER_VPTR(layers, [idx]);
if (!layer) {
scx_bpf_error("can't happen");
return;
}
bpf_rcu_read_unlock();
layer->nr_cpus = total;
__sync_fetch_and_add(&layer->cpus_seq, 1);
@@ -240,15 +248,6 @@ struct task_ctx *lookup_task_ctx(struct task_struct *p)
}
}
struct layer *lookup_layer(int idx)
{
if (idx < 0 || idx >= nr_layers) {
scx_bpf_error("invalid layer %d", idx);
return NULL;
}
return &layers[idx];
}
/*
* Because the layer membership is by the default hierarchy cgroups rather than
* the CPU controller membership, we can't use ops.cgroup_move(). Let's iterate
@@ -506,9 +505,6 @@ void BPF_STRUCT_OPS(layered_dispatch, s32 cpu, struct task_struct *prev)
struct layer *layer = &layers[idx];
struct cpumask *layer_cpumask;
if (layer->open)
continue;
/* consume matching layers */
if (!(layer_cpumask = lookup_layer_cpumask(idx)))
return;
@@ -925,16 +921,11 @@ s32 BPF_STRUCT_OPS_SLEEPABLE(layered_init)
if (!cpumask)
return -ENOMEM;
/*
* Start all layers with full cpumask so that everything runs
* everywhere. This will soon be updated by refresh_cpumasks()
* once the scheduler starts running.
*/
bpf_cpumask_setall(cpumask);
cpumask = bpf_kptr_xchg(&cpumaskw->cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);
refresh_cpumasks(i);
}
return 0;


@@ -841,37 +841,13 @@ impl Layer {
let nr_cpus = cpu_pool.nr_cpus;
let mut layer = Self {
Ok(Self {
name: name.into(),
kind,
nr_cpus: 0,
cpus: bitvec![0; nr_cpus],
};
match &layer.kind {
LayerKind::Confined {
cpus_range,
util_range,
}
| LayerKind::Grouped {
cpus_range,
util_range,
..
} => {
layer.resize_confined_or_grouped(
cpu_pool,
*cpus_range,
*util_range,
(0.0, 0.0),
(0.0, 0.0),
false,
)?;
}
_ => {}
}
Ok(layer)
})
}
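
The block deleted above bound `cpus_range` and `util_range` out of two struct-like enum variants at once before calling `resize_confined_or_grouped()`. For reference, a self-contained sketch of that match pattern with toy types; only the field names mirror the scheduler's `LayerKind`, and `resize_request` is a hypothetical stand-in:

```rust
// Toy types; only the shape mirrors the LayerKind match removed above.
enum LayerKind {
    Confined { cpus_range: (usize, usize), util_range: (f64, f64) },
    Grouped { cpus_range: (usize, usize), util_range: (f64, f64), preempt: bool },
    Open,
}

fn resize_request(kind: &LayerKind) -> Option<((usize, usize), (f64, f64))> {
    match kind {
        // `|` covers both variants; `..` skips fields only Grouped has.
        LayerKind::Confined { cpus_range, util_range }
        | LayerKind::Grouped { cpus_range, util_range, .. } => Some((*cpus_range, *util_range)),
        LayerKind::Open => None,
    }
}

fn main() {
    let grouped = LayerKind::Grouped { cpus_range: (1, 4), util_range: (0.1, 0.6), preempt: false };
    assert_eq!(resize_request(&grouped), Some(((1, 4), (0.1, 0.6))));
    assert_eq!(resize_request(&LayerKind::Open), None);
}
```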
fn grow_confined_or_grouped(
@@ -1239,17 +1215,7 @@ impl<'a> Scheduler<'a> {
}
Self::init_layers(&mut skel, &layer_specs)?;
// Attach.
let mut skel = skel.load().context("Failed to load BPF program")?;
skel.attach().context("Failed to attach BPF program")?;
let struct_ops = Some(
skel.maps_mut()
.layered()
.attach_struct_ops()
.context("Failed to attach layered struct ops")?,
);
info!("Layered Scheduler Attached");
let mut layers = vec![];
for spec in layer_specs.iter() {
layers.push(Layer::new(&mut cpu_pool, &spec.name, spec.kind.clone())?);
@@ -1258,8 +1224,8 @@ impl<'a> Scheduler<'a> {
// Other stuff.
let proc_reader = procfs::ProcReader::new();
Ok(Self {
struct_ops, // should be held to keep it attached
let mut sched = Self {
struct_ops: None,
layer_specs,
sched_intv: Duration::from_secs_f64(opts.interval),
@@ -1281,7 +1247,22 @@ impl<'a> Scheduler<'a> {
om_stats: OpenMetricsStats::new(),
om_format: opts.open_metrics_format,
})
};
// Initialize layers before we attach the scheduler
sched.refresh_cpumasks()?;
// Attach.
sched.skel.attach().context("Failed to attach BPF program")?;
sched.struct_ops = Some(
sched.skel.maps_mut()
.layered()
.attach_struct_ops()
.context("Failed to attach layered struct ops")?,
);
info!("Layered Scheduler Attached");
Ok(sched)
}
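
The `struct_ops` handle is stored on the scheduler (the old field comment read "should be held to keep it attached") because the handle returned by `attach_struct_ops()` detaches when it is dropped. A standalone sketch of that ownership pattern, using toy types rather than the real libbpf-rs ones:

```rust
// Toy sketch: the attach handle detaches on Drop, so it must live as long
// as the scheduler. Names are illustrative, not the real libbpf-rs types.
struct Link(&'static str);

impl Drop for Link {
    fn drop(&mut self) {
        println!("{}: struct_ops detached", self.0);
    }
}

struct Scheduler {
    struct_ops: Option<Link>, // held to keep the scheduler attached
}

impl Scheduler {
    fn new() -> Self {
        // Mirror the commit's ordering: build the struct first, finish setup,
        // then attach and stash the handle.
        let mut sched = Scheduler { struct_ops: None };
        sched.struct_ops = Some(Link("layered"));
        sched
    }
}

fn main() {
    let sched = Scheduler::new();
    println!("scheduler running");
    drop(sched); // detach happens here, when the handle is dropped
}
```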
fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut bpf_bss_types::layer) {
@@ -1295,10 +1276,7 @@ impl<'a> Scheduler<'a> {
bpf_layer.refresh_cpus = 1;
}
fn step(&mut self) -> Result<()> {
let started_at = Instant::now();
self.sched_stats
.refresh(&mut self.skel, &self.proc_reader, started_at)?;
fn refresh_cpumasks(&mut self) -> Result<()> {
let mut updated = false;
for idx in 0..self.layers.len() {
@@ -1366,6 +1344,16 @@ impl<'a> Scheduler<'a> {
}
}
Ok(())
}
fn step(&mut self) -> Result<()> {
let started_at = Instant::now();
self.sched_stats
.refresh(&mut self.skel, &self.proc_reader, started_at)?;
self.refresh_cpumasks()?;
self.processing_dur += Instant::now().duration_since(started_at);
Ok(())
}
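
For context on how this userspace refresh path pairs with the BPF side: `update_bpf_layer_cpumask()` raises `refresh_cpus`, and the `__sync_val_compare_and_swap()` at the top of `refresh_cpumasks()` in the BPF hunk above lowers it, so each request is rebuilt by exactly one consumer. A rough model of that handshake in plain Rust, with illustrative names:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// Rough model of the refresh_cpus handshake: one flag per layer,
// userspace raises it, the consumer lowers it exactly once.
static REFRESH_CPUS: AtomicU32 = AtomicU32::new(0);

// Userspace side: write the new cpumask, then raise the flag.
fn request_refresh() {
    REFRESH_CPUS.store(1, Ordering::Release);
}

// BPF side: proceed only if this caller is the one that flips 1 -> 0,
// so concurrent callers don't rebuild the cpumask for the same request.
fn try_refresh() -> bool {
    REFRESH_CPUS
        .compare_exchange(1, 0, Ordering::AcqRel, Ordering::Relaxed)
        .is_ok()
}

fn main() {
    assert!(!try_refresh()); // nothing requested yet
    request_refresh();
    assert!(try_refresh());  // first caller wins and rebuilds
    assert!(!try_refresh()); // request already consumed
}
```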