From 678b10133dc6a7587e16e5068bc870a3f810a52a Mon Sep 17 00:00:00 2001 From: Andrea Righi Date: Fri, 13 Sep 2024 15:52:45 +0200 Subject: [PATCH] scheds: introduce scx_flash Introduce scx_flash (Fair Latency-Aware ScHeduler), a scheduler that focuses on ensuring fairness among tasks and performance predictability. This scheduler is introduced as a replacement of the "lowlatency" mode in scx_bpfland, that has been dropped in commit 78101e4 ("scx_bpfland: drop lowlatency mode and the priority DSQ"). scx_flash operates based on an EDF (Earliest Deadline First) policy, where each task is assigned a latency weight. This weight is adjusted dynamically, influenced by the task's static weight and how often it releases the CPU before its full assigned time slice is used: tasks that release the CPU early receive a higher latency weight, granting them a higher priority over tasks that fully use their time slice. The combination of dynamic latency weights and EDF scheduling ensures responsive and stable performance, even in overcommitted systems, making the scheduler particularly well-suited for latency-sensitive workloads, such as multimedia or real-time audio processing. Tested-by: Peter Jung Tested-by: Piotr Gorski Signed-off-by: Andrea Righi --- Cargo.lock | 17 + Cargo.toml | 1 + meson-scripts/stress_tests.ini | 6 + meson.build | 2 +- scheds/rust/README.md | 1 + scheds/rust/scx_flash/Cargo.toml | 26 + scheds/rust/scx_flash/LICENSE | 1 + scheds/rust/scx_flash/README.md | 26 + scheds/rust/scx_flash/build.rs | 13 + scheds/rust/scx_flash/rustfmt.toml | 8 + scheds/rust/scx_flash/src/bpf/intf.h | 43 + scheds/rust/scx_flash/src/bpf/main.bpf.c | 1085 ++++++++++++++++++++++ scheds/rust/scx_flash/src/bpf_intf.rs | 12 + scheds/rust/scx_flash/src/bpf_skel.rs | 8 + scheds/rust/scx_flash/src/main.rs | 334 +++++++ scheds/rust/scx_flash/src/stats.rs | 82 ++ services/scx | 2 +- 17 files changed, 1665 insertions(+), 2 deletions(-) create mode 100644 scheds/rust/scx_flash/Cargo.toml create mode 120000 scheds/rust/scx_flash/LICENSE create mode 100644 scheds/rust/scx_flash/README.md create mode 100644 scheds/rust/scx_flash/build.rs create mode 100644 scheds/rust/scx_flash/rustfmt.toml create mode 100644 scheds/rust/scx_flash/src/bpf/intf.h create mode 100644 scheds/rust/scx_flash/src/bpf/main.bpf.c create mode 100644 scheds/rust/scx_flash/src/bpf_intf.rs create mode 100644 scheds/rust/scx_flash/src/bpf_skel.rs create mode 100644 scheds/rust/scx_flash/src/main.rs create mode 100644 scheds/rust/scx_flash/src/stats.rs diff --git a/Cargo.lock b/Cargo.lock index 0a2cb0bc..c7828b7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1700,6 +1700,23 @@ dependencies = [ "simplelog", ] +[[package]] +name = "scx_flash" +version = "1.0.4" +dependencies = [ + "anyhow", + "clap", + "crossbeam", + "ctrlc", + "libbpf-rs", + "log", + "scx_stats", + "scx_stats_derive", + "scx_utils", + "serde", + "simplelog", +] + [[package]] name = "scx_lavd" version = "1.0.6" diff --git a/Cargo.toml b/Cargo.toml index 577a3264..fa3ac50b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = ["rust/scx_stats", "rust/scx_loader", "scheds/rust/scx_lavd", "scheds/rust/scx_bpfland", + "scheds/rust/scx_flash", "scheds/rust/scx_rustland", "scheds/rust/scx_rlfifo", "scheds/rust/scx_rusty", diff --git a/meson-scripts/stress_tests.ini b/meson-scripts/stress_tests.ini index e15717c1..7d35371b 100644 --- a/meson-scripts/stress_tests.ini +++ b/meson-scripts/stress_tests.ini @@ -22,6 +22,12 @@ sched_args: -v stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc` timeout_sec: 15 +[scx_flash] +sched: scx_flash +sched_args: +stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc` +timeout_sec: 15 + [scx_layered] sched: scx_layered sched_args: --run-example -v --stats 1 diff --git a/meson.build b/meson.build index cf5dd5ca..d8a338d0 100644 --- a/meson.build +++ b/meson.build @@ -327,7 +327,7 @@ if enable_rust run_target('fetch', command: [cargo_fetch, cargo], env: cargo_env) rust_scheds = ['scx_lavd', 'scx_bpfland', 'scx_rustland', 'scx_rlfifo', - 'scx_rusty', + 'scx_flash', 'scx_rusty', 'scx_layered', 'scx_mitosis'] rust_misc = ['scx_stats', 'scx_stats_derive', 'scx_utils', 'scx_rustland_core', diff --git a/scheds/rust/README.md b/scheds/rust/README.md index d6319a5a..d777c9a6 100644 --- a/scheds/rust/README.md +++ b/scheds/rust/README.md @@ -18,3 +18,4 @@ main.rs or \*.bpf.c files. - [scx_rlfifo](scx_rlfifo/README.md) - [scx_lavd](scx_lavd/README.md) - [scx_bpfland](scx_bpfland/README.md) +- [scx_flash](scx_flash/README.md) diff --git a/scheds/rust/scx_flash/Cargo.toml b/scheds/rust/scx_flash/Cargo.toml new file mode 100644 index 00000000..2aada9ec --- /dev/null +++ b/scheds/rust/scx_flash/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "scx_flash" +version = "1.0.4" +authors = ["Andrea Righi "] +edition = "2021" +description = "A scheduler designed for multimedia and real-time audio processing workloads. https://github.com/sched-ext/scx/tree/main" +license = "GPL-2.0-only" + +[dependencies] +anyhow = "1.0.65" +ctrlc = { version = "3.1", features = ["termination"] } +clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] } +crossbeam = "0.8.4" +libbpf-rs = "0.24.1" +log = "0.4.17" +scx_stats = { path = "../../../rust/scx_stats", version = "1.0.4" } +scx_stats_derive = { path = "../../../rust/scx_stats/scx_stats_derive", version = "1.0.4" } +scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" } +serde = { version = "1.0", features = ["derive"] } +simplelog = "0.12" + +[build-dependencies] +scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" } + +[features] +enable_backtrace = [] diff --git a/scheds/rust/scx_flash/LICENSE b/scheds/rust/scx_flash/LICENSE new file mode 120000 index 00000000..5853aaea --- /dev/null +++ b/scheds/rust/scx_flash/LICENSE @@ -0,0 +1 @@ +../../../LICENSE \ No newline at end of file diff --git a/scheds/rust/scx_flash/README.md b/scheds/rust/scx_flash/README.md new file mode 100644 index 00000000..752f4e97 --- /dev/null +++ b/scheds/rust/scx_flash/README.md @@ -0,0 +1,26 @@ +# scx_flash + +This is a single user-defined scheduler used within [sched_ext](https://github.com/sched-ext/scx/tree/main), which is a Linux kernel feature which enables implementing kernel thread schedulers in BPF and dynamically loading them. [Read more about sched_ext](https://github.com/sched-ext/scx/tree/main). + +## Overview + +A scheduler that focuses on ensuring fairness among tasks and performance +predictability. + +It operates using an earliest deadline first (EDF) policy, where each task is +assigned a "latency" weight. This weight is dynamically adjusted based on how +often a task release the CPU before its full time slice is used. Tasks that +release the CPU early are given a higher latency weight, prioritizing them over +tasks that fully consume their time slice. + +## Typical Use Case + +The combination of dynamic latency weights and EDF scheduling ensures +responsive and consistent performance, even in overcommitted systems. + +This makes the scheduler particularly well-suited for latency-sensitive +workloads, such as multimedia or real-time audio processing. + +## Production Ready? + +Yes. diff --git a/scheds/rust/scx_flash/build.rs b/scheds/rust/scx_flash/build.rs new file mode 100644 index 00000000..2bf0ec07 --- /dev/null +++ b/scheds/rust/scx_flash/build.rs @@ -0,0 +1,13 @@ +// Copyright (c) Andrea Righi +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +fn main() { + scx_utils::BpfBuilder::new() + .unwrap() + .enable_intf("src/bpf/intf.h", "bpf_intf.rs") + .enable_skel("src/bpf/main.bpf.c", "bpf") + .build() + .unwrap(); +} diff --git a/scheds/rust/scx_flash/rustfmt.toml b/scheds/rust/scx_flash/rustfmt.toml new file mode 100644 index 00000000..b7258ed0 --- /dev/null +++ b/scheds/rust/scx_flash/rustfmt.toml @@ -0,0 +1,8 @@ +# Get help on options with `rustfmt --help=config` +# Please keep these in alphabetical order. +edition = "2021" +group_imports = "StdExternalCrate" +imports_granularity = "Item" +merge_derives = false +use_field_init_shorthand = true +version = "Two" diff --git a/scheds/rust/scx_flash/src/bpf/intf.h b/scheds/rust/scx_flash/src/bpf/intf.h new file mode 100644 index 00000000..aeb06b2a --- /dev/null +++ b/scheds/rust/scx_flash/src/bpf/intf.h @@ -0,0 +1,43 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Copyright (c) 2024 Andrea Righi + * + * This software may be used and distributed according to the terms of the GNU + * General Public License version 2. + */ +#ifndef __INTF_H +#define __INTF_H + +#include + +#define MAX(x, y) ((x) > (y) ? (x) : (y)) +#define MIN(x, y) ((x) < (y) ? (x) : (y)) +#define CLAMP(val, lo, hi) MIN(MAX(val, lo), hi) +#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0])) + +enum consts { + NSEC_PER_USEC = 1000ULL, + NSEC_PER_MSEC = (1000ULL * NSEC_PER_USEC), + NSEC_PER_SEC = (1000ULL * NSEC_PER_MSEC), +}; + +#ifndef __VMLINUX_H__ +typedef unsigned char u8; +typedef unsigned short u16; +typedef unsigned int u32; +typedef unsigned long u64; + +typedef signed char s8; +typedef signed short s16; +typedef signed int s32; +typedef signed long s64; + +typedef int pid_t; +#endif /* __VMLINUX_H__ */ + +struct domain_arg { + s32 cpu_id; + s32 sibling_cpu_id; +}; + +#endif /* __INTF_H */ diff --git a/scheds/rust/scx_flash/src/bpf/main.bpf.c b/scheds/rust/scx_flash/src/bpf/main.bpf.c new file mode 100644 index 00000000..7ec6d11e --- /dev/null +++ b/scheds/rust/scx_flash/src/bpf/main.bpf.c @@ -0,0 +1,1085 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Copyright (c) 2024 Andrea Righi + */ +#include +#include "intf.h" + +char _license[] SEC("license") = "GPL"; + +extern unsigned CONFIG_HZ __kconfig; + +/* + * Maximum task weight. + */ +#define MAX_TASK_WEIGHT 10000 + +/* + * Maximum amount of voluntary context switches (this limit allows to prevent + * spikes or abuse of the nvcsw dynamic). + */ +#define MAX_AVG_NVCSW 128 + +/* + * Global DSQ used to dispatch tasks. + */ +#define SHARED_DSQ 0 + +/* + * Minimum time slice that can be assigned to a task (in ns). + */ +#define SLICE_MIN (NSEC_PER_SEC / CONFIG_HZ) + +/* + * Task time slice range. + */ +const volatile u64 slice_max = 20ULL * NSEC_PER_MSEC; +const volatile u64 slice_lag = 20ULL * NSEC_PER_MSEC; + +/* + * When enabled always dispatch all kthreads directly. + * + * This allows to prioritize critical kernel threads that may potentially slow + * down the entire system if they are blocked for too long, but it may also + * introduce interactivity issues or unfairness in scenarios with high kthread + * activity, such as heavy I/O or network traffic. + */ +const volatile bool local_kthreads; + +/* + * Scheduling statistics. + */ +volatile u64 nr_kthread_dispatches, nr_direct_dispatches, nr_shared_dispatches; + +/* + * Exit information. + */ +UEI_DEFINE(uei); + +/* + * CPUs in the system have SMT is enabled. + */ +const volatile bool smt_enabled = true; + +/* + * Current global vruntime. + */ +static u64 vtime_now; + +/* + * Per-CPU context. + */ +struct cpu_ctx { + struct bpf_cpumask __kptr *llc_mask; +}; + +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __type(key, u32); + __type(value, struct cpu_ctx); + __uint(max_entries, 1); +} cpu_ctx_stor SEC(".maps"); + +/* + * Return a CPU context. + */ +struct cpu_ctx *try_lookup_cpu_ctx(s32 cpu) +{ + const u32 idx = 0; + return bpf_map_lookup_percpu_elem(&cpu_ctx_stor, &idx, cpu); +} + +/* + * Per-task local storage. + * + * This contain all the per-task information used internally by the BPF code. + */ +struct task_ctx { + /* + * Temporary cpumask for calculating scheduling domains. + */ + struct bpf_cpumask __kptr *llc_mask; + + /* + * Voluntary context switches metrics. + */ + u64 nvcsw; + u64 nvcsw_ts; + u64 avg_nvcsw; + + /* + * Task's average used time slice. + */ + u64 avg_runtime; + u64 sum_runtime; + u64 last_run_at; + + /* + * Task's deadline. + */ + u64 deadline; + + /* + * Task is holding a lock. + */ + bool lock_boost; +}; + +/* Map that contains task-local storage. */ +struct { + __uint(type, BPF_MAP_TYPE_TASK_STORAGE); + __uint(map_flags, BPF_F_NO_PREALLOC); + __type(key, int); + __type(value, struct task_ctx); +} task_ctx_stor SEC(".maps"); + +/* + * Return a local task context from a generic task. + */ +struct task_ctx *try_lookup_task_ctx(const struct task_struct *p) +{ + return bpf_task_storage_get(&task_ctx_stor, + (struct task_struct *)p, 0, 0); +} + +/* + * User-space locking detection: re-using a logic similar to scx_lavd. + */ +struct futex_vector; +struct hrtimer_sleeper; +struct file; + +static bool is_task_locked(struct task_struct *p) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + return tctx ? tctx->lock_boost : false; +} + +static void task_lock(struct task_struct *p) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (tctx) + tctx->lock_boost = true; +} + +static void task_unlock(struct task_struct *p) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (tctx) + tctx->lock_boost = false; +} + +SEC("fexit/__futex_wait") +int BPF_PROG(fexit___futex_wait, u32 *uaddr, unsigned int flags, u32 val, + struct hrtimer_sleeper *to, u32 bitset, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_lock(p); + } + return 0; +} + +SEC("fexit/futex_wait_multiple") +int BPF_PROG(fexit_futex_wait_multiple, struct futex_vector *vs, + unsigned int count, struct hrtimer_sleeper *to, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_lock(p); + } + return 0; +} + +SEC("fexit/futex_wait_requeue_pi") +int BPF_PROG(fexit_futex_wait_requeue_pi, u32 *uaddr, unsigned int flags, + u32 val, ktime_t *abs_time, u32 bitset, u32 *uaddr2, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_lock(p); + } + return 0; +} + +SEC("fexit/futex_lock_pi") +int BPF_PROG(fexit_futex_lock_pi, u32 *uaddr, unsigned int flags, + ktime_t *time, int trylock, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_lock(p); + } + return 0; +} + +SEC("fexit/futex_wake") +int BPF_PROG(fexit_futex_wake, u32 *uaddr, unsigned int flags, + int nr_wake, u32 bitset, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_unlock(p); + } + return 0; +} + +SEC("fexit/futex_wake_op") +int BPF_PROG(fexit_futex_wake_op, u32 *uaddr1, unsigned int flags, + u32 *uaddr2, int nr_wake, int nr_wake2, int op, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_unlock(p); + } + return 0; +} + +SEC("fexit/futex_unlock_pi") +int BPF_PROG(fexit_futex_unlock_pi, u32 *uaddr, unsigned int flags, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_unlock(p); + } + return 0; +} + +/* + * Prevent excessive prioritization of tasks performing massive fsync() + * operations on the filesystem. These tasks can degrade system responsiveness + * by not being inherently latency-sensitive. + */ +SEC("kprobe/vfs_fsync_range") +int BPF_PROG(fexit_vfs_fsync_range, struct file *file, u64 start, u64 end, int datasync) +{ + struct task_struct *p = (void *)bpf_get_current_task_btf(); + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (tctx) + tctx->avg_nvcsw = 0; +} + +/* + * Allocate/re-allocate a new cpumask. + */ +static int calloc_cpumask(struct bpf_cpumask **p_cpumask) +{ + struct bpf_cpumask *cpumask; + + cpumask = bpf_cpumask_create(); + if (!cpumask) + return -ENOMEM; + + cpumask = bpf_kptr_xchg(p_cpumask, cpumask); + if (cpumask) + bpf_cpumask_release(cpumask); + + return 0; +} + +/* + * Exponential weighted moving average (EWMA). + * + * Copied from scx_lavd. Returns the new average as: + * + * new_avg := (old_avg * .75) + (new_val * .25); + */ +static u64 calc_avg(u64 old_val, u64 new_val) +{ + return (old_val - (old_val >> 2)) + (new_val >> 2); +} + +/* + * Evaluate the EWMA limited to the range [low ... high] + */ +static u64 calc_avg_clamp(u64 old_val, u64 new_val, u64 low, u64 high) +{ + return CLAMP(calc_avg(old_val, new_val), low, high); +} + +/* + * Compare two vruntime values, returns true if the first value is less than + * the second one. + * + * Copied from scx_simple. + */ +static inline bool vtime_before(u64 a, u64 b) +{ + return (s64)(a - b) < 0; +} + +/* + * Return true if the target task @p is a kernel thread, false instead. + */ +static inline bool is_kthread(const struct task_struct *p) +{ + return p->flags & PF_KTHREAD; +} + +/* + * Return the amount of tasks that are waiting to run. + */ +static inline u64 nr_tasks_waiting(void) +{ + return scx_bpf_dsq_nr_queued(SHARED_DSQ) + 1; +} + +/* + * Return task's weight. + */ +static u64 task_weight(const struct task_struct *p, const struct task_ctx *tctx) +{ + if (tctx->lock_boost) + return MAX_TASK_WEIGHT; + + return p->scx.weight; +} + +/* + * Return a value proportionally scaled to the task's priority. + */ +static u64 scale_up_fair(const struct task_struct *p, + const struct task_ctx *tctx, u64 value) +{ + return value * task_weight(p, tctx) / 100; +} + +/* + * Return a value inversely proportional to the task's priority. + */ +static u64 scale_inverse_fair(const struct task_struct *p, + const struct task_ctx *tctx, u64 value) +{ + return value * 100 / task_weight(p, tctx); +} + +/* + * Return the task's allowed lag: used to determine how early its vruntime can + * be. + */ +static u64 task_lag(const struct task_struct *p, const struct task_ctx *tctx) +{ + return scale_up_fair(p, tctx, slice_lag); +} + +/* + * ** Taken directly from fair.c in the Linux kernel ** + * + * The "10% effect" is relative and cumulative: from _any_ nice level, + * if you go up 1 level, it's -10% CPU usage, if you go down 1 level + * it's +10% CPU usage. (to achieve that we use a multiplier of 1.25. + * If a task goes up by ~10% and another task goes down by ~10% then + * the relative distance between them is ~25%.) + */ +const int sched_prio_to_weight[40] = { + /* -20 */ 88761, 71755, 56483, 46273, 36291, + /* -15 */ 29154, 23254, 18705, 14949, 11916, + /* -10 */ 9548, 7620, 6100, 4904, 3906, + /* -5 */ 3121, 2501, 1991, 1586, 1277, + /* 0 */ 1024, 820, 655, 526, 423, + /* 5 */ 335, 272, 215, 172, 137, + /* 10 */ 110, 87, 70, 56, 45, + /* 15 */ 36, 29, 23, 18, 15, +}; + +static u64 max_sched_prio(void) +{ + return ARRAY_SIZE(sched_prio_to_weight); +} + +/* + * Convert task priority to weight (following fair.c logic). + */ +static u64 sched_prio_to_latency_weight(u64 prio) +{ + u64 max_prio = max_sched_prio(); + + if (prio >= max_prio) { + scx_bpf_error("invalid priority"); + return 0; + } + + return sched_prio_to_weight[max_prio - prio - 1]; +} + +/* + * Evaluate task's deadline. + * + * Reuse a logic similar to scx_rusty or scx_lavd and evaluate the deadline as + * a function of the waiting and wake-up events and the average task's runtime. + */ +static u64 task_deadline(struct task_struct *p, struct task_ctx *tctx) +{ + u64 avg_run_scaled, lat_prio, lat_weight; + + /* + * Evaluate the "latency priority" as a function of the average amount + * of context switches and the expected task runtime, using the + * following formula: + * + * lat_prio = avg_nvcsw - log2(avg_run_scaled) + * + * The avg_run_scaled component is used to scale the latency priority + * proportionally to the task's weight and inversely proportional to + * its runtime, so that a task with a higher weight / shorter runtime + * gets a higher latency priority than a task with a lower weight / + * higher runtime. + * + * The log2() on the average runtime ensures that the runtime metric is + * more proportional and comparable to the average rate of voluntary + * context switches. + */ + avg_run_scaled = scale_inverse_fair(p, tctx, tctx->avg_runtime); + avg_run_scaled = log2_u64(avg_run_scaled + 1); + + lat_prio = scale_up_fair(p, tctx, tctx->avg_nvcsw); + if (lat_prio > avg_run_scaled) + lat_prio -= avg_run_scaled; + else + lat_prio = 0; + + lat_prio = MIN(lat_prio, max_sched_prio() - 1); + + /* + * Lastly, translate the latency priority into a weight and apply it to + * the task's average runtime to determine the task's deadline. + */ + lat_weight = sched_prio_to_latency_weight(lat_prio); + + return tctx->avg_runtime * 1024 / lat_weight; +} + +/* + * Return task's evaluated deadline applied to its vruntime. + */ +static u64 task_vtime(struct task_struct *p, struct task_ctx *tctx) +{ + u64 min_vruntime = vtime_now - task_lag(p, tctx); + + /* + * Limit the vruntime to to avoid excessively penalizing tasks. + */ + if (vtime_before(p->scx.dsq_vtime, min_vruntime)) { + p->scx.dsq_vtime = min_vruntime; + tctx->deadline = p->scx.dsq_vtime + task_deadline(p, tctx); + } + + return tctx->deadline; +} + +/* + * Evaluate task's time slice in function of the total amount of tasks that are + * waiting to be dispatched and the task's weight. + */ +static void task_refill_slice(struct task_struct *p) +{ + struct task_ctx *tctx; + u64 slice; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + /* + * Assign a time slice proportional to the task weight and inversely + * proportional to the total amount of tasks that are waiting to be + * scheduled. + */ + slice = scale_up_fair(p, tctx, slice_max / nr_tasks_waiting()); + p->scx.slice = CLAMP(slice, SLICE_MIN, slice_max); +} + +static void task_set_domain(struct task_struct *p, s32 cpu, + const struct cpumask *cpumask) +{ + const struct cpumask *llc_domain; + struct bpf_cpumask *llc_mask; + struct task_ctx *tctx; + struct cpu_ctx *cctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + cctx = try_lookup_cpu_ctx(cpu); + if (!cctx) + return; + + llc_domain = cast_mask(cctx->llc_mask); + if (!llc_domain) + llc_domain = p->cpus_ptr; + + llc_mask = tctx->llc_mask; + if (!llc_mask) { + scx_bpf_error("LLC cpumask not initialized"); + return; + } + + /* + * Determine the LLC cache domain as the intersection of the task's + * primary cpumask and the LLC cache domain mask of the previously used + * CPU. + */ + bpf_cpumask_and(llc_mask, p->cpus_ptr, llc_domain); +} + +/* + * Find an idle CPU in the system. + * + * NOTE: the idle CPU selection doesn't need to be formally perfect, it is + * totally fine to accept racy conditions and potentially make mistakes, by + * picking CPUs that are not idle or even offline, the logic has been designed + * to handle these mistakes in favor of a more efficient response and a reduced + * scheduling overhead. + */ +static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags, bool *is_idle) +{ + const struct cpumask *idle_smtmask, *idle_cpumask; + const struct cpumask *llc_mask; + struct task_ctx *tctx; + bool is_prev_llc_affine = false; + s32 cpu; + + *is_idle = false; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return -ENOENT; + + /* + * Acquire the CPU masks to determine the idle CPUs in the system. + */ + idle_smtmask = scx_bpf_get_idle_smtmask(); + idle_cpumask = scx_bpf_get_idle_cpumask(); + + /* + * Task's scheduling domains. + */ + llc_mask = cast_mask(tctx->llc_mask); + if (!llc_mask) { + scx_bpf_error("LLC cpumask not initialized"); + cpu = -EINVAL; + goto out_put_cpumask; + } + + /* + * Check if the previously used CPU is still in the LLC task domain. If + * not, we may want to move the task back to its original LLC domain. + */ + is_prev_llc_affine = bpf_cpumask_test_cpu(prev_cpu, llc_mask); + + /* + * If the current task is waking up another task and releasing the CPU + * (WAKE_SYNC), attempt to migrate the wakee on the same CPU as the + * waker. + */ + if (wake_flags & SCX_WAKE_SYNC) { + struct task_struct *current = (void *)bpf_get_current_task_btf(); + const struct cpumask *curr_llc_domain; + struct cpu_ctx *cctx; + bool share_llc, has_idle; + + /* + * Determine waker CPU scheduling domain. + */ + cpu = bpf_get_smp_processor_id(); + cctx = try_lookup_cpu_ctx(cpu); + if (!cctx) { + cpu = -EINVAL; + goto out_put_cpumask; + } + + curr_llc_domain = cast_mask(cctx->llc_mask); + if (!curr_llc_domain) + curr_llc_domain = p->cpus_ptr; + + /* + * If both the waker and wakee share the same LLC domain keep + * using the same CPU if possible. + */ + share_llc = bpf_cpumask_test_cpu(prev_cpu, curr_llc_domain); + if (share_llc && + scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + *is_idle = true; + goto out_put_cpumask; + } + + /* + * If the waker's LLC domain is not saturated attempt to migrate + * the wakee on the same CPU as the waker (since it's going to + * block and release the current CPU). + */ + has_idle = bpf_cpumask_intersects(curr_llc_domain, idle_cpumask); + if (has_idle && + bpf_cpumask_test_cpu(cpu, p->cpus_ptr) && + !(current->flags & PF_EXITING) && + scx_bpf_dsq_nr_queued(SCX_DSQ_LOCAL_ON | cpu) == 0) { + *is_idle = true; + goto out_put_cpumask; + } + } + + /* + * Find the best idle CPU, prioritizing full idle cores in SMT systems. + */ + if (smt_enabled) { + /* + * If the task can still run on the previously used CPU and + * it's a full-idle core, keep using it. + */ + if (is_prev_llc_affine && + bpf_cpumask_test_cpu(prev_cpu, idle_smtmask) && + scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + *is_idle = true; + goto out_put_cpumask; + } + + /* + * Search for any full-idle CPU in the primary domain that + * shares the same LLC domain. + */ + cpu = scx_bpf_pick_idle_cpu(llc_mask, SCX_PICK_IDLE_CORE); + if (cpu >= 0) { + *is_idle = true; + goto out_put_cpumask; + } + + /* + * Search for any other full-idle core in the task's domain. + */ + cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, SCX_PICK_IDLE_CORE); + if (cpu >= 0) { + *is_idle = true; + goto out_put_cpumask; + } + } + + /* + * If a full-idle core can't be found (or if this is not an SMT system) + * try to re-use the same CPU, even if it's not in a full-idle core. + */ + if (is_prev_llc_affine && + scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + *is_idle = true; + goto out_put_cpumask; + } + + /* + * Search for any idle CPU in the primary domain that shares the same + * LLC domain. + */ + cpu = scx_bpf_pick_idle_cpu(llc_mask, 0); + if (cpu >= 0) { + *is_idle = true; + goto out_put_cpumask; + } + + /* + * Search for any idle CPU in the task's scheduling domain. + */ + cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, 0); + if (cpu >= 0) { + *is_idle = true; + goto out_put_cpumask; + } + + /* + * We couldn't find any idle CPU, return the previous CPU if it is in + * the task's LLC domain, otherwise pick any other CPU in the LLC + * domain. + */ + if (is_prev_llc_affine) + cpu = prev_cpu; + else + cpu = scx_bpf_pick_any_cpu(llc_mask, 0); + +out_put_cpumask: + scx_bpf_put_cpumask(idle_cpumask); + scx_bpf_put_cpumask(idle_smtmask); + + /* + * If we couldn't find any CPU, or in case of error, return the + * previously used CPU. + */ + if (cpu < 0) + cpu = prev_cpu; + + return cpu; +} + +/* + * Pick a target CPU for a task which is being woken up. + * + * If a task is dispatched here, ops.enqueue() will be skipped: task will be + * dispatched directly to the CPU returned by this callback. + */ +s32 BPF_STRUCT_OPS(flash_select_cpu, struct task_struct *p, + s32 prev_cpu, u64 wake_flags) +{ + bool is_idle = false; + s32 cpu; + + cpu = pick_idle_cpu(p, prev_cpu, wake_flags, &is_idle); + if (is_idle) { + scx_bpf_dispatch(p, SCX_DSQ_LOCAL, SCX_SLICE_DFL, 0); + __sync_fetch_and_add(&nr_direct_dispatches, 1); + } + + return cpu; +} + +/* + * Wake up an idle CPU for task @p. + */ +static void kick_task_cpu(struct task_struct *p) +{ + s32 cpu = scx_bpf_task_cpu(p); + bool is_idle = false; + + /* + * If the task changed CPU affinity just try to kick any usable idle + * CPU. + */ + if (!bpf_cpumask_test_cpu(cpu, p->cpus_ptr)) { + cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, 0); + if (cpu >= 0) + scx_bpf_kick_cpu(cpu, 0); + return; + } + + /* + * For tasks that can run only on a single CPU, we can simply verify if + * their only allowed CPU is still idle. + */ + if (p->nr_cpus_allowed == 1 || p->migration_disabled) { + if (scx_bpf_test_and_clear_cpu_idle(cpu)) + scx_bpf_kick_cpu(cpu, 0); + return; + } + + /* + * Otherwise try to kick the best idle CPU for the task. + */ + cpu = pick_idle_cpu(p, cpu, 0, &is_idle); + if (is_idle) + scx_bpf_kick_cpu(cpu, 0); +} + +/* + * Dispatch all the other tasks that were not dispatched directly in + * select_cpu(). + */ +void BPF_STRUCT_OPS(flash_enqueue, struct task_struct *p, u64 enq_flags) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + /* + * Per-CPU kthreads can be critical for system responsiveness, when + * local_kthreads is specified they are always dispatched directly + * before any other task. + */ + if (is_kthread(p) && (local_kthreads || p->nr_cpus_allowed == 1)) { + scx_bpf_dispatch(p, SCX_DSQ_LOCAL, SCX_SLICE_DFL, + enq_flags | SCX_ENQ_PREEMPT); + __sync_fetch_and_add(&nr_kthread_dispatches, 1); + return; + } + + /* + * Enqueue the task to the global DSQ. The task will be dispatched on + * the first CPU that becomes available. + */ + scx_bpf_dispatch_vtime(p, SHARED_DSQ, SCX_SLICE_DFL, + task_vtime(p, tctx), enq_flags); + __sync_fetch_and_add(&nr_shared_dispatches, 1); + + /* + * If there is an idle CPU available for the task, wake it up so it can + * consume the task immediately. + */ + kick_task_cpu(p); +} + +void BPF_STRUCT_OPS(flash_dispatch, s32 cpu, struct task_struct *prev) +{ + /* + * If the task can still run and it's holding a user-space lock, let it + * run for another round. + */ + if (prev && (prev->scx.flags & SCX_TASK_QUEUED) && + is_task_locked(prev)) { + task_unlock(prev); + task_refill_slice(prev); + return; + } + + /* + * Select a new task to run. + */ + if (scx_bpf_consume(SHARED_DSQ)) + return; + + /* + * If the current task expired its time slice and no other task wants + * to run, simply replenish its time slice and let it run for another + * round on the same CPU. + */ + if (prev && (prev->scx.flags & SCX_TASK_QUEUED)) + task_refill_slice(prev); +} + +void BPF_STRUCT_OPS(flash_running, struct task_struct *p) +{ + struct task_ctx *tctx; + + /* + * Refresh task's time slice immediately before it starts to run on its + * assigned CPU. + */ + task_refill_slice(p); + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + tctx->last_run_at = bpf_ktime_get_ns(); + + /* + * Update global vruntime. + */ + if (vtime_before(vtime_now, p->scx.dsq_vtime)) + vtime_now = p->scx.dsq_vtime; +} + +void BPF_STRUCT_OPS(flash_stopping, struct task_struct *p, bool runnable) +{ + u64 now = bpf_ktime_get_ns(), slice; + s64 delta_t; + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + /* + * If the time slice is not fully depleted, it means that the task + * voluntarily relased the CPU, therefore update the voluntary context + * switch counter. + * + * NOTE: the sched_ext core implements sched_yield() by setting the + * time slice to 0, so we won't boost the priority of tasks that are + * explicitly calling sched_yield(). + * + * This is actually a good thing, because we want to prioritize tasks + * that are releasing the CPU, because they're doing I/O, waiting for + * input or sending output to other tasks. + * + * Tasks that are using sched_yield() don't really need the priority + * boost and when they get the chance to run again they will be + * naturally prioritized by the vruntime-based scheduling policy. + */ + if (p->scx.slice > 0) + tctx->nvcsw++; + + /* + * Update task's average runtime. + */ + slice = now - tctx->last_run_at; + tctx->sum_runtime += slice; + tctx->avg_runtime = calc_avg(tctx->avg_runtime, tctx->sum_runtime); + + /* + * Update task vruntime charging the weighted used time slice. + */ + p->scx.dsq_vtime += scale_inverse_fair(p, tctx, slice); + tctx->deadline = p->scx.dsq_vtime + task_deadline(p, tctx); + + /* + * Refresh voluntary context switch metrics. + * + * Evaluate the average number of voluntary context switches per second + * using an exponentially weighted moving average, see calc_avg(). + */ + delta_t = (s64)(now - tctx->nvcsw_ts); + if (delta_t > NSEC_PER_SEC) { + u64 avg_nvcsw = tctx->nvcsw * NSEC_PER_SEC / delta_t; + + tctx->nvcsw = 0; + tctx->nvcsw_ts = now; + + /* + * Evaluate the latency weight of the task as its average rate + * of voluntary context switches (limited to to prevent + * excessive spikes). + */ + tctx->avg_nvcsw = calc_avg_clamp(tctx->avg_nvcsw, avg_nvcsw, + 0, MAX_AVG_NVCSW); + } +} + +void BPF_STRUCT_OPS(flash_runnable, struct task_struct *p, u64 enq_flags) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + tctx->sum_runtime = 0; +} + +void BPF_STRUCT_OPS(flash_set_cpumask, struct task_struct *p, + const struct cpumask *cpumask) +{ + s32 cpu = bpf_get_smp_processor_id(); + + task_set_domain(p, cpu, cpumask); +} + +void BPF_STRUCT_OPS(flash_enable, struct task_struct *p) +{ + u64 now = bpf_ktime_get_ns(); + struct task_ctx *tctx; + + p->scx.dsq_vtime = vtime_now; + + tctx = try_lookup_task_ctx(p); + if (!tctx) { + scx_bpf_error("incorrectly initialized task: %d (%s)", + p->pid, p->comm); + return; + } + /* + * Assume new tasks will use the minimum allowed time slice. + */ + tctx->avg_runtime = SLICE_MIN; + tctx->nvcsw_ts = now; + tctx->deadline = p->scx.dsq_vtime + task_deadline(p, tctx); +} + +s32 BPF_STRUCT_OPS(flash_init_task, struct task_struct *p, + struct scx_init_task_args *args) +{ + s32 cpu = bpf_get_smp_processor_id(); + struct task_ctx *tctx; + struct bpf_cpumask *cpumask; + + tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, + BPF_LOCAL_STORAGE_GET_F_CREATE); + if (!tctx) + return -ENOMEM; + /* + * Create task's LLC cpumask. + */ + cpumask = bpf_cpumask_create(); + if (!cpumask) + return -ENOMEM; + cpumask = bpf_kptr_xchg(&tctx->llc_mask, cpumask); + if (cpumask) + bpf_cpumask_release(cpumask); + + task_set_domain(p, cpu, p->cpus_ptr); + + return 0; +} + +static int init_cpumask(struct bpf_cpumask **cpumask) +{ + struct bpf_cpumask *mask; + int err = 0; + + /* + * Do nothing if the mask is already initialized. + */ + mask = *cpumask; + if (mask) + return 0; + /* + * Create the CPU mask. + */ + err = calloc_cpumask(cpumask); + if (!err) + mask = *cpumask; + if (!mask) + err = -ENOMEM; + + return err; +} + +SEC("syscall") +int enable_sibling_cpu(struct domain_arg *input) +{ + struct cpu_ctx *cctx; + struct bpf_cpumask *mask, **pmask; + int err = 0; + + cctx = try_lookup_cpu_ctx(input->cpu_id); + if (!cctx) + return -ENOENT; + + /* Make sure the target CPU mask is initialized */ + pmask = &cctx->llc_mask; + err = init_cpumask(pmask); + if (err) + return err; + + bpf_rcu_read_lock(); + mask = *pmask; + if (mask) + bpf_cpumask_set_cpu(input->sibling_cpu_id, mask); + bpf_rcu_read_unlock(); + + return err; +} + +s32 BPF_STRUCT_OPS_SLEEPABLE(flash_init) +{ + int err; + + /* + * Create the shared DSQ. + * + * Allocate the new DSQ id to not clash with any valid CPU id. + */ + err = scx_bpf_create_dsq(SHARED_DSQ, -1); + if (err) { + scx_bpf_error("failed to create shared DSQ: %d", err); + return err; + } + + return 0; +} + +void BPF_STRUCT_OPS(flash_exit, struct scx_exit_info *ei) +{ + UEI_RECORD(uei, ei); +} + +SCX_OPS_DEFINE(flash_ops, + .select_cpu = (void *)flash_select_cpu, + .enqueue = (void *)flash_enqueue, + .dispatch = (void *)flash_dispatch, + .running = (void *)flash_running, + .stopping = (void *)flash_stopping, + .runnable = (void *)flash_runnable, + .set_cpumask = (void *)flash_set_cpumask, + .enable = (void *)flash_enable, + .init_task = (void *)flash_init_task, + .init = (void *)flash_init, + .exit = (void *)flash_exit, + .flags = SCX_OPS_ENQ_EXITING, + .timeout_ms = 5000U, + .name = "flash"); diff --git a/scheds/rust/scx_flash/src/bpf_intf.rs b/scheds/rust/scx_flash/src/bpf_intf.rs new file mode 100644 index 00000000..cf92b55e --- /dev/null +++ b/scheds/rust/scx_flash/src/bpf_intf.rs @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(dead_code)] + +include!(concat!(env!("OUT_DIR"), "/bpf_intf.rs")); diff --git a/scheds/rust/scx_flash/src/bpf_skel.rs b/scheds/rust/scx_flash/src/bpf_skel.rs new file mode 100644 index 00000000..25e4de0e --- /dev/null +++ b/scheds/rust/scx_flash/src/bpf_skel.rs @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +include!(concat!(env!("OUT_DIR"), "/bpf_skel.rs")); diff --git a/scheds/rust/scx_flash/src/main.rs b/scheds/rust/scx_flash/src/main.rs new file mode 100644 index 00000000..2a0d7ae8 --- /dev/null +++ b/scheds/rust/scx_flash/src/main.rs @@ -0,0 +1,334 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +mod bpf_skel; +pub use bpf_skel::*; +pub mod bpf_intf; +pub use bpf_intf::*; + +mod stats; +use std::collections::HashMap; +use std::ffi::c_int; +use std::fs::File; +use std::io::Read; +use std::mem::MaybeUninit; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::bail; +use anyhow::Context; +use anyhow::Result; +use clap::Parser; +use crossbeam::channel::RecvTimeoutError; +use libbpf_rs::skel::OpenSkel; +use libbpf_rs::skel::Skel; +use libbpf_rs::skel::SkelBuilder; +use libbpf_rs::OpenObject; +use libbpf_rs::ProgramInput; +use log::info; +use log::warn; +use scx_stats::prelude::*; +use scx_utils::build_id; +use scx_utils::import_enums; +use scx_utils::scx_enums; +use scx_utils::scx_ops_attach; +use scx_utils::scx_ops_load; +use scx_utils::scx_ops_open; +use scx_utils::set_rlimit_infinity; +use scx_utils::uei_exited; +use scx_utils::uei_report; +use scx_utils::Topology; +use scx_utils::UserExitInfo; +use stats::Metrics; + +const SCHEDULER_NAME: &'static str = "scx_flash"; + +#[derive(Debug, Parser)] +struct Opts { + /// Exit debug dump buffer length. 0 indicates default. + #[clap(long, default_value = "0")] + exit_dump_len: u32, + + /// Maximum scheduling slice duration in microseconds. + #[clap(short = 's', long, default_value = "20000")] + slice_us_max: u64, + + /// Maximum time slice lag in microseconds. + /// + /// Increasing this value can help to enhance the responsiveness of interactive tasks, but it + /// can also make performance more "spikey". + #[clap(short = 'l', long, default_value = "20000")] + slice_us_lag: u64, + + /// Enable kthreads prioritization. + /// + /// Enabling this can improve system performance, but it may also introduce interactivity + /// issues or unfairness in scenarios with high kthread activity, such as heavy I/O or network + /// traffic. + #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)] + local_kthreads: bool, + + /// Enable stats monitoring with the specified interval. + #[clap(long)] + stats: Option, + + /// Run in stats monitoring mode with the specified interval. Scheduler + /// is not launched. + #[clap(long)] + monitor: Option, + + /// Enable verbose output, including libbpf details. + #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)] + verbose: bool, + + /// Print scheduler version and exit. + #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)] + version: bool, + + /// Show descriptions for statistics. + #[clap(long)] + help_stats: bool, +} + +fn is_smt_active() -> std::io::Result { + let mut file = File::open("/sys/devices/system/cpu/smt/active")?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + let smt_active: i32 = contents.trim().parse().unwrap_or(0); + + Ok(smt_active) +} + +struct Scheduler<'a> { + skel: BpfSkel<'a>, + struct_ops: Option, + stats_server: StatsServer<(), Metrics>, +} + +impl<'a> Scheduler<'a> { + fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit) -> Result { + set_rlimit_infinity(); + + // Check host topology to determine if we need to enable SMT capabilities. + let smt_enabled = match is_smt_active() { + Ok(value) => value == 1, + Err(e) => bail!("Failed to read SMT status: {}", e), + }; + info!( + "{} {} {}", + SCHEDULER_NAME, + *build_id::SCX_FULL_VERSION, + if smt_enabled { "SMT on" } else { "SMT off" } + ); + + // Initialize BPF connector. + let mut skel_builder = BpfSkelBuilder::default(); + skel_builder.obj_builder.debug(opts.verbose); + let mut skel = scx_ops_open!(skel_builder, open_object, flash_ops)?; + + skel.struct_ops.flash_ops_mut().exit_dump_len = opts.exit_dump_len; + + // Override default BPF scheduling parameters. + skel.maps.rodata_data.slice_max = opts.slice_us_max * 1000; + skel.maps.rodata_data.slice_lag = opts.slice_us_lag * 1000; + skel.maps.rodata_data.local_kthreads = opts.local_kthreads; + + skel.maps.rodata_data.smt_enabled = smt_enabled; + + // Load the BPF program for validation. + let mut skel = scx_ops_load!(skel, flash_ops, uei)?; + + // Initialize CPU topology. + let topo = Topology::new().unwrap(); + + // Initialize LLC domain. + Self::init_l3_cache_domains(&mut skel, &topo)?; + + // Attach the scheduler. + let struct_ops = Some(scx_ops_attach!(skel, flash_ops)?); + let stats_server = StatsServer::new(stats::server_data()).launch()?; + + Ok(Self { + skel, + struct_ops, + stats_server, + }) + } + + fn enable_sibling_cpu( + skel: &mut BpfSkel<'_>, + cpu: usize, + sibling_cpu: usize, + ) -> Result<(), u32> { + let prog = &mut skel.progs.enable_sibling_cpu; + let mut args = domain_arg { + cpu_id: cpu as c_int, + sibling_cpu_id: sibling_cpu as c_int, + }; + let input = ProgramInput { + context_in: Some(unsafe { + std::slice::from_raw_parts_mut( + &mut args as *mut _ as *mut u8, + std::mem::size_of_val(&args), + ) + }), + ..Default::default() + }; + let out = prog.test_run(input).unwrap(); + if out.return_value != 0 { + return Err(out.return_value); + } + + Ok(()) + } + + fn init_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + cache_lvl: usize, + enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>, + ) -> Result<(), std::io::Error> { + // Determine the list of CPU IDs associated to each cache node. + let mut cache_id_map: HashMap> = HashMap::new(); + for core in topo.cores().into_iter() { + for (cpu_id, cpu) in core.cpus() { + let cache_id = match cache_lvl { + 2 => cpu.l2_id(), + 3 => cpu.l3_id(), + _ => panic!("invalid cache level {}", cache_lvl), + }; + cache_id_map + .entry(cache_id) + .or_insert_with(Vec::new) + .push(*cpu_id); + } + } + + // Update the BPF cpumasks for the cache domains. + for (cache_id, cpus) in cache_id_map { + info!( + "L{} cache ID {}: sibling CPUs: {:?}", + cache_lvl, cache_id, cpus + ); + for cpu in &cpus { + for sibling_cpu in &cpus { + match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) { + Ok(()) => {} + Err(_) => { + warn!( + "L{} cache ID {}: failed to set CPU {} sibling {}", + cache_lvl, cache_id, *cpu, *sibling_cpu + ); + } + } + } + } + } + + Ok(()) + } + + fn init_l3_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + ) -> Result<(), std::io::Error> { + Self::init_cache_domains(skel, topo, 3, &|skel, _lvl, cpu, sibling_cpu| { + Self::enable_sibling_cpu(skel, cpu, sibling_cpu) + }) + } + + fn get_metrics(&self) -> Metrics { + Metrics { + nr_kthread_dispatches: self.skel.maps.bss_data.nr_kthread_dispatches, + nr_direct_dispatches: self.skel.maps.bss_data.nr_direct_dispatches, + nr_shared_dispatches: self.skel.maps.bss_data.nr_shared_dispatches, + } + } + + pub fn exited(&mut self) -> bool { + uei_exited!(&self.skel, uei) + } + + fn run(&mut self, shutdown: Arc) -> Result { + let (res_ch, req_ch) = self.stats_server.channels(); + while !shutdown.load(Ordering::Relaxed) && !self.exited() { + match req_ch.recv_timeout(Duration::from_secs(1)) { + Ok(()) => res_ch.send(self.get_metrics())?, + Err(RecvTimeoutError::Timeout) => {} + Err(e) => Err(e)?, + } + } + + self.struct_ops.take(); + uei_report!(&self.skel, uei) + } +} + +impl<'a> Drop for Scheduler<'a> { + fn drop(&mut self) { + info!("Unregister {} scheduler", SCHEDULER_NAME); + } +} + +fn main() -> Result<()> { + let opts = Opts::parse(); + + if opts.version { + println!("{} {}", SCHEDULER_NAME, *build_id::SCX_FULL_VERSION); + return Ok(()); + } + + if opts.help_stats { + stats::server_data().describe_meta(&mut std::io::stdout(), None)?; + return Ok(()); + } + + let loglevel = simplelog::LevelFilter::Info; + + let mut lcfg = simplelog::ConfigBuilder::new(); + lcfg.set_time_level(simplelog::LevelFilter::Error) + .set_location_level(simplelog::LevelFilter::Off) + .set_target_level(simplelog::LevelFilter::Off) + .set_thread_level(simplelog::LevelFilter::Off); + simplelog::TermLogger::init( + loglevel, + lcfg.build(), + simplelog::TerminalMode::Stderr, + simplelog::ColorChoice::Auto, + )?; + + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_clone = shutdown.clone(); + ctrlc::set_handler(move || { + shutdown_clone.store(true, Ordering::Relaxed); + }) + .context("Error setting Ctrl-C handler")?; + + if let Some(intv) = opts.monitor.or(opts.stats) { + let shutdown_copy = shutdown.clone(); + let jh = std::thread::spawn(move || { + stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap() + }); + if opts.monitor.is_some() { + let _ = jh.join(); + return Ok(()); + } + } + + let mut open_object = MaybeUninit::uninit(); + loop { + let mut sched = Scheduler::init(&opts, &mut open_object)?; + if !sched.run(shutdown.clone())?.should_restart() { + break; + } + } + + Ok(()) +} diff --git a/scheds/rust/scx_flash/src/stats.rs b/scheds/rust/scx_flash/src/stats.rs new file mode 100644 index 00000000..462a51c7 --- /dev/null +++ b/scheds/rust/scx_flash/src/stats.rs @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +use std::io::Write; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use scx_stats::prelude::*; +use scx_stats_derive::Stats; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)] +#[stat(top)] +pub struct Metrics { + #[stat(desc = "Number of kthread direct dispatches")] + pub nr_kthread_dispatches: u64, + #[stat(desc = "Number of task direct dispatches")] + pub nr_direct_dispatches: u64, + #[stat(desc = "Number of task global dispatches")] + pub nr_shared_dispatches: u64, +} + +impl Metrics { + fn format(&self, w: &mut W) -> Result<()> { + writeln!( + w, + "[{}] dispatch -> kthread: {:<5} direct: {:<5} shared: {:<5}", + crate::SCHEDULER_NAME, + self.nr_kthread_dispatches, + self.nr_direct_dispatches, + self.nr_shared_dispatches, + )?; + Ok(()) + } + + fn delta(&self, rhs: &Self) -> Self { + Self { + nr_kthread_dispatches: self.nr_kthread_dispatches - rhs.nr_kthread_dispatches, + nr_direct_dispatches: self.nr_direct_dispatches - rhs.nr_direct_dispatches, + nr_shared_dispatches: self.nr_shared_dispatches - rhs.nr_shared_dispatches, + ..self.clone() + } + } +} + +pub fn server_data() -> StatsServerData<(), Metrics> { + let open: Box> = Box::new(move |(req_ch, res_ch)| { + req_ch.send(())?; + let mut prev = res_ch.recv()?; + + let read: Box> = Box::new(move |_args, (req_ch, res_ch)| { + req_ch.send(())?; + let cur = res_ch.recv()?; + let delta = cur.delta(&prev); + prev = cur; + delta.to_json() + }); + + Ok(read) + }); + + StatsServerData::new() + .add_meta(Metrics::meta()) + .add_ops("top", StatsOps { open, close: None }) +} + +pub fn monitor(intv: Duration, shutdown: Arc) -> Result<()> { + scx_utils::monitor_stats::( + &vec![], + intv, + || shutdown.load(Ordering::Relaxed), + |metrics| metrics.format(&mut std::io::stdout()), + ) +} diff --git a/services/scx b/services/scx index 3aeefe0d..29b91e5f 100644 --- a/services/scx +++ b/services/scx @@ -1,4 +1,4 @@ -# List of scx_schedulers: scx_bpfland scx_central scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland +# List of scx_schedulers: scx_bpfland scx_central scx_flash scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland SCX_SCHEDULER=scx_bpfland # Set custom flags for each scheduler, below is an example of how to use