Compare commits

...

4 Commits

Author SHA1 Message Date
ppw
1f1cc7d533
Merge c7faf70a26 into a7fcda82cc 2024-11-18 10:22:52 +01:00
Andrea Righi
a7fcda82cc
Merge pull request #924 from sched-ext/scx-fair
scheds: introduce scx_flash
2024-11-18 08:21:36 +00:00
Andrea Righi
5b4b6df5e4
Merge branch 'main' into scx-fair 2024-11-18 07:42:09 +01:00
Andrea Righi
678b10133d scheds: introduce scx_flash
Introduce scx_flash (Fair Latency-Aware ScHeduler), a scheduler that
focuses on ensuring fairness among tasks and performance predictability.

This scheduler is introduced as a replacement of the "lowlatency" mode
in scx_bpfland, that has been dropped in commit 78101e4 ("scx_bpfland:
drop lowlatency mode and the priority DSQ").

scx_flash operates based on an EDF (Earliest Deadline First) policy,
where each task is assigned a latency weight. This weight is adjusted
dynamically, influenced by the task's static weight and how often it
releases the CPU before its full assigned time slice is used: tasks that
release the CPU early receive a higher latency weight, granting them
a higher priority over tasks that fully use their time slice.

The combination of dynamic latency weights and EDF scheduling ensures
responsive and stable performance, even in overcommitted systems, making
the scheduler particularly well-suited for latency-sensitive workloads,
such as multimedia or real-time audio processing.

Tested-by: Peter Jung <ptr1337@cachyos.org>
Tested-by: Piotr Gorski <piotrgorski@cachyos.org>
Signed-off-by: Andrea Righi <arighi@nvidia.com>
2024-11-16 14:49:25 +01:00
17 changed files with 1665 additions and 2 deletions

17
Cargo.lock generated
View File

@ -1700,6 +1700,23 @@ dependencies = [
"simplelog",
]
[[package]]
name = "scx_flash"
version = "1.0.4"
dependencies = [
"anyhow",
"clap",
"crossbeam",
"ctrlc",
"libbpf-rs",
"log",
"scx_stats",
"scx_stats_derive",
"scx_utils",
"serde",
"simplelog",
]
[[package]]
name = "scx_lavd"
version = "1.0.6"

View File

@ -6,6 +6,7 @@ members = ["rust/scx_stats",
"rust/scx_loader",
"scheds/rust/scx_lavd",
"scheds/rust/scx_bpfland",
"scheds/rust/scx_flash",
"scheds/rust/scx_rustland",
"scheds/rust/scx_rlfifo",
"scheds/rust/scx_rusty",

View File

@ -22,6 +22,12 @@ sched_args: -v
stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc`
timeout_sec: 15
[scx_flash]
sched: scx_flash
sched_args:
stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc`
timeout_sec: 15
[scx_layered]
sched: scx_layered
sched_args: --run-example -v --stats 1

View File

@ -327,7 +327,7 @@ if enable_rust
run_target('fetch', command: [cargo_fetch, cargo], env: cargo_env)
rust_scheds = ['scx_lavd', 'scx_bpfland', 'scx_rustland', 'scx_rlfifo',
'scx_rusty',
'scx_flash', 'scx_rusty',
'scx_layered', 'scx_mitosis']
rust_misc = ['scx_stats', 'scx_stats_derive', 'scx_utils',
'scx_rustland_core',

View File

@ -18,3 +18,4 @@ main.rs or \*.bpf.c files.
- [scx_rlfifo](scx_rlfifo/README.md)
- [scx_lavd](scx_lavd/README.md)
- [scx_bpfland](scx_bpfland/README.md)
- [scx_flash](scx_flash/README.md)

View File

@ -0,0 +1,26 @@
[package]
name = "scx_flash"
version = "1.0.4"
authors = ["Andrea Righi <arighi@nvidia.com>"]
edition = "2021"
description = "A scheduler designed for multimedia and real-time audio processing workloads. https://github.com/sched-ext/scx/tree/main"
license = "GPL-2.0-only"
[dependencies]
anyhow = "1.0.65"
ctrlc = { version = "3.1", features = ["termination"] }
clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] }
crossbeam = "0.8.4"
libbpf-rs = "0.24.1"
log = "0.4.17"
scx_stats = { path = "../../../rust/scx_stats", version = "1.0.4" }
scx_stats_derive = { path = "../../../rust/scx_stats/scx_stats_derive", version = "1.0.4" }
scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" }
serde = { version = "1.0", features = ["derive"] }
simplelog = "0.12"
[build-dependencies]
scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" }
[features]
enable_backtrace = []

View File

@ -0,0 +1 @@
../../../LICENSE

View File

@ -0,0 +1,26 @@
# scx_flash
This is a single user-defined scheduler used within [sched_ext](https://github.com/sched-ext/scx/tree/main), which is a Linux kernel feature which enables implementing kernel thread schedulers in BPF and dynamically loading them. [Read more about sched_ext](https://github.com/sched-ext/scx/tree/main).
## Overview
A scheduler that focuses on ensuring fairness among tasks and performance
predictability.
It operates using an earliest deadline first (EDF) policy, where each task is
assigned a "latency" weight. This weight is dynamically adjusted based on how
often a task release the CPU before its full time slice is used. Tasks that
release the CPU early are given a higher latency weight, prioritizing them over
tasks that fully consume their time slice.
## Typical Use Case
The combination of dynamic latency weights and EDF scheduling ensures
responsive and consistent performance, even in overcommitted systems.
This makes the scheduler particularly well-suited for latency-sensitive
workloads, such as multimedia or real-time audio processing.
## Production Ready?
Yes.

View File

@ -0,0 +1,13 @@
// Copyright (c) Andrea Righi <arighi@nvidia.com>
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
fn main() {
scx_utils::BpfBuilder::new()
.unwrap()
.enable_intf("src/bpf/intf.h", "bpf_intf.rs")
.enable_skel("src/bpf/main.bpf.c", "bpf")
.build()
.unwrap();
}

View File

@ -0,0 +1,8 @@
# Get help on options with `rustfmt --help=config`
# Please keep these in alphabetical order.
edition = "2021"
group_imports = "StdExternalCrate"
imports_granularity = "Item"
merge_derives = false
use_field_init_shorthand = true
version = "Two"

View File

@ -0,0 +1,43 @@
/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
*
* This software may be used and distributed according to the terms of the GNU
* General Public License version 2.
*/
#ifndef __INTF_H
#define __INTF_H
#include <limits.h>
#define MAX(x, y) ((x) > (y) ? (x) : (y))
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#define CLAMP(val, lo, hi) MIN(MAX(val, lo), hi)
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
enum consts {
NSEC_PER_USEC = 1000ULL,
NSEC_PER_MSEC = (1000ULL * NSEC_PER_USEC),
NSEC_PER_SEC = (1000ULL * NSEC_PER_MSEC),
};
#ifndef __VMLINUX_H__
typedef unsigned char u8;
typedef unsigned short u16;
typedef unsigned int u32;
typedef unsigned long u64;
typedef signed char s8;
typedef signed short s16;
typedef signed int s32;
typedef signed long s64;
typedef int pid_t;
#endif /* __VMLINUX_H__ */
struct domain_arg {
s32 cpu_id;
s32 sibling_cpu_id;
};
#endif /* __INTF_H */

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,12 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(dead_code)]
include!(concat!(env!("OUT_DIR"), "/bpf_intf.rs"));

View File

@ -0,0 +1,8 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
include!(concat!(env!("OUT_DIR"), "/bpf_skel.rs"));

View File

@ -0,0 +1,334 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;
mod stats;
use std::collections::HashMap;
use std::ffi::c_int;
use std::fs::File;
use std::io::Read;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::Skel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::info;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::import_enums;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::set_rlimit_infinity;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use stats::Metrics;
const SCHEDULER_NAME: &'static str = "scx_flash";
#[derive(Debug, Parser)]
struct Opts {
/// Exit debug dump buffer length. 0 indicates default.
#[clap(long, default_value = "0")]
exit_dump_len: u32,
/// Maximum scheduling slice duration in microseconds.
#[clap(short = 's', long, default_value = "20000")]
slice_us_max: u64,
/// Maximum time slice lag in microseconds.
///
/// Increasing this value can help to enhance the responsiveness of interactive tasks, but it
/// can also make performance more "spikey".
#[clap(short = 'l', long, default_value = "20000")]
slice_us_lag: u64,
/// Enable kthreads prioritization.
///
/// Enabling this can improve system performance, but it may also introduce interactivity
/// issues or unfairness in scenarios with high kthread activity, such as heavy I/O or network
/// traffic.
#[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
local_kthreads: bool,
/// Enable stats monitoring with the specified interval.
#[clap(long)]
stats: Option<f64>,
/// Run in stats monitoring mode with the specified interval. Scheduler
/// is not launched.
#[clap(long)]
monitor: Option<f64>,
/// Enable verbose output, including libbpf details.
#[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
verbose: bool,
/// Print scheduler version and exit.
#[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
version: bool,
/// Show descriptions for statistics.
#[clap(long)]
help_stats: bool,
}
fn is_smt_active() -> std::io::Result<i32> {
let mut file = File::open("/sys/devices/system/cpu/smt/active")?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let smt_active: i32 = contents.trim().parse().unwrap_or(0);
Ok(smt_active)
}
struct Scheduler<'a> {
skel: BpfSkel<'a>,
struct_ops: Option<libbpf_rs::Link>,
stats_server: StatsServer<(), Metrics>,
}
impl<'a> Scheduler<'a> {
fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
set_rlimit_infinity();
// Check host topology to determine if we need to enable SMT capabilities.
let smt_enabled = match is_smt_active() {
Ok(value) => value == 1,
Err(e) => bail!("Failed to read SMT status: {}", e),
};
info!(
"{} {} {}",
SCHEDULER_NAME,
*build_id::SCX_FULL_VERSION,
if smt_enabled { "SMT on" } else { "SMT off" }
);
// Initialize BPF connector.
let mut skel_builder = BpfSkelBuilder::default();
skel_builder.obj_builder.debug(opts.verbose);
let mut skel = scx_ops_open!(skel_builder, open_object, flash_ops)?;
skel.struct_ops.flash_ops_mut().exit_dump_len = opts.exit_dump_len;
// Override default BPF scheduling parameters.
skel.maps.rodata_data.slice_max = opts.slice_us_max * 1000;
skel.maps.rodata_data.slice_lag = opts.slice_us_lag * 1000;
skel.maps.rodata_data.local_kthreads = opts.local_kthreads;
skel.maps.rodata_data.smt_enabled = smt_enabled;
// Load the BPF program for validation.
let mut skel = scx_ops_load!(skel, flash_ops, uei)?;
// Initialize CPU topology.
let topo = Topology::new().unwrap();
// Initialize LLC domain.
Self::init_l3_cache_domains(&mut skel, &topo)?;
// Attach the scheduler.
let struct_ops = Some(scx_ops_attach!(skel, flash_ops)?);
let stats_server = StatsServer::new(stats::server_data()).launch()?;
Ok(Self {
skel,
struct_ops,
stats_server,
})
}
fn enable_sibling_cpu(
skel: &mut BpfSkel<'_>,
cpu: usize,
sibling_cpu: usize,
) -> Result<(), u32> {
let prog = &mut skel.progs.enable_sibling_cpu;
let mut args = domain_arg {
cpu_id: cpu as c_int,
sibling_cpu_id: sibling_cpu as c_int,
};
let input = ProgramInput {
context_in: Some(unsafe {
std::slice::from_raw_parts_mut(
&mut args as *mut _ as *mut u8,
std::mem::size_of_val(&args),
)
}),
..Default::default()
};
let out = prog.test_run(input).unwrap();
if out.return_value != 0 {
return Err(out.return_value);
}
Ok(())
}
fn init_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
cache_lvl: usize,
enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>,
) -> Result<(), std::io::Error> {
// Determine the list of CPU IDs associated to each cache node.
let mut cache_id_map: HashMap<usize, Vec<usize>> = HashMap::new();
for core in topo.cores().into_iter() {
for (cpu_id, cpu) in core.cpus() {
let cache_id = match cache_lvl {
2 => cpu.l2_id(),
3 => cpu.l3_id(),
_ => panic!("invalid cache level {}", cache_lvl),
};
cache_id_map
.entry(cache_id)
.or_insert_with(Vec::new)
.push(*cpu_id);
}
}
// Update the BPF cpumasks for the cache domains.
for (cache_id, cpus) in cache_id_map {
info!(
"L{} cache ID {}: sibling CPUs: {:?}",
cache_lvl, cache_id, cpus
);
for cpu in &cpus {
for sibling_cpu in &cpus {
match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) {
Ok(()) => {}
Err(_) => {
warn!(
"L{} cache ID {}: failed to set CPU {} sibling {}",
cache_lvl, cache_id, *cpu, *sibling_cpu
);
}
}
}
}
}
Ok(())
}
fn init_l3_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, topo, 3, &|skel, _lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, cpu, sibling_cpu)
})
}
fn get_metrics(&self) -> Metrics {
Metrics {
nr_kthread_dispatches: self.skel.maps.bss_data.nr_kthread_dispatches,
nr_direct_dispatches: self.skel.maps.bss_data.nr_direct_dispatches,
nr_shared_dispatches: self.skel.maps.bss_data.nr_shared_dispatches,
}
}
pub fn exited(&mut self) -> bool {
uei_exited!(&self.skel, uei)
}
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
let (res_ch, req_ch) = self.stats_server.channels();
while !shutdown.load(Ordering::Relaxed) && !self.exited() {
match req_ch.recv_timeout(Duration::from_secs(1)) {
Ok(()) => res_ch.send(self.get_metrics())?,
Err(RecvTimeoutError::Timeout) => {}
Err(e) => Err(e)?,
}
}
self.struct_ops.take();
uei_report!(&self.skel, uei)
}
}
impl<'a> Drop for Scheduler<'a> {
fn drop(&mut self) {
info!("Unregister {} scheduler", SCHEDULER_NAME);
}
}
fn main() -> Result<()> {
let opts = Opts::parse();
if opts.version {
println!("{} {}", SCHEDULER_NAME, *build_id::SCX_FULL_VERSION);
return Ok(());
}
if opts.help_stats {
stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
return Ok(());
}
let loglevel = simplelog::LevelFilter::Info;
let mut lcfg = simplelog::ConfigBuilder::new();
lcfg.set_time_level(simplelog::LevelFilter::Error)
.set_location_level(simplelog::LevelFilter::Off)
.set_target_level(simplelog::LevelFilter::Off)
.set_thread_level(simplelog::LevelFilter::Off);
simplelog::TermLogger::init(
loglevel,
lcfg.build(),
simplelog::TerminalMode::Stderr,
simplelog::ColorChoice::Auto,
)?;
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
ctrlc::set_handler(move || {
shutdown_clone.store(true, Ordering::Relaxed);
})
.context("Error setting Ctrl-C handler")?;
if let Some(intv) = opts.monitor.or(opts.stats) {
let shutdown_copy = shutdown.clone();
let jh = std::thread::spawn(move || {
stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
});
if opts.monitor.is_some() {
let _ = jh.join();
return Ok(());
}
}
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&opts, &mut open_object)?;
if !sched.run(shutdown.clone())?.should_restart() {
break;
}
}
Ok(())
}

View File

@ -0,0 +1,82 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
use std::io::Write;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use scx_stats::prelude::*;
use scx_stats_derive::Stats;
use serde::Deserialize;
use serde::Serialize;
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(top)]
pub struct Metrics {
#[stat(desc = "Number of kthread direct dispatches")]
pub nr_kthread_dispatches: u64,
#[stat(desc = "Number of task direct dispatches")]
pub nr_direct_dispatches: u64,
#[stat(desc = "Number of task global dispatches")]
pub nr_shared_dispatches: u64,
}
impl Metrics {
fn format<W: Write>(&self, w: &mut W) -> Result<()> {
writeln!(
w,
"[{}] dispatch -> kthread: {:<5} direct: {:<5} shared: {:<5}",
crate::SCHEDULER_NAME,
self.nr_kthread_dispatches,
self.nr_direct_dispatches,
self.nr_shared_dispatches,
)?;
Ok(())
}
fn delta(&self, rhs: &Self) -> Self {
Self {
nr_kthread_dispatches: self.nr_kthread_dispatches - rhs.nr_kthread_dispatches,
nr_direct_dispatches: self.nr_direct_dispatches - rhs.nr_direct_dispatches,
nr_shared_dispatches: self.nr_shared_dispatches - rhs.nr_shared_dispatches,
..self.clone()
}
}
}
pub fn server_data() -> StatsServerData<(), Metrics> {
let open: Box<dyn StatsOpener<(), Metrics>> = Box::new(move |(req_ch, res_ch)| {
req_ch.send(())?;
let mut prev = res_ch.recv()?;
let read: Box<dyn StatsReader<(), Metrics>> = Box::new(move |_args, (req_ch, res_ch)| {
req_ch.send(())?;
let cur = res_ch.recv()?;
let delta = cur.delta(&prev);
prev = cur;
delta.to_json()
});
Ok(read)
});
StatsServerData::new()
.add_meta(Metrics::meta())
.add_ops("top", StatsOps { open, close: None })
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
scx_utils::monitor_stats::<Metrics>(
&vec![],
intv,
|| shutdown.load(Ordering::Relaxed),
|metrics| metrics.format(&mut std::io::stdout()),
)
}

View File

@ -1,4 +1,4 @@
# List of scx_schedulers: scx_bpfland scx_central scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland
# List of scx_schedulers: scx_bpfland scx_central scx_flash scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland
SCX_SCHEDULER=scx_bpfland
# Set custom flags for each scheduler, below is an example of how to use