Compare commits

...

5 Commits

Author SHA1 Message Date
Daniel Hodges
2987513b5e scx_layered: Add netdev IRQ balancing node support
Add support for setting netdev IRQ balancing that is NUMA aware.

Signed-off-by: Daniel Hodges <hodges.daniel.scott@gmail.com>
2024-11-18 06:42:40 -08:00
Daniel Hodges
24999e7f1f scx_utils: Add netdev support
Add support for collecting network device information for use in topology
awareness.

Signed-off-by: Daniel Hodges <hodges.daniel.scott@gmail.com>
2024-11-18 06:42:40 -08:00
Andrea Righi
a7fcda82cc
Merge pull request #924 from sched-ext/scx-fair
scheds: introduce scx_flash
2024-11-18 08:21:36 +00:00
Andrea Righi
5b4b6df5e4
Merge branch 'main' into scx-fair 2024-11-18 07:42:09 +01:00
Andrea Righi
678b10133d scheds: introduce scx_flash
Introduce scx_flash (Fair Latency-Aware ScHeduler), a scheduler that
focuses on ensuring fairness among tasks and performance predictability.

This scheduler is introduced as a replacement of the "lowlatency" mode
in scx_bpfland, that has been dropped in commit 78101e4 ("scx_bpfland:
drop lowlatency mode and the priority DSQ").

scx_flash operates based on an EDF (Earliest Deadline First) policy,
where each task is assigned a latency weight. This weight is adjusted
dynamically, influenced by the task's static weight and how often it
releases the CPU before its full assigned time slice is used: tasks that
release the CPU early receive a higher latency weight, granting them
a higher priority over tasks that fully use their time slice.

The combination of dynamic latency weights and EDF scheduling ensures
responsive and stable performance, even in overcommitted systems, making
the scheduler particularly well-suited for latency-sensitive workloads,
such as multimedia or real-time audio processing.

Tested-by: Peter Jung <ptr1337@cachyos.org>
Tested-by: Piotr Gorski <piotrgorski@cachyos.org>
Signed-off-by: Andrea Righi <arighi@nvidia.com>
2024-11-16 14:49:25 +01:00
22 changed files with 1820 additions and 3 deletions

17
Cargo.lock generated
View File

@ -1700,6 +1700,23 @@ dependencies = [
"simplelog", "simplelog",
] ]
[[package]]
name = "scx_flash"
version = "1.0.4"
dependencies = [
"anyhow",
"clap",
"crossbeam",
"ctrlc",
"libbpf-rs",
"log",
"scx_stats",
"scx_stats_derive",
"scx_utils",
"serde",
"simplelog",
]
[[package]] [[package]]
name = "scx_lavd" name = "scx_lavd"
version = "1.0.6" version = "1.0.6"

View File

@ -6,6 +6,7 @@ members = ["rust/scx_stats",
"rust/scx_loader", "rust/scx_loader",
"scheds/rust/scx_lavd", "scheds/rust/scx_lavd",
"scheds/rust/scx_bpfland", "scheds/rust/scx_bpfland",
"scheds/rust/scx_flash",
"scheds/rust/scx_rustland", "scheds/rust/scx_rustland",
"scheds/rust/scx_rlfifo", "scheds/rust/scx_rlfifo",
"scheds/rust/scx_rusty", "scheds/rust/scx_rusty",

View File

@ -22,6 +22,12 @@ sched_args: -v
stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc` stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc`
timeout_sec: 15 timeout_sec: 15
[scx_flash]
sched: scx_flash
sched_args:
stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc`
timeout_sec: 15
[scx_layered] [scx_layered]
sched: scx_layered sched: scx_layered
sched_args: --run-example -v --stats 1 sched_args: --run-example -v --stats 1

View File

@ -327,7 +327,7 @@ if enable_rust
run_target('fetch', command: [cargo_fetch, cargo], env: cargo_env) run_target('fetch', command: [cargo_fetch, cargo], env: cargo_env)
rust_scheds = ['scx_lavd', 'scx_bpfland', 'scx_rustland', 'scx_rlfifo', rust_scheds = ['scx_lavd', 'scx_bpfland', 'scx_rustland', 'scx_rlfifo',
'scx_rusty', 'scx_flash', 'scx_rusty',
'scx_layered', 'scx_mitosis'] 'scx_layered', 'scx_mitosis']
rust_misc = ['scx_stats', 'scx_stats_derive', 'scx_utils', rust_misc = ['scx_stats', 'scx_stats_derive', 'scx_utils',
'scx_rustland_core', 'scx_rustland_core',

View File

@ -68,7 +68,7 @@ use std::ops::BitOrAssign;
use std::ops::BitXor; use std::ops::BitXor;
use std::ops::BitXorAssign; use std::ops::BitXorAssign;
#[derive(Debug, Eq, Clone, Ord, PartialEq, PartialOrd)] #[derive(Debug, Eq, Clone, Hash, Ord, PartialEq, PartialOrd)]
pub struct Cpumask { pub struct Cpumask {
mask: BitVec<u64, Lsb0>, mask: BitVec<u64, Lsb0>,
} }
@ -146,6 +146,10 @@ impl Cpumask {
} }
} }
pub fn from_bitvec(bitvec: BitVec<u64, Lsb0>) -> Self {
Self { mask: bitvec }
}
/// Return a slice of u64's whose bits reflect the Cpumask. /// Return a slice of u64's whose bits reflect the Cpumask.
pub fn as_raw_slice(&self) -> &[u64] { pub fn as_raw_slice(&self) -> &[u64] {
self.mask.as_raw_slice() self.mask.as_raw_slice()

View File

@ -88,5 +88,9 @@ pub use misc::monitor_stats;
pub use misc::normalize_load_metric; pub use misc::normalize_load_metric;
pub use misc::set_rlimit_infinity; pub use misc::set_rlimit_infinity;
mod netdev;
pub use netdev::read_netdevs;
pub use netdev::NetDev;
pub mod enums; pub mod enums;
pub use enums::scx_enums; pub use enums::scx_enums;

View File

@ -0,0 +1,83 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
use std::collections::BTreeMap;
use std::fs;
use std::path::Path;
use crate::misc::read_file_usize;
use crate::Cpumask;
use anyhow::Result;
// A network device and the cpumasks of its MSI IRQs, as discovered from
// /sys/class/net and /proc/irq.
#[derive(Debug, Clone, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct NetDev {
// Interface name (e.g. "eth0"), as listed under /sys/class/net.
pub iface: String,
// NUMA node the device is attached to (0 when unknown).
pub node: usize,
// Current smp_affinity cpumask, keyed by IRQ number.
pub irqs: BTreeMap<usize, Cpumask>,
// Driver-provided affinity hint per IRQ, keyed by IRQ number.
pub irq_hints: BTreeMap<usize, Cpumask>,
}
impl NetDev {
// Write each IRQ's cpumask back to /proc/irq/<irq>/smp_affinity.
// Requires privileges to write under /proc/irq.
// NOTE(review): the mask is written with a "0x" prefix ({:#x}); confirm the
// kernel's affinity parser accepts the prefix on all target kernels.
pub fn apply_cpumasks(&self) -> Result<()> {
for (irq, cpumask) in self.irqs.iter() {
let irq_path = format!("/proc/irq/{}/smp_affinity", irq);
fs::write(irq_path, format!("{:#x}", cpumask))?
}
Ok(())
}
}
/// Scan `/sys/class/net` and build a map of interface name -> [`NetDev`] for
/// every network device that exposes MSI IRQs.
///
/// For each MSI IRQ of a device, the current SMP affinity and the driver
/// affinity hint are read from `/proc/irq/<irq>/` and parsed into `Cpumask`s
/// (comma separators and newlines are stripped before parsing). Devices
/// without a `device/msi_irqs` directory (e.g. virtual interfaces) are
/// skipped. A missing or unreadable NUMA node file defaults the node to 0.
pub fn read_netdevs() -> Result<BTreeMap<String, NetDev>> {
    let mut netdevs: BTreeMap<String, NetDev> = BTreeMap::new();

    for entry in fs::read_dir("/sys/class/net")? {
        let entry = entry?;
        let iface = entry.file_name().to_string_lossy().into_owned();

        // Only consider devices that expose MSI interrupts.
        let raw_path = format!("/sys/class/net/{}/device/msi_irqs", iface);
        let msi_irqs_path = Path::new(&raw_path);
        if !msi_irqs_path.exists() {
            continue;
        }

        // NUMA node of the device; default to node 0 when unknown.
        let node_path_raw = format!("/sys/class/net/{}/device/node", iface);
        let node_path = Path::new(&node_path_raw);
        let node = read_file_usize(node_path).unwrap_or(0);

        let mut irqs = BTreeMap::new();
        let mut irq_hints = BTreeMap::new();
        for entry in fs::read_dir(msi_irqs_path)? {
            // Propagate readdir errors instead of panicking (was unwrap()).
            let entry = entry?;
            let irq = entry.file_name().to_string_lossy().into_owned();
            // Directory entries are IRQ numbers; skip anything non-numeric.
            if let Ok(irq) = irq.parse::<usize>() {
                // Current affinity: hex mask, possibly comma-separated.
                let affinity_raw_path = format!("/proc/irq/{}/smp_affinity", irq);
                let smp_affinity_path = Path::new(&affinity_raw_path);
                let smp_affinity = fs::read_to_string(smp_affinity_path)?
                    .replace(",", "")
                    .replace("\n", "");
                let cpumask = Cpumask::from_str(&smp_affinity)?;
                irqs.insert(irq, cpumask);

                // Driver-provided affinity hint, same format.
                let affinity_hint_raw_path = format!("/proc/irq/{}/affinity_hint", irq);
                let affinity_hint_path = Path::new(&affinity_hint_raw_path);
                let affinity_hint = fs::read_to_string(affinity_hint_path)?
                    .replace(",", "")
                    .replace("\n", "");
                let hint_cpumask = Cpumask::from_str(&affinity_hint)?;
                irq_hints.insert(irq, hint_cpumask);
            }
        }
        netdevs.insert(
            iface.clone(),
            NetDev {
                iface,
                node,
                irqs,
                irq_hints,
            },
        );
    }

    Ok(netdevs)
}

View File

@ -18,3 +18,4 @@ main.rs or \*.bpf.c files.
- [scx_rlfifo](scx_rlfifo/README.md) - [scx_rlfifo](scx_rlfifo/README.md)
- [scx_lavd](scx_lavd/README.md) - [scx_lavd](scx_lavd/README.md)
- [scx_bpfland](scx_bpfland/README.md) - [scx_bpfland](scx_bpfland/README.md)
- [scx_flash](scx_flash/README.md)

View File

@ -0,0 +1,26 @@
[package]
name = "scx_flash"
version = "1.0.4"
authors = ["Andrea Righi <arighi@nvidia.com>"]
edition = "2021"
description = "A scheduler designed for multimedia and real-time audio processing workloads. https://github.com/sched-ext/scx/tree/main"
license = "GPL-2.0-only"
[dependencies]
anyhow = "1.0.65"
ctrlc = { version = "3.1", features = ["termination"] }
clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] }
crossbeam = "0.8.4"
libbpf-rs = "0.24.1"
log = "0.4.17"
scx_stats = { path = "../../../rust/scx_stats", version = "1.0.4" }
scx_stats_derive = { path = "../../../rust/scx_stats/scx_stats_derive", version = "1.0.4" }
scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" }
serde = { version = "1.0", features = ["derive"] }
simplelog = "0.12"
[build-dependencies]
scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" }
[features]
enable_backtrace = []

View File

@ -0,0 +1 @@
../../../LICENSE

View File

@ -0,0 +1,26 @@
# scx_flash
This is a single user-defined scheduler used within [sched_ext](https://github.com/sched-ext/scx/tree/main), which is a Linux kernel feature that enables implementing kernel thread schedulers in BPF and dynamically loading them. [Read more about sched_ext](https://github.com/sched-ext/scx/tree/main).
## Overview
A scheduler that focuses on ensuring fairness among tasks and performance
predictability.
It operates using an earliest deadline first (EDF) policy, where each task is
assigned a "latency" weight. This weight is dynamically adjusted based on how
often a task releases the CPU before its full time slice is used. Tasks that
release the CPU early are given a higher latency weight, prioritizing them over
tasks that fully consume their time slice.
## Typical Use Case
The combination of dynamic latency weights and EDF scheduling ensures
responsive and consistent performance, even in overcommitted systems.
This makes the scheduler particularly well-suited for latency-sensitive
workloads, such as multimedia or real-time audio processing.
## Production Ready?
Yes.

View File

@ -0,0 +1,13 @@
// Copyright (c) Andrea Righi <arighi@nvidia.com>
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
// Build script: compile the BPF scheduler and generate the Rust bindings
// consumed by the user-space side.
fn main() {
scx_utils::BpfBuilder::new()
.unwrap()
// Generate bpf_intf.rs from the C interface header shared with BPF.
.enable_intf("src/bpf/intf.h", "bpf_intf.rs")
// Compile main.bpf.c and generate the "bpf" skeleton (bpf_skel.rs).
.enable_skel("src/bpf/main.bpf.c", "bpf")
.build()
.unwrap();
}

View File

@ -0,0 +1,8 @@
# Get help on options with `rustfmt --help=config`
# Please keep these in alphabetical order.
edition = "2021"
group_imports = "StdExternalCrate"
imports_granularity = "Item"
merge_derives = false
use_field_init_shorthand = true
version = "Two"

View File

@ -0,0 +1,43 @@
/* SPDX-License-Identifier: GPL-2.0 */
/*
 * Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
 *
 * This software may be used and distributed according to the terms of the GNU
 * General Public License version 2.
 */
#ifndef __INTF_H
#define __INTF_H
#include <limits.h>
/* Generic helper macros shared between the BPF and user-space sides. */
#define MAX(x, y) ((x) > (y) ? (x) : (y))
#define MIN(x, y) ((x) < (y) ? (x) : (y))
#define CLAMP(val, lo, hi) MIN(MAX(val, lo), hi)
#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
/* Time unit conversions, nanoseconds base. */
enum consts {
NSEC_PER_USEC = 1000ULL,
NSEC_PER_MSEC = (1000ULL * NSEC_PER_USEC),
NSEC_PER_SEC = (1000ULL * NSEC_PER_MSEC),
};
/*
 * Fixed-width type aliases for user-space builds; when compiling the BPF
 * side, vmlinux.h already provides these.
 */
#ifndef __VMLINUX_H__
typedef unsigned char u8;
typedef unsigned short u16;
typedef unsigned int u32;
typedef unsigned long u64;
typedef signed char s8;
typedef signed short s16;
typedef signed int s32;
typedef signed long s64;
typedef int pid_t;
#endif /* __VMLINUX_H__ */
/*
 * Argument block passed from user space to the enable_sibling_cpu BPF
 * program via test_run: marks sibling_cpu_id as a cache sibling of cpu_id.
 */
struct domain_arg {
s32 cpu_id;
s32 sibling_cpu_id;
};
#endif /* __INTF_H */

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,12 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
// Silence naming/dead-code lints in the generated bindings below.
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(dead_code)]
// Pull in the Rust bindings generated from src/bpf/intf.h at build time.
include!(concat!(env!("OUT_DIR"), "/bpf_intf.rs"));

View File

@ -0,0 +1,8 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
// Pull in the libbpf-rs skeleton generated from src/bpf/main.bpf.c at build time.
include!(concat!(env!("OUT_DIR"), "/bpf_skel.rs"));

View File

@ -0,0 +1,334 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
mod bpf_skel;
pub use bpf_skel::*;
pub mod bpf_intf;
pub use bpf_intf::*;
mod stats;
use std::collections::HashMap;
use std::ffi::c_int;
use std::fs::File;
use std::io::Read;
use std::mem::MaybeUninit;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use anyhow::Context;
use anyhow::Result;
use clap::Parser;
use crossbeam::channel::RecvTimeoutError;
use libbpf_rs::skel::OpenSkel;
use libbpf_rs::skel::Skel;
use libbpf_rs::skel::SkelBuilder;
use libbpf_rs::OpenObject;
use libbpf_rs::ProgramInput;
use log::info;
use log::warn;
use scx_stats::prelude::*;
use scx_utils::build_id;
use scx_utils::import_enums;
use scx_utils::scx_enums;
use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load;
use scx_utils::scx_ops_open;
use scx_utils::set_rlimit_infinity;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::UserExitInfo;
use stats::Metrics;
const SCHEDULER_NAME: &'static str = "scx_flash";
// Command-line options (clap derive). The `///` doc comments on the fields
// double as the --help text, so they are left untouched here.
#[derive(Debug, Parser)]
struct Opts {
/// Exit debug dump buffer length. 0 indicates default.
#[clap(long, default_value = "0")]
exit_dump_len: u32,
/// Maximum scheduling slice duration in microseconds.
#[clap(short = 's', long, default_value = "20000")]
slice_us_max: u64,
/// Maximum time slice lag in microseconds.
///
/// Increasing this value can help to enhance the responsiveness of interactive tasks, but it
/// can also make performance more "spikey".
#[clap(short = 'l', long, default_value = "20000")]
slice_us_lag: u64,
/// Enable kthreads prioritization.
///
/// Enabling this can improve system performance, but it may also introduce interactivity
/// issues or unfairness in scenarios with high kthread activity, such as heavy I/O or network
/// traffic.
#[clap(short = 'k', long, action = clap::ArgAction::SetTrue)]
local_kthreads: bool,
/// Enable stats monitoring with the specified interval.
#[clap(long)]
stats: Option<f64>,
/// Run in stats monitoring mode with the specified interval. Scheduler
/// is not launched.
#[clap(long)]
monitor: Option<f64>,
/// Enable verbose output, including libbpf details.
#[clap(short = 'v', long, action = clap::ArgAction::SetTrue)]
verbose: bool,
/// Print scheduler version and exit.
#[clap(short = 'V', long, action = clap::ArgAction::SetTrue)]
version: bool,
/// Show descriptions for statistics.
#[clap(long)]
help_stats: bool,
}
/// Read /sys/devices/system/cpu/smt/active and return its integer value
/// (1 when SMT is active). Unparsable content is treated as 0 (SMT off);
/// I/O errors are propagated to the caller.
fn is_smt_active() -> std::io::Result<i32> {
    let contents = std::fs::read_to_string("/sys/devices/system/cpu/smt/active")?;
    Ok(contents.trim().parse::<i32>().unwrap_or(0))
}
// User-space scheduler state: the loaded BPF skeleton, the attachment link
// and the stats server.
struct Scheduler<'a> {
// Loaded BPF skeleton (progs, maps, rodata/bss).
skel: BpfSkel<'a>,
// struct_ops link keeping the scheduler attached; taken/dropped to detach.
struct_ops: Option<libbpf_rs::Link>,
// Server answering metric requests from monitor clients.
stats_server: StatsServer<(), Metrics>,
}
impl<'a> Scheduler<'a> {
// Open, configure, load and attach the BPF scheduler, then start the
// stats server. `open_object` must outlive the returned Scheduler.
fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit<OpenObject>) -> Result<Self> {
set_rlimit_infinity();
// Check host topology to determine if we need to enable SMT capabilities.
let smt_enabled = match is_smt_active() {
Ok(value) => value == 1,
Err(e) => bail!("Failed to read SMT status: {}", e),
};
info!(
"{} {} {}",
SCHEDULER_NAME,
*build_id::SCX_FULL_VERSION,
if smt_enabled { "SMT on" } else { "SMT off" }
);
// Initialize BPF connector.
let mut skel_builder = BpfSkelBuilder::default();
skel_builder.obj_builder.debug(opts.verbose);
let mut skel = scx_ops_open!(skel_builder, open_object, flash_ops)?;
skel.struct_ops.flash_ops_mut().exit_dump_len = opts.exit_dump_len;
// Override default BPF scheduling parameters.
// Slice options are given in microseconds; rodata fields take nanoseconds.
skel.maps.rodata_data.slice_max = opts.slice_us_max * 1000;
skel.maps.rodata_data.slice_lag = opts.slice_us_lag * 1000;
skel.maps.rodata_data.local_kthreads = opts.local_kthreads;
skel.maps.rodata_data.smt_enabled = smt_enabled;
// Load the BPF program for validation.
let mut skel = scx_ops_load!(skel, flash_ops, uei)?;
// Initialize CPU topology.
let topo = Topology::new().unwrap();
// Initialize LLC domain.
Self::init_l3_cache_domains(&mut skel, &topo)?;
// Attach the scheduler.
let struct_ops = Some(scx_ops_attach!(skel, flash_ops)?);
let stats_server = StatsServer::new(stats::server_data()).launch()?;
Ok(Self {
skel,
struct_ops,
stats_server,
})
}
// Tell the BPF side that `sibling_cpu` shares a cache domain with `cpu`
// by test-running the enable_sibling_cpu program. On failure the BPF
// program's non-zero return value is propagated as the error.
fn enable_sibling_cpu(
skel: &mut BpfSkel<'_>,
cpu: usize,
sibling_cpu: usize,
) -> Result<(), u32> {
let prog = &mut skel.progs.enable_sibling_cpu;
let mut args = domain_arg {
cpu_id: cpu as c_int,
sibling_cpu_id: sibling_cpu as c_int,
};
let input = ProgramInput {
// SAFETY: `args` is a plain-data struct that outlives the test_run
// call; it is viewed as a byte slice of exactly its own size.
context_in: Some(unsafe {
std::slice::from_raw_parts_mut(
&mut args as *mut _ as *mut u8,
std::mem::size_of_val(&args),
)
}),
..Default::default()
};
// NOTE(review): test_run() failure panics here rather than returning an
// error; consider mapping it into the Err path.
let out = prog.test_run(input).unwrap();
if out.return_value != 0 {
return Err(out.return_value);
}
Ok(())
}
// Group CPUs by their L2/L3 cache ID and mark every pair of CPUs within
// the same cache domain as siblings via `enable_sibling_cpu_fn`.
// Failures to set a pair are logged and skipped, not fatal.
fn init_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
cache_lvl: usize,
enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>,
) -> Result<(), std::io::Error> {
// Determine the list of CPU IDs associated to each cache node.
let mut cache_id_map: HashMap<usize, Vec<usize>> = HashMap::new();
for core in topo.cores().into_iter() {
for (cpu_id, cpu) in core.cpus() {
let cache_id = match cache_lvl {
2 => cpu.l2_id(),
3 => cpu.l3_id(),
_ => panic!("invalid cache level {}", cache_lvl),
};
cache_id_map
.entry(cache_id)
.or_insert_with(Vec::new)
.push(*cpu_id);
}
}
// Update the BPF cpumasks for the cache domains.
for (cache_id, cpus) in cache_id_map {
info!(
"L{} cache ID {}: sibling CPUs: {:?}",
cache_lvl, cache_id, cpus
);
for cpu in &cpus {
for sibling_cpu in &cpus {
match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) {
Ok(()) => {}
Err(_) => {
warn!(
"L{} cache ID {}: failed to set CPU {} sibling {}",
cache_lvl, cache_id, *cpu, *sibling_cpu
);
}
}
}
}
}
Ok(())
}
// Convenience wrapper: initialize L3 (LLC) cache domains only.
fn init_l3_cache_domains(
skel: &mut BpfSkel<'_>,
topo: &Topology,
) -> Result<(), std::io::Error> {
Self::init_cache_domains(skel, topo, 3, &|skel, _lvl, cpu, sibling_cpu| {
Self::enable_sibling_cpu(skel, cpu, sibling_cpu)
})
}
// Snapshot the cumulative dispatch counters from the BPF bss section.
fn get_metrics(&self) -> Metrics {
Metrics {
nr_kthread_dispatches: self.skel.maps.bss_data.nr_kthread_dispatches,
nr_direct_dispatches: self.skel.maps.bss_data.nr_direct_dispatches,
nr_shared_dispatches: self.skel.maps.bss_data.nr_shared_dispatches,
}
}
// True when the BPF side reported an exit through the user exit info.
pub fn exited(&mut self) -> bool {
uei_exited!(&self.skel, uei)
}
// Main loop: serve stats requests (1s poll) until shutdown is requested
// or the BPF scheduler exits, then detach and report the exit info.
fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<UserExitInfo> {
let (res_ch, req_ch) = self.stats_server.channels();
while !shutdown.load(Ordering::Relaxed) && !self.exited() {
match req_ch.recv_timeout(Duration::from_secs(1)) {
Ok(()) => res_ch.send(self.get_metrics())?,
Err(RecvTimeoutError::Timeout) => {}
Err(e) => Err(e)?,
}
}
// Dropping the link detaches the scheduler before reporting exit info.
self.struct_ops.take();
uei_report!(&self.skel, uei)
}
}
impl<'a> Drop for Scheduler<'a> {
fn drop(&mut self) {
// The struct_ops link (if still held) is released by its own Drop;
// just log the unregistration.
info!("Unregister {} scheduler", SCHEDULER_NAME);
}
}
// Entry point: parse options, handle the version/help/monitor short-circuit
// modes, set up logging and Ctrl-C handling, then run the scheduler in a
// restart loop until it exits for good.
fn main() -> Result<()> {
let opts = Opts::parse();
if opts.version {
println!("{} {}", SCHEDULER_NAME, *build_id::SCX_FULL_VERSION);
return Ok(());
}
if opts.help_stats {
stats::server_data().describe_meta(&mut std::io::stdout(), None)?;
return Ok(());
}
// Terse stderr logging: timestamps only on errors, no file/target/thread.
let loglevel = simplelog::LevelFilter::Info;
let mut lcfg = simplelog::ConfigBuilder::new();
lcfg.set_time_level(simplelog::LevelFilter::Error)
.set_location_level(simplelog::LevelFilter::Off)
.set_target_level(simplelog::LevelFilter::Off)
.set_thread_level(simplelog::LevelFilter::Off);
simplelog::TermLogger::init(
loglevel,
lcfg.build(),
simplelog::TerminalMode::Stderr,
simplelog::ColorChoice::Auto,
)?;
// Ctrl-C flips the shared shutdown flag checked by the run loops.
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_clone = shutdown.clone();
ctrlc::set_handler(move || {
shutdown_clone.store(true, Ordering::Relaxed);
})
.context("Error setting Ctrl-C handler")?;
// --monitor: stats client only, no scheduler. --stats: monitor thread
// runs alongside the scheduler below.
if let Some(intv) = opts.monitor.or(opts.stats) {
let shutdown_copy = shutdown.clone();
let jh = std::thread::spawn(move || {
stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap()
});
if opts.monitor.is_some() {
let _ = jh.join();
return Ok(());
}
}
// Re-init and re-run the scheduler whenever it asks for a restart.
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&opts, &mut open_object)?;
if !sched.run(shutdown.clone())?.should_restart() {
break;
}
}
Ok(())
}

View File

@ -0,0 +1,82 @@
// SPDX-License-Identifier: GPL-2.0
//
// Copyright (c) 2024 Andrea Righi <arighi@nvidia.com>
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.
use std::io::Write;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use scx_stats::prelude::*;
use scx_stats_derive::Stats;
use serde::Deserialize;
use serde::Serialize;
// Cumulative dispatch counters exported through the stats server.
// The #[stat(desc)] attributes provide the user-visible descriptions.
#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)]
#[stat(top)]
pub struct Metrics {
#[stat(desc = "Number of kthread direct dispatches")]
pub nr_kthread_dispatches: u64,
#[stat(desc = "Number of task direct dispatches")]
pub nr_direct_dispatches: u64,
#[stat(desc = "Number of task global dispatches")]
pub nr_shared_dispatches: u64,
}
impl Metrics {
    /// Write a single human-readable line with the dispatch counters.
    fn format<W: Write>(&self, w: &mut W) -> Result<()> {
        writeln!(
            w,
            "[{}] dispatch -> kthread: {:<5} direct: {:<5} shared: {:<5}",
            crate::SCHEDULER_NAME,
            self.nr_kthread_dispatches,
            self.nr_direct_dispatches,
            self.nr_shared_dispatches,
        )?;
        Ok(())
    }

    /// Return the per-interval delta between two cumulative snapshots.
    ///
    /// Counters normally only grow; saturate at 0 instead of panicking
    /// (debug) or wrapping (release) if a counter ever goes backwards,
    /// e.g. across a scheduler restart.
    fn delta(&self, rhs: &Self) -> Self {
        // Every field is listed explicitly, so the previous `..self.clone()`
        // struct-update spread only performed a useless clone and is dropped.
        Self {
            nr_kthread_dispatches: self
                .nr_kthread_dispatches
                .saturating_sub(rhs.nr_kthread_dispatches),
            nr_direct_dispatches: self
                .nr_direct_dispatches
                .saturating_sub(rhs.nr_direct_dispatches),
            nr_shared_dispatches: self
                .nr_shared_dispatches
                .saturating_sub(rhs.nr_shared_dispatches),
        }
    }
}
// Build the stats server description. On open, a first snapshot is taken to
// serve as the delta baseline; each subsequent read returns the delta since
// the previous read, serialized as JSON.
pub fn server_data() -> StatsServerData<(), Metrics> {
let open: Box<dyn StatsOpener<(), Metrics>> = Box::new(move |(req_ch, res_ch)| {
// Prime the baseline snapshot.
req_ch.send(())?;
let mut prev = res_ch.recv()?;
let read: Box<dyn StatsReader<(), Metrics>> = Box::new(move |_args, (req_ch, res_ch)| {
req_ch.send(())?;
let cur = res_ch.recv()?;
let delta = cur.delta(&prev);
prev = cur;
delta.to_json()
});
Ok(read)
});
StatsServerData::new()
.add_meta(Metrics::meta())
.add_ops("top", StatsOps { open, close: None })
}
// Stats client loop: poll the stats server every `intv` and print formatted
// metrics to stdout until `shutdown` is set.
pub fn monitor(intv: Duration, shutdown: Arc<AtomicBool>) -> Result<()> {
scx_utils::monitor_stats::<Metrics>(
&vec![],
intv,
|| shutdown.load(Ordering::Relaxed),
|metrics| metrics.format(&mut std::io::stdout()),
)
}

View File

@ -222,6 +222,15 @@ impl CpuPool {
Ok(Some(&self.core_cpus[core])) Ok(Some(&self.core_cpus[core]))
} }
/// Return a bitvec with a bit set for every CPU belonging to a core that
/// is currently available in the pool.
pub fn available_cpus(&self) -> BitVec<u64, Lsb0> {
    let mut cpus = bitvec![u64, Lsb0; 0; self.nr_cpus];
    for core in self.available_cores.iter_ones() {
        // OR in the core's CPU mask directly; no need to clone the
        // per-core bitvec just to borrow it as a slice.
        cpus |= self.core_cpus[core].as_bitslice();
    }
    cpus
}
pub fn available_cpus_in_mask(&self, allowed_cpus: &BitVec) -> BitVec { pub fn available_cpus_in_mask(&self, allowed_cpus: &BitVec) -> BitVec {
let mut cpus = bitvec![0; self.nr_cpus]; let mut cpus = bitvec![0; self.nr_cpus];
for core in self.available_cores.iter_ones() { for core in self.available_cores.iter_ones() {

View File

@ -44,6 +44,7 @@ use scx_utils::compat;
use scx_utils::import_enums; use scx_utils::import_enums;
use scx_utils::init_libbpf_logging; use scx_utils::init_libbpf_logging;
use scx_utils::ravg::ravg_read; use scx_utils::ravg::ravg_read;
use scx_utils::read_netdevs;
use scx_utils::scx_enums; use scx_utils::scx_enums;
use scx_utils::scx_ops_attach; use scx_utils::scx_ops_attach;
use scx_utils::scx_ops_load; use scx_utils::scx_ops_load;
@ -53,6 +54,7 @@ use scx_utils::uei_report;
use scx_utils::Cache; use scx_utils::Cache;
use scx_utils::CoreType; use scx_utils::CoreType;
use scx_utils::LoadAggregator; use scx_utils::LoadAggregator;
use scx_utils::NetDev;
use scx_utils::Topology; use scx_utils::Topology;
use scx_utils::UserExitInfo; use scx_utils::UserExitInfo;
use stats::LayerStats; use stats::LayerStats;
@ -471,6 +473,10 @@ struct Opts {
#[clap(long, default_value = "false")] #[clap(long, default_value = "false")]
disable_antistall: bool, disable_antistall: bool,
/// Enable netdev IRQ balancing
#[clap(long, default_value = "false")]
netdev_irq_balance: bool,
/// Maximum task runnable_at delay (in seconds) before antistall turns on /// Maximum task runnable_at delay (in seconds) before antistall turns on
#[clap(long, default_value = "3")] #[clap(long, default_value = "3")]
antistall_sec: u64, antistall_sec: u64,
@ -1215,6 +1221,8 @@ struct Scheduler<'a> {
nr_layer_cpus_ranges: Vec<(usize, usize)>, nr_layer_cpus_ranges: Vec<(usize, usize)>,
processing_dur: Duration, processing_dur: Duration,
topo: Topology,
netdevs: BTreeMap<String, NetDev>,
stats_server: StatsServer<StatsReq, StatsRes>, stats_server: StatsServer<StatsReq, StatsRes>,
} }
@ -1399,6 +1407,12 @@ impl<'a> Scheduler<'a> {
let topo = Topology::new()?; let topo = Topology::new()?;
let cpu_pool = CpuPool::new(&topo)?; let cpu_pool = CpuPool::new(&topo)?;
let netdevs = if opts.netdev_irq_balance {
read_netdevs()?
} else {
BTreeMap::new()
};
let disable_topology = if let Some(val) = opts.disable_topology { let disable_topology = if let Some(val) = opts.disable_topology {
val val
} else { } else {
@ -1523,6 +1537,8 @@ impl<'a> Scheduler<'a> {
proc_reader, proc_reader,
skel, skel,
topo,
netdevs,
stats_server, stats_server,
}; };
@ -1542,6 +1558,43 @@ impl<'a> Scheduler<'a> {
bpf_layer.refresh_cpus = 1; bpf_layer.refresh_cpus = 1;
} }
/// Steer each managed netdev IRQ to the currently-available scheduler CPUs
/// within the device's NUMA node by rewriting its smp_affinity cpumask.
/// When no scheduler CPU is available in that node, fall back to spreading
/// the IRQ across the whole node span. No-op when no CPUs are available.
fn update_netdev_cpumasks(&mut self) -> Result<()> {
    let available_cpus = self.cpu_pool.available_cpus();
    if available_cpus.is_empty() {
        return Ok(());
    }

    for (iface, netdev) in self.netdevs.iter_mut() {
        // Use find() rather than take_while().next(): take_while() stops at
        // the first node that doesn't match, so any device attached to a
        // node that isn't first in the list would erroneously fail lookup.
        let node = self
            .topo
            .nodes()
            .into_iter()
            .find(|n| n.id() == netdev.node)
            .ok_or_else(|| anyhow!("Failed to get netdev node"))?;
        let node_cpus = node.span();
        for (irq, irqmask) in netdev.irqs.iter_mut() {
            // Rebuild the mask from the available CPUs in this node.
            irqmask.clear();
            for cpu in available_cpus.iter_ones() {
                if !node_cpus.test_cpu(cpu) {
                    continue;
                }
                let _ = irqmask.set_cpu(cpu);
            }
            trace!("{} updating irq {} cpumask {:?}", iface, irq, irqmask);
            // If no CPUs are available in the node then spread the load
            // across the whole node.
            if irqmask.weight() == 0 {
                for cpu in node_cpus.as_raw_bitvec().iter_ones() {
                    let _ = irqmask.set_cpu(cpu);
                }
            }
        }
        netdev.apply_cpumasks()?;
    }

    Ok(())
}
fn set_bpf_layer_preemption(layer: &mut Layer, bpf_layer: &mut types::layer, preempt: bool) { fn set_bpf_layer_preemption(layer: &mut Layer, bpf_layer: &mut types::layer, preempt: bool) {
layer.preempt = preempt; layer.preempt = preempt;
bpf_layer.preempt.write(preempt); bpf_layer.preempt.write(preempt);
@ -1656,6 +1709,7 @@ impl<'a> Scheduler<'a> {
} }
} }
let _ = self.update_netdev_cpumasks();
Ok(()) Ok(())
} }

View File

@ -1,4 +1,4 @@
# List of scx_schedulers: scx_bpfland scx_central scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland # List of scx_schedulers: scx_bpfland scx_central scx_flash scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland
SCX_SCHEDULER=scx_bpfland SCX_SCHEDULER=scx_bpfland
# Set custom flags for each scheduler, below is an example of how to use # Set custom flags for each scheduler, below is an example of how to use