Merge pull request #924 from sched-ext/scx-fair

scheds: introduce scx_flash
This commit is contained in:
Andrea Righi 2024-11-18 08:21:36 +00:00 committed by GitHub
commit a7fcda82cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1665 additions and 2 deletions

17
Cargo.lock generated
View File

@ -1700,6 +1700,23 @@ dependencies = [
"simplelog", "simplelog",
] ]
[[package]]
name = "scx_flash"
version = "1.0.4"
dependencies = [
"anyhow",
"clap",
"crossbeam",
"ctrlc",
"libbpf-rs",
"log",
"scx_stats",
"scx_stats_derive",
"scx_utils",
"serde",
"simplelog",
]
[[package]] [[package]]
name = "scx_lavd" name = "scx_lavd"
version = "1.0.6" version = "1.0.6"

View File

@ -6,6 +6,7 @@ members = ["rust/scx_stats",
"rust/scx_loader", "rust/scx_loader",
"scheds/rust/scx_lavd", "scheds/rust/scx_lavd",
"scheds/rust/scx_bpfland", "scheds/rust/scx_bpfland",
"scheds/rust/scx_flash",
"scheds/rust/scx_rustland", "scheds/rust/scx_rustland",
"scheds/rust/scx_rlfifo", "scheds/rust/scx_rlfifo",
"scheds/rust/scx_rusty", "scheds/rust/scx_rusty",

View File

@ -22,6 +22,12 @@ sched_args: -v
stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc` stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc`
timeout_sec: 15 timeout_sec: 15
[scx_flash]
sched: scx_flash
sched_args:
stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc`
timeout_sec: 15
[scx_layered] [scx_layered]
sched: scx_layered sched: scx_layered
sched_args: --run-example -v --stats 1 sched_args: --run-example -v --stats 1

View File

@ -327,7 +327,7 @@ if enable_rust
run_target('fetch', command: [cargo_fetch, cargo], env: cargo_env) run_target('fetch', command: [cargo_fetch, cargo], env: cargo_env)
rust_scheds = ['scx_lavd', 'scx_bpfland', 'scx_rustland', 'scx_rlfifo', rust_scheds = ['scx_lavd', 'scx_bpfland', 'scx_rustland', 'scx_rlfifo',
'scx_rusty', 'scx_flash', 'scx_rusty',
'scx_layered', 'scx_mitosis'] 'scx_layered', 'scx_mitosis']
rust_misc = ['scx_stats', 'scx_stats_derive', 'scx_utils', rust_misc = ['scx_stats', 'scx_stats_derive', 'scx_utils',
'scx_rustland_core', 'scx_rustland_core',

View File

@ -18,3 +18,4 @@ main.rs or \*.bpf.c files.
- [scx_rlfifo](scx_rlfifo/README.md) - [scx_rlfifo](scx_rlfifo/README.md)
- [scx_lavd](scx_lavd/README.md) - [scx_lavd](scx_lavd/README.md)
- [scx_bpfland](scx_bpfland/README.md) - [scx_bpfland](scx_bpfland/README.md)
- [scx_flash](scx_flash/README.md)

View File

@ -0,0 +1,26 @@
[package]
name = "scx_flash"
version = "1.0.4"
authors = ["Andrea Righi <arighi@nvidia.com>"]
edition = "2021"
description = "A scheduler designed for multimedia and real-time audio processing workloads. https://github.com/sched-ext/scx/tree/main"
license = "GPL-2.0-only"
[dependencies]
anyhow = "1.0.65"
ctrlc = { version = "3.1", features = ["termination"] }
clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] }
crossbeam = "0.8.4"
libbpf-rs = "0.24.1"
log = "0.4.17"
scx_stats = { path = "../../../rust/scx_stats", version = "1.0.4" }
scx_stats_derive = { path = "../../../rust/scx_stats/scx_stats_derive", version = "1.0.4" }
scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" }
serde = { version = "1.0", features = ["derive"] }
simplelog = "0.12"
[build-dependencies]
scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" }
[features]
enable_backtrace = []

View File

@ -0,0 +1 @@
../../../LICENSE

View File

@ -0,0 +1,26 @@
# scx_flash
This is a single user-defined scheduler used within [sched_ext](https://github.com/sched-ext/scx/tree/main), which is a Linux kernel feature which enables implementing kernel thread schedulers in BPF and dynamically loading them. [Read more about sched_ext](https://github.com/sched-ext/scx/tree/main).
## Overview
A scheduler that focuses on ensuring fairness among tasks and performance
predictability.
It operates using an earliest deadline first (EDF) policy, where each task is
assigned a "latency" weight. This weight is dynamically adjusted based on how
often a task release the CPU before its full time slice is used. Tasks that
release the CPU early are given a higher latency weight, prioritizing them over
tasks that fully consume their time slice.
## Typical Use Case
The combination of dynamic latency weights and EDF scheduling ensures
responsive and consistent performance, even in overcommitted systems.
This makes the scheduler particularly well-suited for latency-sensitive
workloads, such as multimedia or real-time audio processing.
## Production Ready?
Yes.

View File

@ -0,0 +1,13 @@
// Copyright (c) Andrea Righi <arighi@nvidia.com>
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
fn main() {
scx_utils::BpfBuilder::new()
.unwrap()
.enable_intf("src/bpf/intf.h", "bpf_intf.rs")
.enable_skel("src/bpf/main.bpf.c", "bpf")
.build()
.unwrap();
}

View File

@ -0,0 +1,8 @@
# Get help on options with `rustfmt --help=config`
# Please keep these in alphabetical order.
edition = "2021"
group_imports = "StdExternalCrate"
imports_granularity = "Item"
merge_derives = false
use_field_init_shorthand = true
version = "Two"

View File

@ -0,0 +1,43 @@
/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
*
* This software may be used and distributed according to the terms of the GNU
* General Public License version 2.
*/
#ifndef __INTF_H
#define __INTF_H
#include <limits.h>
#define MAX(x, y) ((x) > (y) ? (x) : (y))
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#define CLAMP(val, lo, hi) MIN(MAX(val, lo), hi)
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
enum consts {
NSEC_PER_USEC = 1000ULL,
NSEC_PER_MSEC = (1000ULL * NSEC_PER_USEC),
NSEC_PER_SEC = (1000ULL * NSEC_PER_MSEC),
};
#ifndef __VMLINUX_H__
typedef unsigned char u8;
typedef unsigned short u16;
typedef unsigned int u32;
typedef unsigned long u64;
typedef signed char s8;
typedef signed short s16;
typedef signed int s32;
typedef signed long s64;
typedef int pid_t;
#endif /* __VMLINUX_H__ */
struct domain_arg {
s32 cpu_id;
s32 sibling_cpu_id;
};
#endif /* __INTF_H */

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,12 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(dead_code)]
include!(concat!(env!("OUT_DIR"), "/bpf_intf.rs"));

View File

@ -0,0 +1,8 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
include!(concat!(env!("OUT_DIR"), "/bpf_skel.rs"));

View File

@ -0,0 +1,334 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;
mod stats;
use std::collections::HashMap;
use std::ffi::c_int;
use std::fs::File;
use std::io::Read;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::Skel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::info;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::import_enums;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::set_rlimit_infinity;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use stats::Metrics;
const SCHEDULER_NAME: &'static str = "scx_flash";
#[derive(Debug, Parser)]
struct Opts {
/// Exit debug dump buffer length. 0 indicates default.
#[clap(long, default_value = "0")]
exit_dump_len: u32,
/// Maximum scheduling slice duration in microseconds.
#[clap(short = 's', long, default_value = "20000")]
slice_us_max: u64,
/// Maximum time slice lag in microseconds.
///
/// Increasing this value can help to enhance the responsiveness of interactive tasks, but it
/// can also make performance more "spikey".
#[clap(short = 'l', long, default_value = "20000")]
slice_us_lag: u64,
/// Enable kthreads prioritization.
///
/// Enabling this can improve system performance, but it may also introduce interactivity
/// issues or unfairness in scenarios with high kthread activity, such as heavy I/O or network
/// traffic.
#[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
local_kthreads: bool,
/// Enable stats monitoring with the specified interval.
#[clap(long)]
stats: Option<f64>,
/// Run in stats monitoring mode with the specified interval. Scheduler
/// is not launched.
#[clap(long)]
monitor: Option<f64>,
/// Enable verbose output, including libbpf details.
#[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
verbose: bool,
/// Print scheduler version and exit.
#[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
version: bool,
/// Show descriptions for statistics.
#[clap(long)]
help_stats: bool,
}
fn is_smt_active() -> std::io::Result<i32> {
let mut file = File::open("/sys/devices/system/cpu/smt/active")?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
let smt_active: i32 = contents.trim().parse().unwrap_or(0);
Ok(smt_active)
}
struct Scheduler<'a> {
skel: BpfSkel<'a>,
struct_ops: Option<libbpf_rs::Link>,
stats_server: StatsServer<(), Metrics>,
}
impl<'a> Scheduler<'a> {
fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
set_rlimit_infinity();
// Check host topology to determine if we need to enable SMT capabilities.
let smt_enabled = match is_smt_active() {
Ok(value) => value == 1,
Err(e) => bail!("Failed to read SMT status: {}", e),
};
info!(
"{} {} {}",
SCHEDULER_NAME,
*build_id::SCX_FULL_VERSION,
if smt_enabled { "SMT on" } else { "SMT off" }
);
// Initialize BPF connector.
let mut skel_builder = BpfSkelBuilder::default();
skel_builder.obj_builder.debug(opts.verbose);
let mut skel = scx_ops_open!(skel_builder, open_object, flash_ops)?;
skel.struct_ops.flash_ops_mut().exit_dump_len = opts.exit_dump_len;
// Override default BPF scheduling parameters.
skel.maps.rodata_data.slice_max = opts.slice_us_max * 1000;
skel.maps.rodata_data.slice_lag = opts.slice_us_lag * 1000;
skel.maps.rodata_data.local_kthreads = opts.local_kthreads;
skel.maps.rodata_data.smt_enabled = smt_enabled;
// Load the BPF program for validation.
let mut skel = scx_ops_load!(skel, flash_ops, uei)?;
// Initialize CPU topology.
let topo = Topology::new().unwrap();
// Initialize LLC domain.
Self::init_l3_cache_domains(&mut skel, &topo)?;
// Attach the scheduler.
let struct_ops = Some(scx_ops_attach!(skel, flash_ops)?);
let stats_server = StatsServer::new(stats::server_data()).launch()?;
Ok(Self {
skel,
struct_ops,
stats_server,
})
}
fn enable_sibling_cpu(
skel: &mut BpfSkel<'_>,
cpu: usize,
sibling_cpu: usize,
) -> Result<(), u32> {
let prog = &mut skel.progs.enable_sibling_cpu;
let mut args = domain_arg {
cpu_id: cpu as c_int,
sibling_cpu_id: sibling_cpu as c_int,
};
let input = ProgramInput {
context_in: Some(unsafe {
std::slice::from_raw_parts_mut(
&mut args as *mut _ as *mut u8,
std::mem::size_of_val(&args),
)
}),
..Default::default()
};
let out = prog.test_run(input).unwrap();
if out.return_value != 0 {
return Err(out.return_value);
}
Ok(())
}
fn init_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
cache_lvl: usize,
enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>,
) -> Result<(), std::io::Error> {
// Determine the list of CPU IDs associated to each cache node.
let mut cache_id_map: HashMap<usize, Vec<usize>> = HashMap::new();
for core in topo.cores().into_iter() {
for (cpu_id, cpu) in core.cpus() {
let cache_id = match cache_lvl {
2 => cpu.l2_id(),
3 => cpu.l3_id(),
_ => panic!("invalid cache level {}", cache_lvl),
};
cache_id_map
.entry(cache_id)
.or_insert_with(Vec::new)
.push(*cpu_id);
}
}
// Update the BPF cpumasks for the cache domains.
for (cache_id, cpus) in cache_id_map {
info!(
"L{} cache ID {}: sibling CPUs: {:?}",
cache_lvl, cache_id, cpus
);
for cpu in &cpus {
for sibling_cpu in &cpus {
match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) {
Ok(()) => {}
Err(_) => {
warn!(
"L{} cache ID {}: failed to set CPU {} sibling {}",
cache_lvl, cache_id, *cpu, *sibling_cpu
);
}
}
}
}
}
Ok(())
}
fn init_l3_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, topo, 3, &|skel, _lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, cpu, sibling_cpu)
})
}
fn get_metrics(&self) -> Metrics {
Metrics {
nr_kthread_dispatches: self.skel.maps.bss_data.nr_kthread_dispatches,
nr_direct_dispatches: self.skel.maps.bss_data.nr_direct_dispatches,
nr_shared_dispatches: self.skel.maps.bss_data.nr_shared_dispatches,
}
}
pub fn exited(&mut self) -> bool {
uei_exited!(&self.skel, uei)
}
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
let (res_ch, req_ch) = self.stats_server.channels();
while !shutdown.load(Ordering::Relaxed) && !self.exited() {
match req_ch.recv_timeout(Duration::from_secs(1)) {
Ok(()) => res_ch.send(self.get_metrics())?,
Err(RecvTimeoutError::Timeout) => {}
Err(e) => Err(e)?,
}
}
self.struct_ops.take();
uei_report!(&self.skel, uei)
}
}
impl<'a> Drop for Scheduler<'a> {
fn drop(&mut self) {
info!("Unregister {} scheduler", SCHEDULER_NAME);
}
}
fn main() -> Result<()> {
let opts = Opts::parse();
if opts.version {
println!("{} {}", SCHEDULER_NAME, *build_id::SCX_FULL_VERSION);
return Ok(());
}
if opts.help_stats {
stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
return Ok(());
}
let loglevel = simplelog::LevelFilter::Info;
let mut lcfg = simplelog::ConfigBuilder::new();
lcfg.set_time_level(simplelog::LevelFilter::Error)
.set_location_level(simplelog::LevelFilter::Off)
.set_target_level(simplelog::LevelFilter::Off)
.set_thread_level(simplelog::LevelFilter::Off);
simplelog::TermLogger::init(
loglevel,
lcfg.build(),
simplelog::TerminalMode::Stderr,
simplelog::ColorChoice::Auto,
)?;
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
ctrlc::set_handler(move || {
shutdown_clone.store(true, Ordering::Relaxed);
})
.context("Error setting Ctrl-C handler")?;
if let Some(intv) = opts.monitor.or(opts.stats) {
let shutdown_copy = shutdown.clone();
let jh = std::thread::spawn(move || {
stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
});
if opts.monitor.is_some() {
let _ = jh.join();
return Ok(());
}
}
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&opts, &mut open_object)?;
if !sched.run(shutdown.clone())?.should_restart() {
break;
}
}
Ok(())
}

View File

@ -0,0 +1,82 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
use std::io::Write;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use scx_stats::prelude::*;
use scx_stats_derive::Stats;
use serde::Deserialize;
use serde::Serialize;
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(top)]
pub struct Metrics {
#[stat(desc = "Number of kthread direct dispatches")]
pub nr_kthread_dispatches: u64,
#[stat(desc = "Number of task direct dispatches")]
pub nr_direct_dispatches: u64,
#[stat(desc = "Number of task global dispatches")]
pub nr_shared_dispatches: u64,
}
impl Metrics {
fn format<W: Write>(&self, w: &mut W) -> Result<()> {
writeln!(
w,
"[{}] dispatch -> kthread: {:<5} direct: {:<5} shared: {:<5}",
crate::SCHEDULER_NAME,
self.nr_kthread_dispatches,
self.nr_direct_dispatches,
self.nr_shared_dispatches,
)?;
Ok(())
}
fn delta(&self, rhs: &Self) -> Self {
Self {
nr_kthread_dispatches: self.nr_kthread_dispatches - rhs.nr_kthread_dispatches,
nr_direct_dispatches: self.nr_direct_dispatches - rhs.nr_direct_dispatches,
nr_shared_dispatches: self.nr_shared_dispatches - rhs.nr_shared_dispatches,
..self.clone()
}
}
}
pub fn server_data() -> StatsServerData<(), Metrics> {
let open: Box<dyn StatsOpener<(), Metrics>> = Box::new(move |(req_ch, res_ch)| {
req_ch.send(())?;
let mut prev = res_ch.recv()?;
let read: Box<dyn StatsReader<(), Metrics>> = Box::new(move |_args, (req_ch, res_ch)| {
req_ch.send(())?;
let cur = res_ch.recv()?;
let delta = cur.delta(&prev);
prev = cur;
delta.to_json()
});
Ok(read)
});
StatsServerData::new()
.add_meta(Metrics::meta())
.add_ops("top", StatsOps { open, close: None })
}
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
scx_utils::monitor_stats::<Metrics>(
&vec![],
intv,
|| shutdown.load(Ordering::Relaxed),
|metrics| metrics.format(&mut std::io::stdout()),
)
}

View File

@ -1,4 +1,4 @@
# List of scx_schedulers: scx_bpfland scx_central scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland # List of scx_schedulers: scx_bpfland scx_central scx_flash scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland
SCX_SCHEDULER=scx_bpfland SCX_SCHEDULER=scx_bpfland
# Set custom flags for each scheduler, below is an example of how to use # Set custom flags for each scheduler, below is an example of how to use