scx_rustland: use a ring buffer for queued tasks

Switch from a BPF_MAP_TYPE_QUEUE to a BPF_MAP_TYPE_RINGBUF to store the
tasks that need to be processed by the user-space scheduler.

A ring buffer allows us to save a lot of memory copies and syscalls,
since the memory is directly shared between the BPF and the user-space
components.
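
To give an idea of how this works on the consumer side, here is a
minimal sketch using the libbpf-rs ring buffer API (illustrative only:
handle_task() and drain_queued() are made-up names, the real wiring is
in the diff below). The callback runs directly on the memory shared
with the BPF component, so draining the queue no longer costs one bpf()
syscall plus one copy per task:

  use libbpf_rs::{Map, RingBufferBuilder};

  // Invoked once per item, directly on the shared ring buffer memory:
  // no copy_from_user() and no per-item bpf() syscall involved.
  fn handle_task(data: &[u8]) -> i32 {
      println!("got {} bytes from the ring buffer", data.len());
      0 // keep consuming
  }

  // Drain everything currently queued by the BPF component.
  fn drain_queued(queued_map: &Map) -> Result<(), libbpf_rs::Error> {
      let mut rbb = RingBufferBuilder::new();
      rbb.add(queued_map, handle_task)?;
      rbb.build()?.consume()
  }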

Performance profile before this change:

  2.44%  [kernel]  [k] __memset
  2.19%  [kernel]  [k] __sys_bpf
  1.59%  [kernel]  [k] __kmem_cache_alloc_node
  1.00%  [kernel]  [k] _copy_from_user

After this change:

  1.42%  [kernel]  [k] __memset
  0.14%  [kernel]  [k] __sys_bpf
  0.10%  [kernel]  [k] __kmem_cache_alloc_node
  0.07%  [kernel]  [k] _copy_from_user

The overhead of both sys_bpf() and copy_from_user() is now reduced by a
factor of ~15x (only the dispatch path still uses sys_bpf()).

NOTE: despite being very effective, the current implementation is a bit
of a hack: the ring buffer API only supports greedy consumption, where
multiple items can be consumed at once, and libbpf-rs does not report
exactly how many items were consumed. By using a more refined libbpf-rs
API [1] we may be able to improve this code a bit.
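
Concretely, the workaround looks like the sketch below (the real code
is in the diff; dequeue_one() is an illustrative name, while BUF,
QueuedTask and EnqueuedMessage are the helpers used by the scheduler):
the callback copies a single item into a static buffer and returns a
negative error so that libbpf stops after the first sample, and the
caller treats that specific error from consume() as 'one task received':

  // Single-item buffer shared with the ring buffer callback.
  const BUFSIZE: usize = std::mem::size_of::<QueuedTask>();
  static mut BUF: [u8; BUFSIZE] = [0; BUFSIZE];

  fn callback(data: &[u8]) -> i32 {
      // Copy exactly one item out of the shared ring buffer.
      unsafe { BUF.copy_from_slice(data) };

      // A negative return value makes libbpf stop consuming after this
      // sample; the error is propagated back through consume().
      -255
  }

  fn dequeue_one(queued: &libbpf_rs::RingBuffer) -> Result<Option<QueuedTask>, libbpf_rs::Error> {
      match queued.consume() {
          // Ring buffer empty: nothing to dequeue.
          Ok(()) => Ok(None),
          // Sentinel error: the callback copied exactly one task into BUF.
          Err(error) if error.kind() == libbpf_rs::ErrorKind::Other => {
              let task = unsafe { EnqueuedMessage::from_bytes(&BUF).to_queued_task() };
              Ok(Some(task))
          }
          // Any other error is a real failure.
          Err(error) => Err(error),
      }
  }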

Moreover, libbpf-rs doesn't provide an API for user_ring_buffer, so at
the moment there's no trivial way to apply the same change to the
dispatched tasks.

However, with just this change applied, the overhead of sys_bpf() and
copy_from_user() is already minimal, so we wouldn't get much benefit
from switching the dispatch path to a BPF ring buffer as well.

[1] https://github.com/libbpf/libbpf-rs/pull/680

Signed-off-by: Andrea Righi <andrea.righi@canonical.com>

Author: Andrea Righi <andrea.righi@canonical.com>
Date:   2024-02-16 21:57:20 +01:00
commit 93dc615653
parent 04685e633f

3 changed files with 70 additions and 30 deletions

@@ -144,10 +144,10 @@ pub struct QueuedTask {
 // Task queued for dispatching to the BPF component (see bpf_intf::dispatched_task_ctx).
 #[derive(Debug)]
 pub struct DispatchedTask {
     pub pid: i32,          // pid that uniquely identifies a task
     pub cpu: i32,          // target CPU selected by the scheduler
     pub cpumask_cnt: u64,  // cpumask generation counter
     pub payload: u64,      // task payload (used for debugging)
 }
 
 // Message received from the dispatcher (see bpf_intf::queued_task_ctx for details).
@@ -205,12 +205,17 @@ impl DispatchedMessage {
     }
 }
 
-pub struct BpfScheduler<'a> {
-    pub skel: BpfSkel<'a>,                  // Low-level BPF connector
+pub struct BpfScheduler<'cb> {
+    pub skel: BpfSkel<'cb>,                 // Low-level BPF connector
+    queued: libbpf_rs::RingBuffer<'cb>,     // Ring buffer of queued tasks
     struct_ops: Option<libbpf_rs::Link>,    // Low-level BPF methods
 }
 
-impl<'a> BpfScheduler<'a> {
+// Buffer to store a task read from the ring buffer.
+const BUFSIZE: usize = std::mem::size_of::<QueuedTask>();
+static mut BUF: [u8; BUFSIZE] = [0; BUFSIZE];
+
+impl<'cb> BpfScheduler<'cb> {
     pub fn init(slice_us: u64, nr_cpus_online: i32, partial: bool, debug: bool) -> Result<Self> {
         // Open the BPF prog first for verification.
         let skel_builder = BpfSkelBuilder::default();
@@ -220,6 +225,26 @@ impl<'a> BpfScheduler<'a> {
         // scheduling.
         ALLOCATOR.lock_memory();
 
+        // Copy one item from the ring buffer.
+        fn callback(data: &[u8]) -> i32 {
+            unsafe {
+                BUF.copy_from_slice(data);
+            }
+
+            // Return an unsupported error to stop early and consume only one item.
+            //
+            // NOTE: this is quite a hack. I wish libbpf would honor stopping after the first item
+            // is consumed, upon returning a non-zero positive value here, but it doesn't seem to be
+            // the case:
+            //
+            //   https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/lib/bpf/ringbuf.c?h=v6.8-rc5#n260
+            //
+            // Maybe we should fix this to stop processing items from the ring buffer also when a
+            // value > 0 is returned.
+            //
+            -255
+        }
+
         // Initialize online CPUs counter.
         //
         // NOTE: we should probably refresh this counter during the normal execution to support cpu
@@ -242,9 +267,21 @@ impl<'a> BpfScheduler<'a> {
                 .context("Failed to attach struct ops")?,
         );
 
+        // Build the ring buffer of queued tasks.
+        let binding = skel.maps();
+        let queued_ring_buffer = binding.queued();
+        let mut rbb = libbpf_rs::RingBufferBuilder::new();
+        rbb.add(queued_ring_buffer, callback)
+            .expect("failed to add ringbuf callback");
+        let queued = rbb.build().expect("failed to build ringbuf");
+
         // Make sure to use the SCHED_EXT class at least for the scheduler itself.
         match Self::use_sched_ext() {
-            0 => Ok(Self { skel, struct_ops }),
+            0 => Ok(Self {
+                skel,
+                queued,
+                struct_ops,
+            }),
             err => Err(anyhow::Error::msg(format!(
                 "sched_setscheduler error: {}",
                 err
@@ -337,16 +374,14 @@ impl<'a> BpfScheduler<'a> {
     //
     // NOTE: if task.cpu is negative the task is exiting and it does not require to be scheduled.
     pub fn dequeue_task(&mut self) -> Result<Option<QueuedTask>, libbpf_rs::Error> {
-        let maps = self.skel.maps();
-        let queued = maps.queued();
-
-        match queued.lookup_and_delete(&[]) {
-            Ok(Some(msg)) => {
-                let task = EnqueuedMessage::from_bytes(msg.as_slice()).to_queued_task();
+        match self.queued.consume() {
+            Ok(()) => Ok(None),
+            Err(error) if error.kind() == libbpf_rs::ErrorKind::Other => {
+                // A valid task is received, convert data to a proper task struct.
+                let task = unsafe { EnqueuedMessage::from_bytes(&BUF).to_queued_task() };
                 Ok(Some(task))
             }
-            Ok(None) => Ok(None),
-            Err(err) => Err(err),
+            Err(error) => Err(error),
         }
     }

@@ -119,8 +119,7 @@ const volatile bool debug;
  * This map is drained by the user space scheduler.
  */
 struct {
-        __uint(type, BPF_MAP_TYPE_QUEUE);
-        __type(value, struct queued_task_ctx);
+        __uint(type, BPF_MAP_TYPE_RINGBUF);
         __uint(max_entries, MAX_ENQUEUED_TASKS);
 } queued SEC(".maps");
@@ -157,11 +156,11 @@ struct {
 } task_ctx_stor SEC(".maps");
 
 /* Return a local task context from a generic task */
-struct task_ctx *lookup_task_ctx(struct task_struct *p)
+struct task_ctx *lookup_task_ctx(const struct task_struct *p)
 {
         struct task_ctx *tctx;
 
-        tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, 0);
+        tctx = bpf_task_storage_get(&task_ctx_stor, (struct task_struct *)p, 0, 0);
         if (!tctx) {
                 scx_bpf_error("Failed to lookup task ctx for %s", p->comm);
                 return NULL;
@@ -495,7 +494,7 @@ static void sched_congested(struct task_struct *p)
  */
 void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
 {
-        struct queued_task_ctx task;
+        struct queued_task_ctx *task;
 
         /*
          * Scheduler is dispatched directly in .dispatch() when needed, so
@@ -523,17 +522,20 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
          * user-space scheduler.
          *
          * If @queued list is full (user-space scheduler is congested) tasks
-         * will be dispatched directly from the kernel (re-using their
-         * previously used CPU in this case).
+         * will be dispatched directly from the kernel (using the first CPU
+         * available in this case).
          */
-        get_task_info(&task, p, false);
-        dbg_msg("enqueue: pid=%d (%s)", p->pid, p->comm);
-        if (bpf_map_push_elem(&queued, &task, 0)) {
+        task = bpf_ringbuf_reserve(&queued, sizeof(*task), 0);
+        if (!task) {
                 sched_congested(p);
                 dispatch_task(p, SHARED_DSQ, 0, enq_flags);
                 __sync_fetch_and_add(&nr_kernel_dispatches, 1);
                 return;
         }
+        get_task_info(task, p, false);
+        dbg_msg("enqueue: pid=%d (%s)", p->pid, p->comm);
+        bpf_ringbuf_submit(task, 0);
+
         __sync_fetch_and_add(&nr_queued, 1);
 }
@@ -736,11 +738,11 @@ s32 BPF_STRUCT_OPS(rustland_init_task, struct task_struct *p,
 void BPF_STRUCT_OPS(rustland_exit_task, struct task_struct *p,
                     struct scx_exit_task_args *args)
 {
-        struct queued_task_ctx task = {};
+        struct queued_task_ctx *task;
 
         dbg_msg("exit: pid=%d (%s)", p->pid, p->comm);
-        get_task_info(&task, p, true);
-        if (bpf_map_push_elem(&queued, &task, 0)) {
+        task = bpf_ringbuf_reserve(&queued, sizeof(*task), 0);
+        if (!task) {
                 /*
                  * We may have a memory leak in the scheduler at this point,
                  * because we failed to notify it about this exiting task and
@@ -755,6 +757,9 @@ void BPF_STRUCT_OPS(rustland_exit_task, struct task_struct *p,
                 sched_congested(p);
                 return;
         }
+        get_task_info(task, p, true);
+        bpf_ringbuf_submit(task, 0);
+
         __sync_fetch_and_add(&nr_queued, 1);
 }

@@ -471,7 +471,7 @@ impl<'a> Scheduler<'a> {
     // Dynamically adjust the time slice based on the amount of waiting tasks.
     fn scale_slice_ns(&mut self) {
         let nr_scheduled = self.task_pool.tasks.len() as u64;
-        let slice_us_max = self.slice_ns / MSEC_PER_SEC;
+        let slice_us_max = self.slice_ns / NSEC_PER_USEC;
 
         // Scale time slice as a function of nr_scheduled, but never scale below 250 us.
         let scaling = ((nr_scheduled + 1) / 2).max(1);