Merge pull request #568 from sched-ext/rustland-core-design-improv

scx_rustland_core: small core design improvements
Andrea Righi 2024-08-26 20:06:21 +02:00 committed by GitHub
commit 35db89e90d
5 changed files with 240 additions and 94 deletions

View File

@ -54,9 +54,144 @@ scx_rustland_core = "0.1"
## Example
You can find a simple example of a fully working FIFO scheduler implemented
using the `scx_rustland_core` framework here:
[scx_rlfifo](https://github.com/sched-ext/scx/tree/main/scheds/rust/scx_rlfifo).
Below you can find a simple example of a fully working FIFO scheduler
implemented using the `scx_rustland_core` framework:
```rust
// Copyright (c) Andrea Righi <andrea.righi@linux.dev>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
mod bpf;
use bpf::*;
use scx_utils::UserExitInfo;
use libbpf_rs::OpenObject;
use std::mem::MaybeUninit;
use std::collections::VecDeque;
use anyhow::Result;
const SLICE_US: u64 = 5000;
struct Scheduler<'a> {
bpf: BpfScheduler<'a>,
task_queue: VecDeque<QueuedTask>,
}
impl<'a> Scheduler<'a> {
fn init(open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
let bpf = BpfScheduler::init(
open_object,
0, // exit_dump_len (buffer size of exit info, 0 = default)
false, // partial (false = include all tasks)
false, // debug (false = debug mode off)
)?;
Ok(Self { bpf, task_queue: VecDeque::new() })
}
fn consume_all_tasks(&mut self) {
// Consume all tasks that are ready to run.
//
// Each task contains the following details:
//
// pub struct QueuedTask {
// pub pid: i32, // pid that uniquely identifies a task
// pub cpu: i32, // CPU where the task is running
// pub sum_exec_runtime: u64, // Total cpu time
// pub weight: u64, // Task static priority
// }
//
// Although the FIFO scheduler doesn't use these fields, they can provide valuable data for
// implementing more sophisticated scheduling policies.
while let Ok(Some(task)) = self.bpf.dequeue_task() {
self.task_queue.push_back(task);
}
}
fn dispatch_next_task(&mut self) {
if let Some(task) = self.task_queue.pop_front() {
// Create a new task to be dispatched, derived from the received enqueued task.
//
// pub struct DispatchedTask {
// pub pid: i32, // pid that uniquely identifies a task
// pub cpu: i32, // target CPU selected by the scheduler
// pub flags: u64, // special dispatch flags
// pub slice_ns: u64, // time slice assigned to the task (0 = default)
// pub vtime: u64, // task deadline / vruntime
// }
//
// The dispatched task's information is pre-populated from the QueuedTask and can
// be modified before dispatching it via self.bpf.dispatch_task().
let mut dispatched_task = DispatchedTask::new(&task);
// Decide where the task needs to run (target CPU).
//
// A call to select_cpu() will return the most suitable idle CPU for the task,
// considering its previously used CPU.
let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
if cpu >= 0 {
dispatched_task.cpu = cpu;
} else {
dispatched_task.flags |= RL_CPU_ANY;
}
// Decide how long the task should run (time slice); if left at 0, SCX_SLICE_DFL
// will be used by default. SLICE_US is expressed in microseconds, so convert it
// to nanoseconds for slice_ns.
dispatched_task.slice_ns = SLICE_US * 1000;
// Dispatch the task on the target CPU.
self.bpf.dispatch_task(&dispatched_task).unwrap();
// Notify the BPF component of the number of pending tasks and immediately give the
// dispatched task a chance to run.
self.bpf.notify_complete(self.task_queue.len() as u64);
}
}
fn dispatch_tasks(&mut self) {
loop {
// Consume all tasks before dispatching any.
self.consume_all_tasks();
// Dispatch one task from the queue.
self.dispatch_next_task();
// If no task is ready to run (or in case of error), stop dispatching tasks and notify
// the BPF component that all tasks have been scheduled / dispatched, with no remaining
// pending tasks.
if self.task_queue.is_empty() {
self.bpf.notify_complete(0);
break;
}
}
}
fn run(&mut self) -> Result<UserExitInfo> {
while !self.bpf.exited() {
self.dispatch_tasks();
}
self.bpf.shutdown_and_report()
}
}
fn main() -> Result<()> {
// Initialize and load the FIFO scheduler.
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&mut open_object)?;
if !sched.run()?.should_restart() {
break;
}
}
Ok(())
}
```
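Beyond the FIFO example above, the main user-visible change in this commit is the new `vtime` field on `DispatchedTask` (see the changes below), which the BPF side feeds to `scx_bpf_dispatch_vtime()`. The following is a purely illustrative sketch, not part of this commit, of how the dispatch path above could be adapted into a weighted-vruntime policy by filling in that field. The helper name `dispatch_with_vtime` and the `min_vruntime` parameter are made up for illustration; `BpfScheduler`, `QueuedTask`, `DispatchedTask`, `select_cpu()` and `RL_CPU_ANY` are the APIs used in the example above.

```rust
// Illustrative sketch (not part of this commit): dispatch a task with a
// weighted vruntime instead of plain FIFO order.
fn dispatch_with_vtime(bpf: &mut BpfScheduler<'_>, task: &QueuedTask, min_vruntime: u64) {
    let mut dispatched_task = DispatchedTask::new(task);

    // Pick an idle CPU if possible, otherwise let the BPF side choose one.
    let cpu = bpf.select_cpu(task.pid, task.cpu, 0);
    if cpu >= 0 {
        dispatched_task.cpu = cpu;
    } else {
        dispatched_task.flags |= RL_CPU_ANY;
    }

    // Charge the task for the CPU time it has consumed so far, scaled by its
    // weight (assuming the sched_ext nice-0 weight of 100): tasks that used
    // less CPU get a smaller vtime and are picked earlier by
    // scx_bpf_dispatch_vtime() on the BPF side.
    dispatched_task.vtime = min_vruntime + task.sum_exec_runtime * 100 / task.weight.max(1);

    // Leave slice_ns at 0 to use the default time slice.
    bpf.dispatch_task(&dispatched_task).unwrap();
}
```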
## License

View File

@ -53,10 +53,6 @@ const SCHED_EXT: i32 = 7;
#[allow(dead_code)]
pub const RL_CPU_ANY: u64 = bpf_intf::RL_CPU_ANY as u64;
// Allow to preempt the target CPU when dispatching the task.
#[allow(dead_code)]
pub const RL_PREEMPT_CPU: u64 = bpf_intf::RL_PREEMPT_CPU as u64;
/// High-level Rust abstraction to interact with a generic sched-ext BPF component.
///
/// Overview
@ -94,6 +90,7 @@ pub struct DispatchedTask {
pub cpu: i32, // target CPU selected by the scheduler
pub flags: u64, // special dispatch flags
pub slice_ns: u64, // time slice assigned to the task (0 = default)
pub vtime: u64, // task deadline / vruntime
cpumask_cnt: u64, // cpumask generation counter (private)
}
@ -109,6 +106,7 @@ impl DispatchedTask {
flags: 0,
cpumask_cnt: task.cpumask_cnt,
slice_ns: 0, // use default time slice
vtime: 0,
}
}
}
@ -433,16 +431,18 @@ impl<'cb> BpfScheduler<'cb> {
pid,
cpu,
flags,
cpumask_cnt,
slice_ns,
vtime,
cpumask_cnt,
..
} = &mut dispatched_task.as_mut();
*pid = task.pid;
*cpu = task.cpu;
*flags = task.flags;
*cpumask_cnt = task.cpumask_cnt;
*slice_ns = task.slice_ns;
*vtime = task.vtime;
*cpumask_cnt = task.cpumask_cnt;
// Store the task in the user ring buffer.
//

View File

@ -54,11 +54,6 @@ enum {
* on the first CPU available.
*/
RL_CPU_ANY = 1 << 0,
/*
* Allow to preempt the target CPU when dispatching the task.
*/
RL_PREEMPT_CPU = 1 << 1,
};
/*
@ -94,6 +89,7 @@ struct dispatched_task_ctx {
s32 cpu; /* CPU where the task should be dispatched */
u64 flags; /* special dispatch flags */
u64 slice_ns; /* time slice assigned to the task (0=default) */
u64 vtime; /* task deadline / vruntime */
u64 cpumask_cnt; /* cpumask generation counter */
};

View File

@ -13,9 +13,10 @@
* to be dispatched in the proper order.
*
* Messages between the BPF component and the user-space scheduler are passed
* using two BPF_MAP_TYPE_QUEUE maps: @queued for the messages sent by the BPF
* dispatcher to the user-space scheduler and @dispatched for the messages sent
* by the user-space scheduler to the BPF dispatcher.
* using BPF_MAP_TYPE_RINGBUF / BPF_MAP_TYPE_USER_RINGBUF maps: @queued for
* the messages sent by the BPF dispatcher to the user-space scheduler and
* @dispatched for the messages sent by the user-space scheduler to the BPF
* dispatcher.
*
* The BPF dispatcher is completely agnostic of the particular scheduling
* policy implemented in user-space. For this reason developers that are
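As an editorial aside (not part of this diff): from user space, the message passing described in the comment above is what the `BpfScheduler` wrapper used in the README example exposes. Roughly, `dequeue_task()` consumes `QueuedTask` entries from @queued, `dispatch_task()` produces `DispatchedTask` entries into @dispatched, and `notify_complete()` wakes the BPF side. A minimal sketch of one scheduling round (the function name `schedule_round` is hypothetical):

```rust
// Illustrative sketch of one user-space scheduling round over the two maps:
//   @queued     (BPF_MAP_TYPE_RINGBUF):      BPF dispatcher -> user-space scheduler
//   @dispatched (BPF_MAP_TYPE_USER_RINGBUF): user-space scheduler -> BPF dispatcher
fn schedule_round(bpf: &mut BpfScheduler<'_>) {
    // Drain @queued: each entry was produced by the BPF enqueue path.
    while let Ok(Some(task)) = bpf.dequeue_task() {
        // Produce into @dispatched: consumed by the BPF dispatch path.
        let dispatched_task = DispatchedTask::new(&task);
        bpf.dispatch_task(&dispatched_task).unwrap();
    }
    // Tell the BPF component that no tasks are left pending in user space.
    bpf.notify_complete(0);
}
```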
@ -349,16 +350,17 @@ static inline u64 task_slice(struct task_struct *p)
struct task_ctx *tctx;
tctx = lookup_task_ctx(p);
if (!tctx)
if (!tctx || !tctx->slice_ns)
return SCX_SLICE_DFL;
return tctx->slice_ns;
}
/*
* Dispatch a task to a target DSQ, waking up the corresponding CPU, if needed.
* Dispatch a task to a target per-CPU DSQ, waking up the corresponding CPU, if
* needed.
*/
static void dispatch_task(struct task_struct *p, u64 dsq_id,
u64 cpumask_cnt, u64 slice, u64 enq_flags)
u64 cpumask_cnt, u64 slice, u64 vtime)
{
struct task_ctx *tctx;
u64 curr_cpumask_cnt;
@ -378,9 +380,9 @@ static void dispatch_task(struct task_struct *p, u64 dsq_id,
*/
switch (dsq_id) {
case SHARED_DSQ:
scx_bpf_dispatch(p, dsq_id, SCX_SLICE_DFL, enq_flags);
dbg_msg("dispatch: pid=%d (%s) dsq=%llu enq_flags=%llx slice=%llu",
p->pid, p->comm, dsq_id, enq_flags, slice);
scx_bpf_dispatch_vtime(p, dsq_id, SCX_SLICE_DFL, vtime, 0);
dbg_msg("dispatch: pid=%d (%s) dsq=%llu slice=%llu vtime=%llu",
p->pid, p->comm, dsq_id, slice, vtime);
break;
default:
/*
@ -399,7 +401,7 @@ static void dispatch_task(struct task_struct *p, u64 dsq_id,
* core sched-ext code, potentially selecting a different cpu
* and a different cpumask.
*/
scx_bpf_dispatch(p, dsq_id, SCX_SLICE_DFL, enq_flags);
scx_bpf_dispatch_vtime(p, dsq_id, SCX_SLICE_DFL, vtime, 0);
/* Read current cpumask generation counter */
curr_cpumask_cnt = tctx->cpumask_cnt;
@ -427,15 +429,15 @@ static void dispatch_task(struct task_struct *p, u64 dsq_id,
scx_bpf_dispatch_cancel();
__sync_fetch_and_add(&nr_bounce_dispatches, 1);
scx_bpf_dispatch(p, SHARED_DSQ, SCX_SLICE_DFL, enq_flags);
dbg_msg("dispatch: pid=%d (%s) dsq=%llu enq_flags=%llx slice=%llu bounce",
p->pid, p->comm, dsq_id, enq_flags, slice);
scx_bpf_dispatch_vtime(p, SHARED_DSQ, SCX_SLICE_DFL, vtime, 0);
dbg_msg("dispatch: pid=%d (%s) dsq=%llu slice=%llu vtime=%llu bounce",
p->pid, p->comm, dsq_id, slice, vtime);
return;
}
/* Requested dispatch was valid */
dbg_msg("dispatch: pid=%d (%s) dsq=%llu enq_flags=%llx slice=%llu",
p->pid, p->comm, dsq_id, enq_flags, slice);
dbg_msg("dispatch: pid=%d (%s) dsq=%llu slice=%llu vtime=%llu",
p->pid, p->comm, dsq_id, slice, vtime);
break;
}
@ -629,7 +631,6 @@ static void sched_congested(struct task_struct *p)
*/
void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
{
s32 cpu = scx_bpf_task_cpu(p);
struct queued_task_ctx *task;
/*
@ -650,7 +651,7 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
task = bpf_ringbuf_reserve(&queued, sizeof(*task), 0);
if (!task) {
sched_congested(p);
dispatch_task(p, SHARED_DSQ, 0, SCX_SLICE_DFL, enq_flags);
dispatch_task(p, SHARED_DSQ, 0, SCX_SLICE_DFL, 0);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
return;
}
@ -683,12 +684,7 @@ static long handle_dispatched_task(struct bpf_dynptr *dynptr, void *context)
dbg_msg("usersched: pid=%d cpu=%d cpumask_cnt=%llu slice_ns=%llu flags=%llx",
task->pid, task->cpu, task->cpumask_cnt, task->slice_ns, task->flags);
/*
* Map RL_PREEMPT_CPU to SCX_ENQ_PREEMPT and allow this task to
* preempt others.
*/
if (task->flags & RL_PREEMPT_CPU)
enq_flags = SCX_ENQ_PREEMPT;
/*
* Check whether the user-space scheduler assigned a different
* CPU to the task and migrate (if possible).
@ -701,7 +697,8 @@ static long handle_dispatched_task(struct bpf_dynptr *dynptr, void *context)
dsq_id = SHARED_DSQ;
else
dsq_id = cpu_to_dsq(task->cpu);
dispatch_task(p, dsq_id, task->cpumask_cnt, task->slice_ns, enq_flags);
dispatch_task(p, dsq_id, task->cpumask_cnt, task->slice_ns, task->vtime);
bpf_task_release(p);
__sync_fetch_and_add(&nr_user_dispatches, 1);

View File

@ -13,6 +13,7 @@ use scx_utils::UserExitInfo;
use libbpf_rs::OpenObject;
use std::collections::VecDeque;
use std::mem::MaybeUninit;
use std::time::SystemTime;
@ -22,6 +23,7 @@ const SLICE_US: u64 = 5000;
struct Scheduler<'a> {
bpf: BpfScheduler<'a>,
task_queue: VecDeque<QueuedTask>,
}
impl<'a> Scheduler<'a> {
@ -32,68 +34,84 @@ impl<'a> Scheduler<'a> {
false, // partial (false = include all tasks)
false, // debug (false = debug mode off)
)?;
Ok(Self { bpf })
Ok(Self {
bpf,
task_queue: VecDeque::new(),
})
}
fn consume_all_tasks(&mut self) {
// Consume all tasks that are ready to run.
//
// Each task contains the following details:
//
// pub struct QueuedTask {
// pub pid: i32, // pid that uniquely identifies a task
// pub cpu: i32, // CPU where the task is running
// pub sum_exec_runtime: u64, // Total cpu time
// pub weight: u64, // Task static priority
// }
//
// Although the FIFO scheduler doesn't use these fields, they can provide valuable data for
// implementing more sophisticated scheduling policies.
while let Ok(Some(task)) = self.bpf.dequeue_task() {
self.task_queue.push_back(task);
}
}
fn dispatch_next_task(&mut self) {
if let Some(task) = self.task_queue.pop_front() {
// Create a new task to be dispatched, derived from the received enqueued task.
//
// pub struct DispatchedTask {
// pub pid: i32, // pid that uniquely identifies a task
// pub cpu: i32, // target CPU selected by the scheduler
// pub flags: u64, // special dispatch flags
// pub slice_ns: u64, // time slice assigned to the task (0 = default)
// pub vtime: u64, // task deadline / vruntime
// }
//
// The dispatched task's information is pre-populated from the QueuedTask and can
// be modified before dispatching it via self.bpf.dispatch_task().
let mut dispatched_task = DispatchedTask::new(&task);
// Decide where the task needs to run (target CPU).
//
// A call to select_cpu() will return the most suitable idle CPU for the task,
// considering its previously used CPU.
let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
if cpu >= 0 {
dispatched_task.cpu = cpu;
} else {
dispatched_task.flags |= RL_CPU_ANY;
}
// Decide how long the task should run (time slice); if left at 0, SCX_SLICE_DFL
// will be used by default. SLICE_US is expressed in microseconds, so convert it
// to nanoseconds for slice_ns.
dispatched_task.slice_ns = SLICE_US * 1000;
// Dispatch the task on the target CPU.
self.bpf.dispatch_task(&dispatched_task).unwrap();
// Notify the BPF component of the number of pending tasks and immediately give the
// dispatched task a chance to run.
self.bpf.notify_complete(self.task_queue.len() as u64);
}
}
fn dispatch_tasks(&mut self) {
loop {
// Consume a task that wants to run.
match self.bpf.dequeue_task() {
// Consume a task that is ready to run.
//
// The task contains the following details:
//
// pub struct QueuedTask {
// pub pid: i32, // pid that uniquely identifies a task
// pub cpu: i32, // CPU where the task is running
// pub sum_exec_runtime: u64, // Total cpu time
// pub weight: u64, // Task static priority
// }
//
// Although the FIFO scheduler doesn't use these fields, they can provide
// valuable data for implementing more sophisticated scheduling policies.
Ok(Some(task)) => {
// Create a new task to be dispatched, derived from the received enqueued task.
//
// pub struct DispatchedTask {
// pub pid: i32, // pid that uniquely identifies a task
// pub cpu: i32, // target CPU selected by the scheduler
// pub flags: u64, // special dispatch flags
// pub slice_ns: u64, // time slice assigned to the task (0 = default)
// }
//
// The dispatched task's information is pre-populated from the QueuedTask and
// can be modified before dispatching it via self.bpf.dispatch_task().
let mut dispatched_task = DispatchedTask::new(&task);
// Consume all tasks before dispatching any.
self.consume_all_tasks();
// Decide where the task needs to run (target CPU).
//
// A call to select_cpu() will return the most suitable idle CPU for the task,
// considering its previously used CPU.
let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
if cpu >= 0 {
// Assign the selected CPU to the task to be dispatched.
dispatched_task.cpu = cpu;
} else {
// No idle CPU found: dispatch the task on the first CPU available via the
// RL_CPU_ANY flag.
dispatched_task.flags |= RL_CPU_ANY;
}
// Dispatch one task from the queue.
self.dispatch_next_task();
// Decide for how long the task needs to run (time slice); if not specified
// SCX_SLICE_DFL will be used by default.
dispatched_task.slice_ns = SLICE_US;
// Dispatch the task on the target CPU.
self.bpf.dispatch_task(&dispatched_task).unwrap();
}
Ok(None) | Err(_) => {
// If no task is ready to run (or in case of error), stop dispatching tasks and
// notify the BPF component that all tasks have been scheduled / dispatched,
// with no remaining pending tasks.
self.bpf.notify_complete(0);
break;
}
// If no task is ready to run (or in case of error), stop dispatching tasks and notify
// the BPF component that all tasks have been scheduled / dispatched, with no remaining
// pending tasks.
if self.task_queue.is_empty() {
self.bpf.notify_complete(0);
break;
}
}
}