Merge pull request #87 from sched-ext/scx-rustland-allocator

scx_rustland: use a custom memory allocator to prevent page faults
Andrea Righi 2024-01-15 16:21:17 +01:00 committed by GitHub
commit 09e7905ee0
3 changed files with 227 additions and 13 deletions

View File

@@ -19,6 +19,9 @@ use libbpf_rs::skel::SkelBuilder as _;
use libc::{sched_param, sched_setscheduler};

mod alloc;
use alloc::*;

// Defined in UAPI
const SCHED_EXT: i32 = 7;
@@ -122,6 +125,18 @@ const SCHED_EXT: i32 = 7;
/// }
///
// Override default memory allocator.
//
// To prevent potential deadlock conditions under heavy loads, any scheduler that delegates
// scheduling decisions to user-space should avoid triggering page faults.
//
// To address this issue, replace the global allocator with a custom one (RustLandAllocator),
// designed to operate on a pre-allocated buffer. This, coupled with the memory locking achieved
// through mlockall(), prevents page faults from occurring during the execution of the user-space
// scheduler.
#[global_allocator]
static ALLOCATOR: RustLandAllocator = RustLandAllocator;
// Task queued for scheduling from the BPF component (see bpf_intf::queued_task_ctx).
#[derive(Debug)]
pub struct QueuedTask {
@@ -204,6 +219,10 @@ impl<'a> BpfScheduler<'a> {
let skel_builder = BpfSkelBuilder::default();
let mut skel = skel_builder.open().context("Failed to open BPF program")?;
// Lock all the memory to prevent page faults that could trigger potential deadlocks during
// scheduling.
ALLOCATOR.lock_memory();
// Initialize online CPUs counter.
//
// We should probably refresh this counter during the normal execution to support cpu

View File

@@ -0,0 +1,142 @@
// Copyright (c) Andrea Righi <andrea.righi@canonical.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
use std::alloc::{GlobalAlloc, Layout};
use std::cell::UnsafeCell;
use std::sync::{Mutex, MutexGuard};
/// scx_rustland: memory allocator.
///
/// RustLandAllocator is a very simple block-based memory allocator that relies on a pre-allocated
/// buffer and an array to manage the status of allocated and free blocks.
///
/// The purpose of this allocator is to prevent the user-space scheduler from triggering page
/// faults, which could lead to potential deadlocks under heavy system load conditions.
///
/// Despite its simplicity, this allocator exhibits reasonable speed and efficiency in meeting
/// memory requests from the user-space scheduler, particularly when dealing with small, uniformly
/// sized allocations.
// Pre-allocate an area of 64MB, with a block size of 64 bytes, that should be reasonable enough to
// handle small uniform allocations performed by the user-space scheduler without introducing too
// much fragmentation and overhead.
const ARENA_SIZE: usize = 64 * 1024 * 1024;
const BLOCK_SIZE: usize = 64;
const NUM_BLOCKS: usize = ARENA_SIZE / BLOCK_SIZE;
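
// Illustrative note (not part of the original commit): with these constants an
// allocation of `size` bytes consumes ceil(size / BLOCK_SIZE) contiguous
// blocks, so even a 1-byte request reserves a full 64-byte block. A
// hypothetical helper makes the math explicit:
//
//     fn blocks_needed(size: usize) -> usize {
//         (size + BLOCK_SIZE - 1) / BLOCK_SIZE
//     }
//
//     // blocks_needed(1) == 1, blocks_needed(64) == 1, blocks_needed(65) == 2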
#[repr(C, align(4096))]
struct RustLandMemory {
// Pre-allocated buffer.
arena: UnsafeCell<[u8; ARENA_SIZE]>,
// Allocation map.
//
// Each slot represents the status of a memory block (true = allocated, false = free).
allocation_map: Mutex<[bool; NUM_BLOCKS]>,
}
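// SAFETY: blocks are handed out and released only while holding the
// allocation_map mutex, so distinct threads always receive disjoint ranges of
// the arena; the buffer itself is then only touched through exclusively owned
// blocks.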
unsafe impl Sync for RustLandMemory {}
// Memory pool for the allocator.
static MEMORY: RustLandMemory = RustLandMemory {
arena: UnsafeCell::new([0; ARENA_SIZE]),
allocation_map: Mutex::new([false; NUM_BLOCKS]),
};
// Main allocator class.
pub struct RustLandAllocator;
impl RustLandAllocator {
unsafe fn block_to_addr(&self, block: usize) -> *mut u8 {
MEMORY.arena.get().cast::<u8>().add(block * BLOCK_SIZE)
}
unsafe fn is_aligned(&self, block: usize, align_size: usize) -> bool {
self.block_to_addr(block) as usize & (align_size - 1) == 0
}
pub fn lock_memory(&self) {
unsafe {
// Call setrlimit to set the locked-in-memory limit to unlimited.
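// Note: raising the hard limit requires CAP_SYS_RESOURCE, which the
// scheduler normally has in practice, since loading the BPF component
// already requires root privileges.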
let new_rlimit = libc::rlimit {
rlim_cur: libc::RLIM_INFINITY,
rlim_max: libc::RLIM_INFINITY,
};
let res = libc::setrlimit(libc::RLIMIT_MEMLOCK, &new_rlimit);
if res != 0 {
panic!("setrlimit failed with error code: {}", res);
}
// Lock all memory to prevent being paged out.
let res = libc::mlockall(libc::MCL_CURRENT | libc::MCL_FUTURE);
if res != 0 {
panic!("mlockall failed with error code: {}", res);
}
};
}
}
// Override global allocator methods.
unsafe impl GlobalAlloc for RustLandAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let align = layout.align();
let size = layout.size();
// Find the first sequence of free blocks that can accommodate the requested size.
let mut map_guard: MutexGuard<[bool; NUM_BLOCKS]> = MEMORY.allocation_map.lock().unwrap();
let mut contiguous_blocks = 0;
let mut start_block = None;
for (block, &is_allocated) in map_guard.iter().enumerate() {
// Reset consecutive blocks count if an allocated block is encountered or if the
// first block is not aligned to the requested alignment.
if is_allocated
|| (contiguous_blocks == 0 && !self.is_aligned(block, align))
{
contiguous_blocks = 0;
} else {
contiguous_blocks += 1;
if contiguous_blocks * BLOCK_SIZE >= size {
// Found a sequence of free blocks that can accommodate the size.
start_block = Some(block + 1 - contiguous_blocks);
break;
}
}
}
match start_block {
Some(start) => {
// Mark the corresponding blocks as allocated.
for i in start..start + contiguous_blocks {
map_guard[i] = true;
}
// Return a pointer to the aligned allocated block.
self.block_to_addr(start)
}
None => {
// No contiguous block sequence found, just panic.
//
// NOTE: we want to panic here so that we can better detect when we run out of
// memory, instead of returning a null_ptr that could potentially hide the real
// problem.
panic!("Out of memory");
}
}
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
let size = layout.size();
// Calculate the block index from the released pointer.
let offset = ptr as usize - MEMORY.arena.get() as usize;
let start_block = offset / BLOCK_SIZE;
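// end_block is exclusive: the index of the last block touched by the freed
// range, plus one (i.e. ceil((offset + size) / BLOCK_SIZE)).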
let end_block = (offset + size - 1) / BLOCK_SIZE + 1;
// Update the allocation map for all blocks in the released range.
let mut map_guard: MutexGuard<[bool; NUM_BLOCKS]> = MEMORY.allocation_map.lock().unwrap();
for index in start_block..end_block {
map_guard[index] = false;
}
}
}
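
A minimal way to exercise the allocator in isolation (a hypothetical smoke
test, not part of this commit; it only assumes the names defined above):

#[cfg(test)]
mod tests {
    use super::*;
    use std::alloc::{GlobalAlloc, Layout};

    #[test]
    fn alloc_returns_aligned_blocks() {
        let a = RustLandAllocator;
        // 100 bytes with 64-byte alignment should span two blocks.
        let layout = Layout::from_size_align(100, 64).unwrap();
        unsafe {
            let ptr = a.alloc(layout);
            assert!(!ptr.is_null());
            assert_eq!(ptr as usize % layout.align(), 0);
            a.dealloc(ptr, layout);
        }
    }
}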

View File

@@ -205,6 +205,7 @@ struct Scheduler<'a> {
min_vruntime: u64, // Keep track of the minimum vruntime across all tasks
slice_ns: u64, // Default time slice (in ns)
slice_boost: u64, // Slice booster
init_page_faults: u64, // Initial page faults counter
}

impl<'a> Scheduler<'a> {
@@ -228,6 +229,9 @@ impl<'a> Scheduler<'a> {
// Initialize global minimum vruntime.
let min_vruntime: u64 = 0;
// Initialize initial page fault counter.
let init_page_faults: u64 = 0;
// Return scheduler object.
Ok(Self {
bpf,
@@ -236,6 +240,7 @@ impl<'a> Scheduler<'a> {
min_vruntime,
slice_ns,
slice_boost,
init_page_faults,
})
}
@@ -479,6 +484,29 @@ impl<'a> Scheduler<'a> {
thread::yield_now();
}
// Get total page faults from /proc/self/stat.
fn get_page_faults() -> Result<u64, io::Error> {
let path = "/proc/self/stat";
let mut file = File::open(path)?;
// Read the contents of the file into a string.
let mut content = String::new();
file.read_to_string(&mut content)?;
// Parse the relevant fields and calculate the total page faults.
let fields: Vec<&str> = content.split_whitespace().collect();
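// minflt and majflt are the 10th and 12th fields of /proc/self/stat (0-based
// indices 9 and 11); indexing by whitespace works here because the comm field
// of this process contains no spaces.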
if fields.len() >= 12 {
let minflt: u64 = fields[9].parse().unwrap_or(0);
let majflt: u64 = fields[11].parse().unwrap_or(0);
Ok(minflt + majflt)
} else {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"Invalid format in /proc/[PID]/stat",
))
}
}
// Get the current CPU where the scheduler is running.
fn get_current_cpu() -> io::Result<i32> {
// Open /proc/self/stat file
@@ -510,14 +538,44 @@ impl<'a> Scheduler<'a> {
}
}
// Print critical user-space scheduler statistics.
fn print_faults(&mut self) {
// Get counters of scheduling failures.
let nr_failed_dispatches = *self.bpf.nr_failed_dispatches_mut();
let nr_sched_congested = *self.bpf.nr_sched_congested_mut();
// Get the total amount of page faults of the user-space scheduler.
//
// NOTE: this value must remain 0; if the user-space scheduler is faulting, we
// may experience deadlock conditions in the scheduler.
let page_faults = match Self::get_page_faults() {
Ok(page_faults) => page_faults,
Err(_) => 0,
};
if self.init_page_faults == 0 {
self.init_page_faults = page_faults;
}
let nr_page_faults = page_faults - self.init_page_faults;
// Report overall scheduler status at the end.
let status = if nr_page_faults + nr_failed_dispatches + nr_sched_congested > 0 {
"WARNING"
} else {
"OK"
};
info!(
" nr_failed_dispatches={} nr_sched_congested={} nr_page_faults={} [{}]",
nr_failed_dispatches, nr_sched_congested, nr_page_faults, status
);
}
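// Example of the line emitted above when the scheduler is healthy (values are
// illustrative, derived from the format string):
//
//   nr_failed_dispatches=0 nr_sched_congested=0 nr_page_faults=0 [OK]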
// Print internal scheduler statistics (fetched from the BPF part).
fn print_stats(&mut self) {
// Show minimum vruntime (this should be constantly incrementing).
- info!(
-     "vruntime={} tasks={}",
-     self.min_vruntime,
-     self.task_map.tasks.len()
- );
+ info!("vruntime={}", self.min_vruntime);
+
+ // Show the total amount of tasks currently monitored by the scheduler.
+ info!(" tasks={}", self.task_map.tasks.len());
// Show general statistics.
let nr_user_dispatches = *self.bpf.nr_user_dispatches_mut();
@@ -527,14 +585,6 @@ impl<'a> Scheduler<'a> {
nr_user_dispatches, nr_kernel_dispatches
);
- // Show failure statistics.
- let nr_failed_dispatches = *self.bpf.nr_failed_dispatches_mut();
- let nr_sched_congested = *self.bpf.nr_sched_congested_mut();
- info!(
-     " nr_failed_dispatches={} nr_sched_congested={}",
-     nr_failed_dispatches, nr_sched_congested
- );
// Show tasks that are waiting to be dispatched.
let nr_queued = *self.bpf.nr_queued_mut();
let nr_scheduled = *self.bpf.nr_scheduled_mut();
@@ -544,6 +594,9 @@ impl<'a> Scheduler<'a> {
nr_waiting, nr_queued, nr_scheduled
);
// Show total page faults of the user-space scheduler.
self.print_faults();
// Show the currently used time slice.
info!("time slice = {} us", self.bpf.get_effective_slice_us());