diff --git a/scheds/rust/scx_rustland/src/bpf.rs b/scheds/rust/scx_rustland/src/bpf.rs index 07106ab..60e75a8 100644 --- a/scheds/rust/scx_rustland/src/bpf.rs +++ b/scheds/rust/scx_rustland/src/bpf.rs @@ -144,10 +144,10 @@ pub struct QueuedTask { // Task queued for dispatching to the BPF component (see bpf_intf::dispatched_task_ctx). #[derive(Debug)] pub struct DispatchedTask { - pub pid: i32, // pid that uniquely identifies a task - pub cpu: i32, // target CPU selected by the scheduler - pub cpumask_cnt: u64, // cpumask generation counter - pub payload: u64, // task payload (used for debugging) + pub pid: i32, // pid that uniquely identifies a task + pub cpu: i32, // target CPU selected by the scheduler + pub cpumask_cnt: u64, // cpumask generation counter + pub payload: u64, // task payload (used for debugging) } // Message received from the dispatcher (see bpf_intf::queued_task_ctx for details). @@ -205,12 +205,17 @@ impl DispatchedMessage { } } -pub struct BpfScheduler<'a> { - pub skel: BpfSkel<'a>, // Low-level BPF connector +pub struct BpfScheduler<'cb> { + pub skel: BpfSkel<'cb>, // Low-level BPF connector + queued: libbpf_rs::RingBuffer<'cb>, // Ring buffer of queued tasks struct_ops: Option, // Low-level BPF methods } -impl<'a> BpfScheduler<'a> { +// Buffer to store a task read from the ring buffer. +const BUFSIZE: usize = std::mem::size_of::(); +static mut BUF: [u8; BUFSIZE] = [0; BUFSIZE]; + +impl<'cb> BpfScheduler<'cb> { pub fn init(slice_us: u64, nr_cpus_online: i32, partial: bool, debug: bool) -> Result { // Open the BPF prog first for verification. let skel_builder = BpfSkelBuilder::default(); @@ -220,6 +225,26 @@ impl<'a> BpfScheduler<'a> { // scheduling. ALLOCATOR.lock_memory(); + // Copy one item from the ring buffer. + fn callback(data: &[u8]) -> i32 { + unsafe { + BUF.copy_from_slice(data); + } + + // Return an unsupported error to stop early and consume only one item. + // + // NOTE: this is quite a hack. I wish libbpf would honor stopping after the first item + // is consumed, upon returnin a non-zero positive value here, but it doesn't seem to be + // the case: + // + // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/lib/bpf/ringbuf.c?h=v6.8-rc5#n260 + // + // Maybe we should fix this to stop processing items from the ring buffer also when a + // value > 0 is returned. + // + -255 + } + // Initialize online CPUs counter. // // NOTE: we should probably refresh this counter during the normal execution to support cpu @@ -242,9 +267,21 @@ impl<'a> BpfScheduler<'a> { .context("Failed to attach struct ops")?, ); + // Build the ring buffer of queued tasks. + let binding = skel.maps(); + let queued_ring_buffer = binding.queued(); + let mut rbb = libbpf_rs::RingBufferBuilder::new(); + rbb.add(queued_ring_buffer, callback) + .expect("failed to add ringbuf callback"); + let queued = rbb.build().expect("failed to build ringbuf"); + // Make sure to use the SCHED_EXT class at least for the scheduler itself. match Self::use_sched_ext() { - 0 => Ok(Self { skel, struct_ops }), + 0 => Ok(Self { + skel, + queued, + struct_ops, + }), err => Err(anyhow::Error::msg(format!( "sched_setscheduler error: {}", err @@ -337,16 +374,14 @@ impl<'a> BpfScheduler<'a> { // // NOTE: if task.cpu is negative the task is exiting and it does not require to be scheduled. pub fn dequeue_task(&mut self) -> Result, libbpf_rs::Error> { - let maps = self.skel.maps(); - let queued = maps.queued(); - - match queued.lookup_and_delete(&[]) { - Ok(Some(msg)) => { - let task = EnqueuedMessage::from_bytes(msg.as_slice()).to_queued_task(); + match self.queued.consume() { + Ok(()) => Ok(None), + Err(error) if error.kind() == libbpf_rs::ErrorKind::Other => { + // A valid task is received, convert data to a proper task struct. + let task = unsafe { EnqueuedMessage::from_bytes(&BUF).to_queued_task() }; Ok(Some(task)) } - Ok(None) => Ok(None), - Err(err) => Err(err), + Err(error) => Err(error), } } diff --git a/scheds/rust/scx_rustland/src/bpf/main.bpf.c b/scheds/rust/scx_rustland/src/bpf/main.bpf.c index 561b27e..7dda89c 100644 --- a/scheds/rust/scx_rustland/src/bpf/main.bpf.c +++ b/scheds/rust/scx_rustland/src/bpf/main.bpf.c @@ -119,8 +119,7 @@ const volatile bool debug; * This map is drained by the user space scheduler. */ struct { - __uint(type, BPF_MAP_TYPE_QUEUE); - __type(value, struct queued_task_ctx); + __uint(type, BPF_MAP_TYPE_RINGBUF); __uint(max_entries, MAX_ENQUEUED_TASKS); } queued SEC(".maps"); @@ -157,11 +156,11 @@ struct { } task_ctx_stor SEC(".maps"); /* Return a local task context from a generic task */ -struct task_ctx *lookup_task_ctx(struct task_struct *p) +struct task_ctx *lookup_task_ctx(const struct task_struct *p) { struct task_ctx *tctx; - tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, 0); + tctx = bpf_task_storage_get(&task_ctx_stor, (struct task_struct *)p, 0, 0); if (!tctx) { scx_bpf_error("Failed to lookup task ctx for %s", p->comm); return NULL; @@ -495,7 +494,7 @@ static void sched_congested(struct task_struct *p) */ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags) { - struct queued_task_ctx task; + struct queued_task_ctx *task; /* * Scheduler is dispatched directly in .dispatch() when needed, so @@ -523,17 +522,20 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags) * user-space scheduler. * * If @queued list is full (user-space scheduler is congested) tasks - * will be dispatched directly from the kernel (re-using their - * previously used CPU in this case). + * will be dispatched directly from the kernel (using the first CPU + * available in this case). */ - get_task_info(&task, p, false); - dbg_msg("enqueue: pid=%d (%s)", p->pid, p->comm); - if (bpf_map_push_elem(&queued, &task, 0)) { + task = bpf_ringbuf_reserve(&queued, sizeof(*task), 0); + if (!task) { sched_congested(p); dispatch_task(p, SHARED_DSQ, 0, enq_flags); __sync_fetch_and_add(&nr_kernel_dispatches, 1); return; } + get_task_info(task, p, false); + dbg_msg("enqueue: pid=%d (%s)", p->pid, p->comm); + bpf_ringbuf_submit(task, 0); + __sync_fetch_and_add(&nr_queued, 1); } @@ -736,11 +738,11 @@ s32 BPF_STRUCT_OPS(rustland_init_task, struct task_struct *p, void BPF_STRUCT_OPS(rustland_exit_task, struct task_struct *p, struct scx_exit_task_args *args) { - struct queued_task_ctx task = {}; + struct queued_task_ctx *task; dbg_msg("exit: pid=%d (%s)", p->pid, p->comm); - get_task_info(&task, p, true); - if (bpf_map_push_elem(&queued, &task, 0)) { + task = bpf_ringbuf_reserve(&queued, sizeof(*task), 0); + if (!task) { /* * We may have a memory leak in the scheduler at this point, * because we failed to notify it about this exiting task and @@ -755,6 +757,9 @@ void BPF_STRUCT_OPS(rustland_exit_task, struct task_struct *p, sched_congested(p); return; } + get_task_info(task, p, true); + bpf_ringbuf_submit(task, 0); + __sync_fetch_and_add(&nr_queued, 1); } diff --git a/scheds/rust/scx_rustland/src/main.rs b/scheds/rust/scx_rustland/src/main.rs index 0a4cbdc..fc93d88 100644 --- a/scheds/rust/scx_rustland/src/main.rs +++ b/scheds/rust/scx_rustland/src/main.rs @@ -471,7 +471,7 @@ impl<'a> Scheduler<'a> { // Dynamically adjust the time slice based on the amount of waiting tasks. fn scale_slice_ns(&mut self) { let nr_scheduled = self.task_pool.tasks.len() as u64; - let slice_us_max = self.slice_ns / MSEC_PER_SEC; + let slice_us_max = self.slice_ns / NSEC_PER_USEC; // Scale time slice as a function of nr_scheduled, but never scale below 250 us. let scaling = ((nr_scheduled + 1) / 2).max(1);