diff --git a/scheds/rust/scx_rustland/src/bpf.rs b/scheds/rust/scx_rustland/src/bpf.rs index 409f3d2..8a05fa1 100644 --- a/scheds/rust/scx_rustland/src/bpf.rs +++ b/scheds/rust/scx_rustland/src/bpf.rs @@ -19,6 +19,9 @@ use libbpf_rs::skel::SkelBuilder as _; use libc::{sched_param, sched_setscheduler}; +mod alloc; +use alloc::*; + // Defined in UAPI const SCHED_EXT: i32 = 7; @@ -122,6 +125,18 @@ const SCHED_EXT: i32 = 7; /// } /// +// Override default memory allocator. +// +// To prevent potential deadlock conditions under heavy loads, any scheduler that delegates +// scheduling decisions to user-space should avoid triggering page faults. +// +// To address this issue, replace the global allocator with a custom one (RustLandAllocator), +// designed to operate on a pre-allocated buffer. This, coupled with the memory locking achieved +// through mlockall(), prevents page faults from occurring during the execution of the user-space +// scheduler. +#[global_allocator] +static ALLOCATOR: RustLandAllocator = RustLandAllocator; + // Task queued for scheduling from the BPF component (see bpf_intf::queued_task_ctx). #[derive(Debug)] pub struct QueuedTask { @@ -204,6 +219,10 @@ impl<'a> BpfScheduler<'a> { let skel_builder = BpfSkelBuilder::default(); let mut skel = skel_builder.open().context("Failed to open BPF program")?; + // Lock all the memory to prevent page faults that could trigger potential deadlocks during + // scheduling. + ALLOCATOR.lock_memory(); + // Initialize online CPUs counter. // // We should probably refresh this counter during the normal execution to support cpu diff --git a/scheds/rust/scx_rustland/src/bpf/alloc.rs b/scheds/rust/scx_rustland/src/bpf/alloc.rs new file mode 100644 index 0000000..5b33293 --- /dev/null +++ b/scheds/rust/scx_rustland/src/bpf/alloc.rs @@ -0,0 +1,142 @@ +// Copyright (c) Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. 
+ +use std::alloc::{GlobalAlloc, Layout}; +use std::cell::UnsafeCell; +use std::sync::{Mutex, MutexGuard}; + +/// scx_rustland: memory allocator. +/// +/// RustLandAllocator is a very simple block-based memory allocator that relies on a pre-allocated +/// buffer and an array to manage the status of allocated and free blocks. +/// +/// The purpose of this allocator is to prevent the user-space scheduler from triggering page +/// faults, which could lead to potential deadlocks under heavy system load conditions. +/// +/// Despite its simplicity, this allocator exhibits reasonable speed and efficiency in meeting +/// memory requests from the user-space scheduler, particularly when dealing with small, uniformly +/// sized allocations. + +// Pre-allocate an area of 64MB, with a block size of 64 bytes, that should be reasonable enough to +// handle small uniform allocations performed by the user-space scheduler without introducing too +// much fragmentation and overhead. +const ARENA_SIZE: usize = 64 * 1024 * 1024; +const BLOCK_SIZE: usize = 64; +const NUM_BLOCKS: usize = ARENA_SIZE / BLOCK_SIZE; + +#[repr(C, align(4096))] +struct RustLandMemory { + // Pre-allocated buffer. + arena: UnsafeCell<[u8; ARENA_SIZE]>, + // Allocation map. + // + // Each slot represents the status of a memory block (true = allocated, false = free). + allocation_map: Mutex<[bool; NUM_BLOCKS]>, +} + +unsafe impl Sync for RustLandMemory {} + +// Memory pool for the allocator. +static MEMORY: RustLandMemory = RustLandMemory { + arena: UnsafeCell::new([0; ARENA_SIZE]), + allocation_map: Mutex::new([false; NUM_BLOCKS]), +}; + +// Main allocator class. 
+pub struct RustLandAllocator;
+
+impl RustLandAllocator {
+    // Translate a block index into its address inside the pre-allocated arena.
+    unsafe fn block_to_addr(&self, block: usize) -> *mut u8 {
+        MEMORY.arena.get().cast::<u8>().add(block * BLOCK_SIZE)
+    }
+
+    // Return true if the address of the block satisfies the requested alignment
+    // (align_size must be a power of two, as guaranteed by Layout).
+    unsafe fn is_aligned(&self, block: usize, align_size: usize) -> bool {
+        self.block_to_addr(block) as usize & (align_size - 1) == 0
+    }
+
+    pub fn lock_memory(&self) {
+        unsafe {
+            // Call setrlimit to set the locked-in-memory limit to unlimited.
+            let new_rlimit = libc::rlimit {
+                rlim_cur: libc::RLIM_INFINITY,
+                rlim_max: libc::RLIM_INFINITY,
+            };
+            let res = libc::setrlimit(libc::RLIMIT_MEMLOCK, &new_rlimit);
+            if res != 0 {
+                panic!("setrlimit failed with error code: {}", res);
+            }
+
+            // Lock all memory to prevent being paged out.
+            let res = libc::mlockall(libc::MCL_CURRENT | libc::MCL_FUTURE);
+            if res != 0 {
+                panic!("mlockall failed with error code: {}", res);
+            }
+        };
+    }
+}
+
+// Override global allocator methods.
+unsafe impl GlobalAlloc for RustLandAllocator {
+    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
+        let align = layout.align();
+        let size = layout.size();
+
+        // Find the first sequence of free blocks that can accommodate the requested size.
+        let mut map_guard: MutexGuard<[bool; NUM_BLOCKS]> = MEMORY.allocation_map.lock().unwrap();
+        let mut contiguous_blocks = 0;
+        let mut start_block = None;
+
+        for (block, &is_allocated) in map_guard.iter().enumerate() {
+            // Reset consecutive blocks count if an allocated block is encountered or if the
+            // first block is not aligned to the requested alignment.
+            if is_allocated
+                || (contiguous_blocks == 0 && !self.is_aligned(block, align))
+            {
+                contiguous_blocks = 0;
+            } else {
+                contiguous_blocks += 1;
+                if contiguous_blocks * BLOCK_SIZE >= size {
+                    // Found a sequence of free blocks that can accommodate the size.
+                    start_block = Some(block + 1 - contiguous_blocks);
+                    break;
+                }
+            }
+        }
+
+        match start_block {
+            Some(start) => {
+                // Mark the corresponding blocks as allocated.
+ for i in start..start + contiguous_blocks { + map_guard[i] = true; + } + // Return a pointer to the aligned allocated block. + self.block_to_addr(start) + } + None => { + // No contiguous block sequence found, just panic. + // + // NOTE: we want to panic here so that we can better detect when we run out of + // memory, instead of returning a null_ptr that could potentially hide the real + // problem. + panic!("Out of memory"); + } + } + } + + unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { + let size = layout.size(); + + // Calculate the block index from the released pointer. + let offset = ptr as usize - MEMORY.arena.get() as usize; + let start_block = offset / BLOCK_SIZE; + let end_block = (offset + size - 1) / BLOCK_SIZE + 1; + + // Update the allocation map for all blocks in the released range. + let mut map_guard: MutexGuard<[bool; NUM_BLOCKS]> = MEMORY.allocation_map.lock().unwrap(); + for index in start_block..end_block { + map_guard[index] = false; + } + } +} diff --git a/scheds/rust/scx_rustland/src/main.rs b/scheds/rust/scx_rustland/src/main.rs index 6d0a185..c895a00 100644 --- a/scheds/rust/scx_rustland/src/main.rs +++ b/scheds/rust/scx_rustland/src/main.rs @@ -205,6 +205,7 @@ struct Scheduler<'a> { min_vruntime: u64, // Keep track of the minimum vruntime across all tasks slice_ns: u64, // Default time slice (in ns) slice_boost: u64, // Slice booster + init_page_faults: u64, // Initial page faults counter } impl<'a> Scheduler<'a> { @@ -228,6 +229,9 @@ impl<'a> Scheduler<'a> { // Initialize global minimum vruntime. let min_vruntime: u64 = 0; + // Initialize initial page fault counter. + let init_page_faults: u64 = 0; + // Return scheduler object. Ok(Self { bpf, @@ -236,6 +240,7 @@ impl<'a> Scheduler<'a> { min_vruntime, slice_ns, slice_boost, + init_page_faults, }) } @@ -479,6 +484,29 @@ impl<'a> Scheduler<'a> { thread::yield_now(); } + // Get total page faults from /proc/self/stat. 
+    fn get_page_faults() -> io::Result<u64> {
+        let path = "/proc/self/stat";
+        let mut file = File::open(path)?;
+
+        // Read the contents of the file into a string.
+        let mut content = String::new();
+        file.read_to_string(&mut content)?;
+
+        // Parse the relevant fields and calculate the total page faults.
+        // Per proc(5), minflt is field 10 (index 9) and majflt is field 12 (index 11).
+        let fields: Vec<&str> = content.split_whitespace().collect();
+        if fields.len() >= 12 {
+            let minflt: u64 = fields[9].parse().unwrap_or(0);
+            let majflt: u64 = fields[11].parse().unwrap_or(0);
+            Ok(minflt + majflt)
+        } else {
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "Invalid format in /proc/[PID]/stat",
+            ))
+        }
+    }
+
     // Get the current CPU where the scheduler is running.
     fn get_current_cpu() -> io::Result<i32> {
         // Open /proc/self/stat file
@@ -510,14 +538,44 @@
         }
     }
 
+    // Print critical user-space scheduler statistics.
+    fn print_faults(&mut self) {
+        // Get counters of scheduling failures.
+        let nr_failed_dispatches = *self.bpf.nr_failed_dispatches_mut();
+        let nr_sched_congested = *self.bpf.nr_sched_congested_mut();
+
+        // Get the total amount of page faults of the user-space scheduler.
+        //
+        // NOTE: this value must remain set to 0; if the user-space scheduler is faulting we may
+        // experience deadlock conditions in the scheduler.
+        let page_faults = match Self::get_page_faults() {
+            Ok(page_faults) => page_faults,
+            Err(_) => 0,
+        };
+        if self.init_page_faults == 0 {
+            self.init_page_faults = page_faults;
+        }
+        let nr_page_faults = page_faults - self.init_page_faults;
+
+        // Report overall scheduler status at the end.
+        let status = if nr_page_faults + nr_failed_dispatches + nr_sched_congested > 0 {
+            "WARNING"
+        } else {
+            "OK"
+        };
+        info!(
+            " nr_failed_dispatches={} nr_sched_congested={} nr_page_faults={} [{}]",
+            nr_failed_dispatches, nr_sched_congested, nr_page_faults, status
+        );
+    }
+
     // Print internal scheduler statistics (fetched from the BPF part).
fn print_stats(&mut self) { // Show minimum vruntime (this should be constantly incrementing). - info!( - "vruntime={} tasks={}", - self.min_vruntime, - self.task_map.tasks.len() - ); + info!("vruntime={}", self.min_vruntime); + + // Show the total amount of tasks currently monitored by the scheduler. + info!(" tasks={}", self.task_map.tasks.len()); // Show general statistics. let nr_user_dispatches = *self.bpf.nr_user_dispatches_mut(); @@ -527,14 +585,6 @@ impl<'a> Scheduler<'a> { nr_user_dispatches, nr_kernel_dispatches ); - // Show failure statistics. - let nr_failed_dispatches = *self.bpf.nr_failed_dispatches_mut(); - let nr_sched_congested = *self.bpf.nr_sched_congested_mut(); - info!( - " nr_failed_dispatches={} nr_sched_congested={}", - nr_failed_dispatches, nr_sched_congested - ); - // Show tasks that are waiting to be dispatched. let nr_queued = *self.bpf.nr_queued_mut(); let nr_scheduled = *self.bpf.nr_scheduled_mut(); @@ -544,6 +594,9 @@ impl<'a> Scheduler<'a> { nr_waiting, nr_queued, nr_scheduled ); + // Show total page faults of the user-space scheduler. + self.print_faults(); + // Show current used time slice. info!("time slice = {} us", self.bpf.get_effective_slice_us());