Mirror of https://github.com/sched-ext/scx.git (synced 2024-12-11 03:12:27 +00:00)

Merge pull request #142 from sched-ext/rustland-ringbuf

scx_rustland: improve kernel/user-space communication

Commit 3535e55985
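
This change moves the kernel-to-user-space handoff of queued tasks from a BPF_MAP_TYPE_QUEUE to a BPF ring buffer (BPF_MAP_TYPE_RINGBUF), and replaces the BPF_MAP_TYPE_ARRAY used to track which pid owns each CPU with a plain global array that user space reads directly through the skeleton's .bss. For orientation, here is a minimal sketch of the producer side of the libbpf ring-buffer API that the BPF half of this diff adopts; the map name, event type and attach point below are illustrative, not taken from this patch.

	#include "vmlinux.h"
	#include <bpf/bpf_helpers.h>
	#include <bpf/bpf_tracing.h>

	char _license[] SEC("license") = "GPL";

	struct event {
		u32 pid;
	};

	struct {
		__uint(type, BPF_MAP_TYPE_RINGBUF);
		__uint(max_entries, 4096);	/* size in bytes: power of two, page aligned */
	} events SEC(".maps");

	SEC("tp_btf/sched_wakeup")
	int BPF_PROG(handle_wakeup, struct task_struct *p)
	{
		struct event *e;

		/* Reserve space in the ring buffer; NULL means the buffer is full. */
		e = bpf_ringbuf_reserve(&events, sizeof(*e), 0);
		if (!e)
			return 0;	/* producer decides how to handle back-pressure */

		e->pid = p->pid;
		bpf_ringbuf_submit(e, 0);
		return 0;
	}

The user-space side registers a callback per ring buffer and drains it with consume() or poll(); the Rust connector changed below does exactly that through libbpf-rs.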
@@ -144,10 +144,10 @@ pub struct QueuedTask {
 // Task queued for dispatching to the BPF component (see bpf_intf::dispatched_task_ctx).
 #[derive(Debug)]
 pub struct DispatchedTask {
-    pub pid: i32,         // pid that uniquely identifies a task
-    pub cpu: i32,         // target CPU selected by the scheduler
-    pub cpumask_cnt: u64, // cpumask generation counter
-    pub payload: u64,     // task payload (used for debugging)
+    pub pid: i32,         // pid that uniquely identifies a task
+    pub cpu: i32,         // target CPU selected by the scheduler
+    pub cpumask_cnt: u64, // cpumask generation counter
+    pub payload: u64,     // task payload (used for debugging)
 }

 // Message received from the dispatcher (see bpf_intf::queued_task_ctx for details).
@@ -205,12 +205,17 @@ impl DispatchedMessage {
     }
 }

-pub struct BpfScheduler<'a> {
-    pub skel: BpfSkel<'a>,               // Low-level BPF connector
+pub struct BpfScheduler<'cb> {
+    pub skel: BpfSkel<'cb>,              // Low-level BPF connector
+    queued: libbpf_rs::RingBuffer<'cb>,  // Ring buffer of queued tasks
     struct_ops: Option<libbpf_rs::Link>, // Low-level BPF methods
 }

-impl<'a> BpfScheduler<'a> {
+// Buffer to store a task read from the ring buffer.
+const BUFSIZE: usize = std::mem::size_of::<QueuedTask>();
+static mut BUF: [u8; BUFSIZE] = [0; BUFSIZE];
+
+impl<'cb> BpfScheduler<'cb> {
     pub fn init(slice_us: u64, nr_cpus_online: i32, partial: bool, debug: bool) -> Result<Self> {
         // Open the BPF prog first for verification.
         let skel_builder = BpfSkelBuilder::default();
@@ -220,6 +225,26 @@ impl<'a> BpfScheduler<'a> {
         // scheduling.
         ALLOCATOR.lock_memory();

+        // Copy one item from the ring buffer.
+        fn callback(data: &[u8]) -> i32 {
+            unsafe {
+                BUF.copy_from_slice(data);
+            }
+
+            // Return an unsupported error to stop early and consume only one item.
+            //
+            // NOTE: this is quite a hack. I wish libbpf would honor stopping after the first item
+            // is consumed, upon returning a non-zero positive value here, but it doesn't seem to be
+            // the case:
+            //
+            // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/lib/bpf/ringbuf.c?h=v6.8-rc5#n260
+            //
+            // Maybe we should fix this to stop processing items from the ring buffer also when a
+            // value > 0 is returned.
+            //
+            -255
+        }
+
         // Initialize online CPUs counter.
         //
         // NOTE: we should probably refresh this counter during the normal execution to support cpu
@@ -242,9 +267,21 @@ impl<'a> BpfScheduler<'a> {
                 .context("Failed to attach struct ops")?,
         );

+        // Build the ring buffer of queued tasks.
+        let binding = skel.maps();
+        let queued_ring_buffer = binding.queued();
+        let mut rbb = libbpf_rs::RingBufferBuilder::new();
+        rbb.add(queued_ring_buffer, callback)
+            .expect("failed to add ringbuf callback");
+        let queued = rbb.build().expect("failed to build ringbuf");
+
         // Make sure to use the SCHED_EXT class at least for the scheduler itself.
         match Self::use_sched_ext() {
-            0 => Ok(Self { skel, struct_ops }),
+            0 => Ok(Self {
+                skel,
+                queued,
+                struct_ops,
+            }),
             err => Err(anyhow::Error::msg(format!(
                 "sched_setscheduler error: {}",
                 err
@@ -328,34 +365,23 @@ impl<'a> BpfScheduler<'a> {
     // Get the pid running on a certain CPU, if no tasks are running return 0.
     #[allow(dead_code)]
     pub fn get_cpu_pid(&self, cpu: i32) -> u32 {
-        let maps = self.skel.maps();
-        let cpu_map = maps.cpu_map();
+        let cpu_map_ptr = self.skel.bss().cpu_map.as_ptr();

-        let key = cpu.to_ne_bytes();
-        let value = cpu_map.lookup(&key, libbpf_rs::MapFlags::ANY).unwrap();
-        let pid = value.map_or(0u32, |vec| {
-            let mut array: [u8; 4] = Default::default();
-            array.copy_from_slice(&vec[..std::cmp::min(4, vec.len())]);
-            u32::from_le_bytes(array)
-        });
-
-        pid
+        unsafe { *cpu_map_ptr.offset(cpu as isize) }
     }

     // Receive a task to be scheduled from the BPF dispatcher.
     //
     // NOTE: if task.cpu is negative the task is exiting and it does not require to be scheduled.
     pub fn dequeue_task(&mut self) -> Result<Option<QueuedTask>, libbpf_rs::Error> {
-        let maps = self.skel.maps();
-        let queued = maps.queued();
-
-        match queued.lookup_and_delete(&[]) {
-            Ok(Some(msg)) => {
-                let task = EnqueuedMessage::from_bytes(msg.as_slice()).to_queued_task();
+        match self.queued.consume() {
+            Ok(()) => Ok(None),
+            Err(error) if error.kind() == libbpf_rs::ErrorKind::Other => {
+                // A valid task is received, convert data to a proper task struct.
+                let task = unsafe { EnqueuedMessage::from_bytes(&BUF).to_queued_task() };
                 Ok(Some(task))
             }
-            Ok(None) => Ok(None),
-            Err(err) => Err(err),
+            Err(error) => Err(error),
         }
     }

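The -255 trick above and the ErrorKind::Other arm in dequeue_task() lean on a libbpf behavior worth spelling out: when the sample callback returns a negative value, libbpf stops processing the ring (after advancing the consumer position past the record it just delivered) and propagates that value to the caller of consume(), while a positive return does not stop the iteration. A rough equivalent written against the plain libbpf C API, assuming the ring buffer's map fd is already available from a loaded object and that every record fits the caller's buffer, could look like this (handle_one and dequeue_one are illustrative names):

	#include <errno.h>
	#include <string.h>
	#include <bpf/libbpf.h>

	/* Copy exactly one record out of the ring buffer. Returning a negative value
	 * from the callback makes libbpf stop and report that value, so at most one
	 * record is consumed per ring_buffer__consume() call. */
	static int handle_one(void *ctx, void *data, size_t size)
	{
		memcpy(ctx, data, size);	/* ctx is a caller-provided buffer */
		return -255;			/* stop after the first record */
	}

	/* Returns 1 if a record was copied into buf, 0 if the ring was empty,
	 * or a negative error code reported by libbpf. */
	static int dequeue_one(int ringbuf_fd, void *buf)
	{
		struct ring_buffer *rb;
		int ret;

		rb = ring_buffer__new(ringbuf_fd, handle_one, buf, NULL);
		if (!rb)
			return -errno;

		ret = ring_buffer__consume(rb);
		ring_buffer__free(rb);

		if (ret == -255)
			return 1;	/* our callback stopped the loop: one record read */
		return ret;		/* 0: empty ring, < 0: real error */
	}

In practice the ring_buffer object is created once and kept alive, which is what the patch does by storing the libbpf-rs RingBuffer inside BpfScheduler.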
@@ -119,8 +119,7 @@ const volatile bool debug;
  * This map is drained by the user space scheduler.
  */
 struct {
-	__uint(type, BPF_MAP_TYPE_QUEUE);
-	__type(value, struct queued_task_ctx);
+	__uint(type, BPF_MAP_TYPE_RINGBUF);
 	__uint(max_entries, MAX_ENQUEUED_TASKS);
 } queued SEC(".maps");

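One detail to keep in mind with this map-type switch: for BPF_MAP_TYPE_QUEUE, max_entries counts elements of the declared value type, whereas BPF_MAP_TYPE_RINGBUF takes no __type(value, ...) at all and max_entries is the size of the buffer in bytes, which the kernel requires to be a power of two and a multiple of the page size. Reusing MAX_ENQUEUED_TASKS therefore changes its meaning from "number of queued tasks" to "ring size in bytes" (its value presumably satisfies the ring-buffer constraints). Side by side, with illustrative sizes and assuming the usual BPF includes:

	/* Before: a FIFO of up to 4096 fixed-size elements. */
	struct {
		__uint(type, BPF_MAP_TYPE_QUEUE);
		__type(value, struct queued_task_ctx);
		__uint(max_entries, 4096);		/* 4096 elements */
	} queued_fifo SEC(".maps");

	/* After: a ring buffer of raw bytes; each record carries its own length. */
	struct {
		__uint(type, BPF_MAP_TYPE_RINGBUF);
		__uint(max_entries, 256 * 1024);	/* bytes: power of two, page aligned */
	} queued_rb SEC(".maps");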
@@ -157,11 +156,11 @@ struct {
 } task_ctx_stor SEC(".maps");

 /* Return a local task context from a generic task */
-struct task_ctx *lookup_task_ctx(struct task_struct *p)
+struct task_ctx *lookup_task_ctx(const struct task_struct *p)
 {
 	struct task_ctx *tctx;

-	tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, 0);
+	tctx = bpf_task_storage_get(&task_ctx_stor, (struct task_struct *)p, 0, 0);
 	if (!tctx) {
 		scx_bpf_error("Failed to lookup task ctx for %s", p->comm);
 		return NULL;
@@ -190,12 +189,7 @@ struct {
 /*
  * Map of allocated CPUs.
  */
-struct {
-	__uint(type, BPF_MAP_TYPE_ARRAY);
-	__uint(max_entries, MAX_CPUS);
-	__type(key, u32);
-	__type(value, u32);
-} cpu_map SEC(".maps");
+volatile u32 cpu_map[MAX_CPUS];

 /*
  * Assign a task to a CPU (used in .running() and .stopping()).
@@ -204,14 +198,11 @@ struct {
  */
 static void set_cpu_owner(u32 cpu, u32 pid)
 {
-	u32 *owner;
-
-	owner = bpf_map_lookup_elem(&cpu_map, &cpu);
-	if (!owner) {
-		scx_bpf_error("Failed to look up cpu_map for cpu %u", cpu);
+	if (cpu >= MAX_CPUS) {
+		scx_bpf_error("Invalid cpu: %d", cpu);
 		return;
 	}
-	*owner = pid;
+	cpu_map[cpu] = pid;
 }

 /*
@@ -221,14 +212,11 @@ static void set_cpu_owner(u32 cpu, u32 pid)
  */
 static u32 get_cpu_owner(u32 cpu)
 {
-	u32 *owner;
-
-	owner = bpf_map_lookup_elem(&cpu_map, &cpu);
-	if (!owner) {
-		scx_bpf_error("Failed to look up cpu_map for cpu %u", cpu);
+	if (cpu >= MAX_CPUS) {
+		scx_bpf_error("Invalid cpu: %d", cpu);
 		return 0;
 	}
-	return *owner;
+	return cpu_map[cpu];
 }

 /*
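Dropping the BPF_MAP_TYPE_ARRAY in favor of a plain global array works because global variables in a BPF object are backed by the .data/.bss maps that libbpf memory-maps into user space; that is what lets get_cpu_pid() in the Rust connector read cpu_map straight through skel.bss() instead of issuing a map-lookup syscall per query. The explicit cpu >= MAX_CPUS checks take over the bounds safety the map lookup used to provide and keep the verifier satisfied that the index stays inside the array. The same direct access from a C program using a libbpf-generated skeleton would look roughly like this (the skeleton type name is assumed):

	/* BPF side: a plain global array, placed in the object's .bss section. */
	#define MAX_CPUS 1024
	volatile u32 cpu_map[MAX_CPUS];

	/* User-space side: the skeleton mmap()s .bss read/write, so reading the
	 * array is a plain memory load rather than a bpf() syscall. */
	static __u32 pid_on_cpu(struct example_bpf *skel, int cpu)
	{
		if (cpu < 0 || cpu >= MAX_CPUS)
			return 0;
		return skel->bss->cpu_map[cpu];
	}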
@@ -506,7 +494,7 @@ static void sched_congested(struct task_struct *p)
  */
 void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
 {
-	struct queued_task_ctx task;
+	struct queued_task_ctx *task;

 	/*
 	 * Scheduler is dispatched directly in .dispatch() when needed, so
@@ -534,17 +522,20 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
 	 * user-space scheduler.
 	 *
 	 * If @queued list is full (user-space scheduler is congested) tasks
-	 * will be dispatched directly from the kernel (re-using their
-	 * previously used CPU in this case).
+	 * will be dispatched directly from the kernel (using the first CPU
+	 * available in this case).
 	 */
-	get_task_info(&task, p, false);
-	dbg_msg("enqueue: pid=%d (%s)", p->pid, p->comm);
-	if (bpf_map_push_elem(&queued, &task, 0)) {
+	task = bpf_ringbuf_reserve(&queued, sizeof(*task), 0);
+	if (!task) {
 		sched_congested(p);
 		dispatch_task(p, SHARED_DSQ, 0, enq_flags);
 		__sync_fetch_and_add(&nr_kernel_dispatches, 1);
 		return;
 	}
+	get_task_info(task, p, false);
+	dbg_msg("enqueue: pid=%d (%s)", p->pid, p->comm);
+	bpf_ringbuf_submit(task, 0);

 	__sync_fetch_and_add(&nr_queued, 1);
 }

@@ -747,11 +738,11 @@ s32 BPF_STRUCT_OPS(rustland_init_task, struct task_struct *p,
 void BPF_STRUCT_OPS(rustland_exit_task, struct task_struct *p,
 		    struct scx_exit_task_args *args)
 {
-	struct queued_task_ctx task = {};
+	struct queued_task_ctx *task;

 	dbg_msg("exit: pid=%d (%s)", p->pid, p->comm);
-	get_task_info(&task, p, true);
-	if (bpf_map_push_elem(&queued, &task, 0)) {
+	task = bpf_ringbuf_reserve(&queued, sizeof(*task), 0);
+	if (!task) {
 		/*
 		 * We may have a memory leak in the scheduler at this point,
 		 * because we failed to notify it about this exiting task and
@@ -766,6 +757,9 @@ void BPF_STRUCT_OPS(rustland_exit_task, struct task_struct *p,
 		sched_congested(p);
 		return;
 	}
+	get_task_info(task, p, true);
+	bpf_ringbuf_submit(task, 0);

 	__sync_fetch_and_add(&nr_queued, 1);
 }

@@ -283,14 +283,8 @@ impl<'a> Scheduler<'a> {
     //
     // On SMT systems consider only one CPU for each fully idle core, to avoid disrupting
     // performance too much by running multiple tasks in the same core.
-    fn get_idle_cpus(&self) -> Vec<i32> {
+    fn get_idle_cpus(&mut self) -> Vec<i32> {
         let cores = &self.cores.map;
-        let num_cpus = self.cores.nr_cpus_online;
-
-        // Cache the results of self.bpf.get_cpu_pid() for all CPUs.
-        let cpu_pid_map: Vec<u32> = (0..num_cpus)
-            .map(|cpu_id| self.bpf.get_cpu_pid(cpu_id))
-            .collect();

         // Generate the list of idle CPU IDs by selecting the first item from each list of CPU IDs
         // associated to the idle cores. The remaining sibling CPUs will be used as spare/emergency
@@ -305,7 +299,7 @@ impl<'a> Scheduler<'a> {
             .filter_map(|(&core_id, core_cpus)| {
                 if core_cpus
                     .iter()
-                    .all(|&cpu_id| cpu_pid_map[cpu_id as usize] == 0)
+                    .all(|&cpu_id| self.bpf.get_cpu_pid(cpu_id) == 0)
                 {
                     Some(core_id)
                 } else {
@@ -477,7 +471,7 @@ impl<'a> Scheduler<'a> {
     // Dynamically adjust the time slice based on the amount of waiting tasks.
     fn scale_slice_ns(&mut self) {
         let nr_scheduled = self.task_pool.tasks.len() as u64;
-        let slice_us_max = self.slice_ns / MSEC_PER_SEC;
+        let slice_us_max = self.slice_ns / NSEC_PER_USEC;

         // Scale time slice as a function of nr_scheduled, but never scale below 250 us.
         let scaling = ((nr_scheduled + 1) / 2).max(1);
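The last hunk is a units cleanup rather than a behavioral change, assuming the usual definitions of these constants: slice_ns holds a duration in nanoseconds, and converting it to microseconds means dividing by NSEC_PER_USEC = 1000 (ns per µs). The old code divided by MSEC_PER_SEC, which is also numerically 1000 (ms per s), so the result was the same either way, e.g. 20_000_000 ns / 1000 = 20_000 µs, but the constant named the wrong conversion; slice_us_max now reads as what it actually is.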