diff --git a/Cargo.lock b/Cargo.lock index 0a2cb0bc..c7828b7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1700,6 +1700,23 @@ dependencies = [ "simplelog", ] +[[package]] +name = "scx_flash" +version = "1.0.4" +dependencies = [ + "anyhow", + "clap", + "crossbeam", + "ctrlc", + "libbpf-rs", + "log", + "scx_stats", + "scx_stats_derive", + "scx_utils", + "serde", + "simplelog", +] + [[package]] name = "scx_lavd" version = "1.0.6" diff --git a/Cargo.toml b/Cargo.toml index 577a3264..fa3ac50b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = ["rust/scx_stats", "rust/scx_loader", "scheds/rust/scx_lavd", "scheds/rust/scx_bpfland", + "scheds/rust/scx_flash", "scheds/rust/scx_rustland", "scheds/rust/scx_rlfifo", "scheds/rust/scx_rusty", diff --git a/meson-scripts/stress_tests.ini b/meson-scripts/stress_tests.ini index e15717c1..7d35371b 100644 --- a/meson-scripts/stress_tests.ini +++ b/meson-scripts/stress_tests.ini @@ -22,6 +22,12 @@ sched_args: -v stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc` timeout_sec: 15 +[scx_flash] +sched: scx_flash +sched_args: +stress_cmd: stress-ng -t 14 --aggressive -M -c `nproc` -f `nproc` +timeout_sec: 15 + [scx_layered] sched: scx_layered sched_args: --run-example -v --stats 1 diff --git a/meson.build b/meson.build index cf5dd5ca..d8a338d0 100644 --- a/meson.build +++ b/meson.build @@ -327,7 +327,7 @@ if enable_rust run_target('fetch', command: [cargo_fetch, cargo], env: cargo_env) rust_scheds = ['scx_lavd', 'scx_bpfland', 'scx_rustland', 'scx_rlfifo', - 'scx_rusty', + 'scx_flash', 'scx_rusty', 'scx_layered', 'scx_mitosis'] rust_misc = ['scx_stats', 'scx_stats_derive', 'scx_utils', 'scx_rustland_core', diff --git a/scheds/rust/README.md b/scheds/rust/README.md index d6319a5a..d777c9a6 100644 --- a/scheds/rust/README.md +++ b/scheds/rust/README.md @@ -18,3 +18,4 @@ main.rs or \*.bpf.c files. - [scx_rlfifo](scx_rlfifo/README.md) - [scx_lavd](scx_lavd/README.md) - [scx_bpfland](scx_bpfland/README.md) +- [scx_flash](scx_flash/README.md) diff --git a/scheds/rust/scx_flash/Cargo.toml b/scheds/rust/scx_flash/Cargo.toml new file mode 100644 index 00000000..2aada9ec --- /dev/null +++ b/scheds/rust/scx_flash/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "scx_flash" +version = "1.0.4" +authors = ["Andrea Righi "] +edition = "2021" +description = "A scheduler designed for multimedia and real-time audio processing workloads. 
https://github.com/sched-ext/scx/tree/main" +license = "GPL-2.0-only" + +[dependencies] +anyhow = "1.0.65" +ctrlc = { version = "3.1", features = ["termination"] } +clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] } +crossbeam = "0.8.4" +libbpf-rs = "0.24.1" +log = "0.4.17" +scx_stats = { path = "../../../rust/scx_stats", version = "1.0.4" } +scx_stats_derive = { path = "../../../rust/scx_stats/scx_stats_derive", version = "1.0.4" } +scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" } +serde = { version = "1.0", features = ["derive"] } +simplelog = "0.12" + +[build-dependencies] +scx_utils = { path = "../../../rust/scx_utils", version = "1.0.4" } + +[features] +enable_backtrace = [] diff --git a/scheds/rust/scx_flash/LICENSE b/scheds/rust/scx_flash/LICENSE new file mode 120000 index 00000000..5853aaea --- /dev/null +++ b/scheds/rust/scx_flash/LICENSE @@ -0,0 +1 @@ +../../../LICENSE \ No newline at end of file diff --git a/scheds/rust/scx_flash/README.md b/scheds/rust/scx_flash/README.md new file mode 100644 index 00000000..752f4e97 --- /dev/null +++ b/scheds/rust/scx_flash/README.md @@ -0,0 +1,26 @@ +# scx_flash + +This is a single user-defined scheduler used within [sched_ext](https://github.com/sched-ext/scx/tree/main), which is a Linux kernel feature which enables implementing kernel thread schedulers in BPF and dynamically loading them. [Read more about sched_ext](https://github.com/sched-ext/scx/tree/main). + +## Overview + +A scheduler that focuses on ensuring fairness among tasks and performance +predictability. + +It operates using an earliest deadline first (EDF) policy, where each task is +assigned a "latency" weight. This weight is dynamically adjusted based on how +often a task release the CPU before its full time slice is used. Tasks that +release the CPU early are given a higher latency weight, prioritizing them over +tasks that fully consume their time slice. + +## Typical Use Case + +The combination of dynamic latency weights and EDF scheduling ensures +responsive and consistent performance, even in overcommitted systems. + +This makes the scheduler particularly well-suited for latency-sensitive +workloads, such as multimedia or real-time audio processing. + +## Production Ready? + +Yes. diff --git a/scheds/rust/scx_flash/build.rs b/scheds/rust/scx_flash/build.rs new file mode 100644 index 00000000..2bf0ec07 --- /dev/null +++ b/scheds/rust/scx_flash/build.rs @@ -0,0 +1,13 @@ +// Copyright (c) Andrea Righi +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +fn main() { + scx_utils::BpfBuilder::new() + .unwrap() + .enable_intf("src/bpf/intf.h", "bpf_intf.rs") + .enable_skel("src/bpf/main.bpf.c", "bpf") + .build() + .unwrap(); +} diff --git a/scheds/rust/scx_flash/rustfmt.toml b/scheds/rust/scx_flash/rustfmt.toml new file mode 100644 index 00000000..b7258ed0 --- /dev/null +++ b/scheds/rust/scx_flash/rustfmt.toml @@ -0,0 +1,8 @@ +# Get help on options with `rustfmt --help=config` +# Please keep these in alphabetical order. 
+edition = "2021" +group_imports = "StdExternalCrate" +imports_granularity = "Item" +merge_derives = false +use_field_init_shorthand = true +version = "Two" diff --git a/scheds/rust/scx_flash/src/bpf/intf.h b/scheds/rust/scx_flash/src/bpf/intf.h new file mode 100644 index 00000000..aeb06b2a --- /dev/null +++ b/scheds/rust/scx_flash/src/bpf/intf.h @@ -0,0 +1,43 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Copyright (c) 2024 Andrea Righi + * + * This software may be used and distributed according to the terms of the GNU + * General Public License version 2. + */ +#ifndef __INTF_H +#define __INTF_H + +#include + +#define MAX(x, y) ((x) > (y) ? (x) : (y)) +#define MIN(x, y) ((x) < (y) ? (x) : (y)) +#define CLAMP(val, lo, hi) MIN(MAX(val, lo), hi) +#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0])) + +enum consts { + NSEC_PER_USEC = 1000ULL, + NSEC_PER_MSEC = (1000ULL * NSEC_PER_USEC), + NSEC_PER_SEC = (1000ULL * NSEC_PER_MSEC), +}; + +#ifndef __VMLINUX_H__ +typedef unsigned char u8; +typedef unsigned short u16; +typedef unsigned int u32; +typedef unsigned long u64; + +typedef signed char s8; +typedef signed short s16; +typedef signed int s32; +typedef signed long s64; + +typedef int pid_t; +#endif /* __VMLINUX_H__ */ + +struct domain_arg { + s32 cpu_id; + s32 sibling_cpu_id; +}; + +#endif /* __INTF_H */ diff --git a/scheds/rust/scx_flash/src/bpf/main.bpf.c b/scheds/rust/scx_flash/src/bpf/main.bpf.c new file mode 100644 index 00000000..7ec6d11e --- /dev/null +++ b/scheds/rust/scx_flash/src/bpf/main.bpf.c @@ -0,0 +1,1085 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +/* + * Copyright (c) 2024 Andrea Righi + */ +#include +#include "intf.h" + +char _license[] SEC("license") = "GPL"; + +extern unsigned CONFIG_HZ __kconfig; + +/* + * Maximum task weight. + */ +#define MAX_TASK_WEIGHT 10000 + +/* + * Maximum amount of voluntary context switches (this limit allows to prevent + * spikes or abuse of the nvcsw dynamic). + */ +#define MAX_AVG_NVCSW 128 + +/* + * Global DSQ used to dispatch tasks. + */ +#define SHARED_DSQ 0 + +/* + * Minimum time slice that can be assigned to a task (in ns). + */ +#define SLICE_MIN (NSEC_PER_SEC / CONFIG_HZ) + +/* + * Task time slice range. + */ +const volatile u64 slice_max = 20ULL * NSEC_PER_MSEC; +const volatile u64 slice_lag = 20ULL * NSEC_PER_MSEC; + +/* + * When enabled always dispatch all kthreads directly. + * + * This allows to prioritize critical kernel threads that may potentially slow + * down the entire system if they are blocked for too long, but it may also + * introduce interactivity issues or unfairness in scenarios with high kthread + * activity, such as heavy I/O or network traffic. + */ +const volatile bool local_kthreads; + +/* + * Scheduling statistics. + */ +volatile u64 nr_kthread_dispatches, nr_direct_dispatches, nr_shared_dispatches; + +/* + * Exit information. + */ +UEI_DEFINE(uei); + +/* + * CPUs in the system have SMT is enabled. + */ +const volatile bool smt_enabled = true; + +/* + * Current global vruntime. + */ +static u64 vtime_now; + +/* + * Per-CPU context. + */ +struct cpu_ctx { + struct bpf_cpumask __kptr *llc_mask; +}; + +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __type(key, u32); + __type(value, struct cpu_ctx); + __uint(max_entries, 1); +} cpu_ctx_stor SEC(".maps"); + +/* + * Return a CPU context. + */ +struct cpu_ctx *try_lookup_cpu_ctx(s32 cpu) +{ + const u32 idx = 0; + return bpf_map_lookup_percpu_elem(&cpu_ctx_stor, &idx, cpu); +} + +/* + * Per-task local storage. 
+ * + * This contain all the per-task information used internally by the BPF code. + */ +struct task_ctx { + /* + * Temporary cpumask for calculating scheduling domains. + */ + struct bpf_cpumask __kptr *llc_mask; + + /* + * Voluntary context switches metrics. + */ + u64 nvcsw; + u64 nvcsw_ts; + u64 avg_nvcsw; + + /* + * Task's average used time slice. + */ + u64 avg_runtime; + u64 sum_runtime; + u64 last_run_at; + + /* + * Task's deadline. + */ + u64 deadline; + + /* + * Task is holding a lock. + */ + bool lock_boost; +}; + +/* Map that contains task-local storage. */ +struct { + __uint(type, BPF_MAP_TYPE_TASK_STORAGE); + __uint(map_flags, BPF_F_NO_PREALLOC); + __type(key, int); + __type(value, struct task_ctx); +} task_ctx_stor SEC(".maps"); + +/* + * Return a local task context from a generic task. + */ +struct task_ctx *try_lookup_task_ctx(const struct task_struct *p) +{ + return bpf_task_storage_get(&task_ctx_stor, + (struct task_struct *)p, 0, 0); +} + +/* + * User-space locking detection: re-using a logic similar to scx_lavd. + */ +struct futex_vector; +struct hrtimer_sleeper; +struct file; + +static bool is_task_locked(struct task_struct *p) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + return tctx ? tctx->lock_boost : false; +} + +static void task_lock(struct task_struct *p) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (tctx) + tctx->lock_boost = true; +} + +static void task_unlock(struct task_struct *p) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (tctx) + tctx->lock_boost = false; +} + +SEC("fexit/__futex_wait") +int BPF_PROG(fexit___futex_wait, u32 *uaddr, unsigned int flags, u32 val, + struct hrtimer_sleeper *to, u32 bitset, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_lock(p); + } + return 0; +} + +SEC("fexit/futex_wait_multiple") +int BPF_PROG(fexit_futex_wait_multiple, struct futex_vector *vs, + unsigned int count, struct hrtimer_sleeper *to, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_lock(p); + } + return 0; +} + +SEC("fexit/futex_wait_requeue_pi") +int BPF_PROG(fexit_futex_wait_requeue_pi, u32 *uaddr, unsigned int flags, + u32 val, ktime_t *abs_time, u32 bitset, u32 *uaddr2, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_lock(p); + } + return 0; +} + +SEC("fexit/futex_lock_pi") +int BPF_PROG(fexit_futex_lock_pi, u32 *uaddr, unsigned int flags, + ktime_t *time, int trylock, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_lock(p); + } + return 0; +} + +SEC("fexit/futex_wake") +int BPF_PROG(fexit_futex_wake, u32 *uaddr, unsigned int flags, + int nr_wake, u32 bitset, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_unlock(p); + } + return 0; +} + +SEC("fexit/futex_wake_op") +int BPF_PROG(fexit_futex_wake_op, u32 *uaddr1, unsigned int flags, + u32 *uaddr2, int nr_wake, int nr_wake2, int op, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_unlock(p); + } + return 0; +} + +SEC("fexit/futex_unlock_pi") +int BPF_PROG(fexit_futex_unlock_pi, u32 *uaddr, unsigned int flags, int ret) +{ + if (ret == 0) { + struct task_struct *p = (void *)bpf_get_current_task_btf(); + task_unlock(p); + } + return 0; +} + +/* + * Prevent excessive prioritization of tasks performing massive fsync() + * operations on the filesystem. 
These tasks can degrade system responsiveness + * by not being inherently latency-sensitive. + */ +SEC("kprobe/vfs_fsync_range") +int BPF_PROG(fexit_vfs_fsync_range, struct file *file, u64 start, u64 end, int datasync) +{ + struct task_struct *p = (void *)bpf_get_current_task_btf(); + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (tctx) + tctx->avg_nvcsw = 0; +} + +/* + * Allocate/re-allocate a new cpumask. + */ +static int calloc_cpumask(struct bpf_cpumask **p_cpumask) +{ + struct bpf_cpumask *cpumask; + + cpumask = bpf_cpumask_create(); + if (!cpumask) + return -ENOMEM; + + cpumask = bpf_kptr_xchg(p_cpumask, cpumask); + if (cpumask) + bpf_cpumask_release(cpumask); + + return 0; +} + +/* + * Exponential weighted moving average (EWMA). + * + * Copied from scx_lavd. Returns the new average as: + * + * new_avg := (old_avg * .75) + (new_val * .25); + */ +static u64 calc_avg(u64 old_val, u64 new_val) +{ + return (old_val - (old_val >> 2)) + (new_val >> 2); +} + +/* + * Evaluate the EWMA limited to the range [low ... high] + */ +static u64 calc_avg_clamp(u64 old_val, u64 new_val, u64 low, u64 high) +{ + return CLAMP(calc_avg(old_val, new_val), low, high); +} + +/* + * Compare two vruntime values, returns true if the first value is less than + * the second one. + * + * Copied from scx_simple. + */ +static inline bool vtime_before(u64 a, u64 b) +{ + return (s64)(a - b) < 0; +} + +/* + * Return true if the target task @p is a kernel thread, false instead. + */ +static inline bool is_kthread(const struct task_struct *p) +{ + return p->flags & PF_KTHREAD; +} + +/* + * Return the amount of tasks that are waiting to run. + */ +static inline u64 nr_tasks_waiting(void) +{ + return scx_bpf_dsq_nr_queued(SHARED_DSQ) + 1; +} + +/* + * Return task's weight. + */ +static u64 task_weight(const struct task_struct *p, const struct task_ctx *tctx) +{ + if (tctx->lock_boost) + return MAX_TASK_WEIGHT; + + return p->scx.weight; +} + +/* + * Return a value proportionally scaled to the task's priority. + */ +static u64 scale_up_fair(const struct task_struct *p, + const struct task_ctx *tctx, u64 value) +{ + return value * task_weight(p, tctx) / 100; +} + +/* + * Return a value inversely proportional to the task's priority. + */ +static u64 scale_inverse_fair(const struct task_struct *p, + const struct task_ctx *tctx, u64 value) +{ + return value * 100 / task_weight(p, tctx); +} + +/* + * Return the task's allowed lag: used to determine how early its vruntime can + * be. + */ +static u64 task_lag(const struct task_struct *p, const struct task_ctx *tctx) +{ + return scale_up_fair(p, tctx, slice_lag); +} + +/* + * ** Taken directly from fair.c in the Linux kernel ** + * + * The "10% effect" is relative and cumulative: from _any_ nice level, + * if you go up 1 level, it's -10% CPU usage, if you go down 1 level + * it's +10% CPU usage. (to achieve that we use a multiplier of 1.25. + * If a task goes up by ~10% and another task goes down by ~10% then + * the relative distance between them is ~25%.) + */ +const int sched_prio_to_weight[40] = { + /* -20 */ 88761, 71755, 56483, 46273, 36291, + /* -15 */ 29154, 23254, 18705, 14949, 11916, + /* -10 */ 9548, 7620, 6100, 4904, 3906, + /* -5 */ 3121, 2501, 1991, 1586, 1277, + /* 0 */ 1024, 820, 655, 526, 423, + /* 5 */ 335, 272, 215, 172, 137, + /* 10 */ 110, 87, 70, 56, 45, + /* 15 */ 36, 29, 23, 18, 15, +}; + +static u64 max_sched_prio(void) +{ + return ARRAY_SIZE(sched_prio_to_weight); +} + +/* + * Convert task priority to weight (following fair.c logic). 
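+ *
+ * Note that the table is indexed in reverse (max_prio - prio - 1), so a
+ * higher latency priority selects a larger weight: lat_prio 0 maps to the
+ * smallest weight (15), while lat_prio 39 maps to the largest (88761),
+ * which in turn shortens the deadline evaluated in task_deadline().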
+ */ +static u64 sched_prio_to_latency_weight(u64 prio) +{ + u64 max_prio = max_sched_prio(); + + if (prio >= max_prio) { + scx_bpf_error("invalid priority"); + return 0; + } + + return sched_prio_to_weight[max_prio - prio - 1]; +} + +/* + * Evaluate task's deadline. + * + * Reuse a logic similar to scx_rusty or scx_lavd and evaluate the deadline as + * a function of the waiting and wake-up events and the average task's runtime. + */ +static u64 task_deadline(struct task_struct *p, struct task_ctx *tctx) +{ + u64 avg_run_scaled, lat_prio, lat_weight; + + /* + * Evaluate the "latency priority" as a function of the average amount + * of context switches and the expected task runtime, using the + * following formula: + * + * lat_prio = avg_nvcsw - log2(avg_run_scaled) + * + * The avg_run_scaled component is used to scale the latency priority + * proportionally to the task's weight and inversely proportional to + * its runtime, so that a task with a higher weight / shorter runtime + * gets a higher latency priority than a task with a lower weight / + * higher runtime. + * + * The log2() on the average runtime ensures that the runtime metric is + * more proportional and comparable to the average rate of voluntary + * context switches. + */ + avg_run_scaled = scale_inverse_fair(p, tctx, tctx->avg_runtime); + avg_run_scaled = log2_u64(avg_run_scaled + 1); + + lat_prio = scale_up_fair(p, tctx, tctx->avg_nvcsw); + if (lat_prio > avg_run_scaled) + lat_prio -= avg_run_scaled; + else + lat_prio = 0; + + lat_prio = MIN(lat_prio, max_sched_prio() - 1); + + /* + * Lastly, translate the latency priority into a weight and apply it to + * the task's average runtime to determine the task's deadline. + */ + lat_weight = sched_prio_to_latency_weight(lat_prio); + + return tctx->avg_runtime * 1024 / lat_weight; +} + +/* + * Return task's evaluated deadline applied to its vruntime. + */ +static u64 task_vtime(struct task_struct *p, struct task_ctx *tctx) +{ + u64 min_vruntime = vtime_now - task_lag(p, tctx); + + /* + * Limit the vruntime to to avoid excessively penalizing tasks. + */ + if (vtime_before(p->scx.dsq_vtime, min_vruntime)) { + p->scx.dsq_vtime = min_vruntime; + tctx->deadline = p->scx.dsq_vtime + task_deadline(p, tctx); + } + + return tctx->deadline; +} + +/* + * Evaluate task's time slice in function of the total amount of tasks that are + * waiting to be dispatched and the task's weight. + */ +static void task_refill_slice(struct task_struct *p) +{ + struct task_ctx *tctx; + u64 slice; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + /* + * Assign a time slice proportional to the task weight and inversely + * proportional to the total amount of tasks that are waiting to be + * scheduled. 
+ */ + slice = scale_up_fair(p, tctx, slice_max / nr_tasks_waiting()); + p->scx.slice = CLAMP(slice, SLICE_MIN, slice_max); +} + +static void task_set_domain(struct task_struct *p, s32 cpu, + const struct cpumask *cpumask) +{ + const struct cpumask *llc_domain; + struct bpf_cpumask *llc_mask; + struct task_ctx *tctx; + struct cpu_ctx *cctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + cctx = try_lookup_cpu_ctx(cpu); + if (!cctx) + return; + + llc_domain = cast_mask(cctx->llc_mask); + if (!llc_domain) + llc_domain = p->cpus_ptr; + + llc_mask = tctx->llc_mask; + if (!llc_mask) { + scx_bpf_error("LLC cpumask not initialized"); + return; + } + + /* + * Determine the LLC cache domain as the intersection of the task's + * primary cpumask and the LLC cache domain mask of the previously used + * CPU. + */ + bpf_cpumask_and(llc_mask, p->cpus_ptr, llc_domain); +} + +/* + * Find an idle CPU in the system. + * + * NOTE: the idle CPU selection doesn't need to be formally perfect, it is + * totally fine to accept racy conditions and potentially make mistakes, by + * picking CPUs that are not idle or even offline, the logic has been designed + * to handle these mistakes in favor of a more efficient response and a reduced + * scheduling overhead. + */ +static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags, bool *is_idle) +{ + const struct cpumask *idle_smtmask, *idle_cpumask; + const struct cpumask *llc_mask; + struct task_ctx *tctx; + bool is_prev_llc_affine = false; + s32 cpu; + + *is_idle = false; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return -ENOENT; + + /* + * Acquire the CPU masks to determine the idle CPUs in the system. + */ + idle_smtmask = scx_bpf_get_idle_smtmask(); + idle_cpumask = scx_bpf_get_idle_cpumask(); + + /* + * Task's scheduling domains. + */ + llc_mask = cast_mask(tctx->llc_mask); + if (!llc_mask) { + scx_bpf_error("LLC cpumask not initialized"); + cpu = -EINVAL; + goto out_put_cpumask; + } + + /* + * Check if the previously used CPU is still in the LLC task domain. If + * not, we may want to move the task back to its original LLC domain. + */ + is_prev_llc_affine = bpf_cpumask_test_cpu(prev_cpu, llc_mask); + + /* + * If the current task is waking up another task and releasing the CPU + * (WAKE_SYNC), attempt to migrate the wakee on the same CPU as the + * waker. + */ + if (wake_flags & SCX_WAKE_SYNC) { + struct task_struct *current = (void *)bpf_get_current_task_btf(); + const struct cpumask *curr_llc_domain; + struct cpu_ctx *cctx; + bool share_llc, has_idle; + + /* + * Determine waker CPU scheduling domain. + */ + cpu = bpf_get_smp_processor_id(); + cctx = try_lookup_cpu_ctx(cpu); + if (!cctx) { + cpu = -EINVAL; + goto out_put_cpumask; + } + + curr_llc_domain = cast_mask(cctx->llc_mask); + if (!curr_llc_domain) + curr_llc_domain = p->cpus_ptr; + + /* + * If both the waker and wakee share the same LLC domain keep + * using the same CPU if possible. + */ + share_llc = bpf_cpumask_test_cpu(prev_cpu, curr_llc_domain); + if (share_llc && + scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + *is_idle = true; + goto out_put_cpumask; + } + + /* + * If the waker's LLC domain is not saturated attempt to migrate + * the wakee on the same CPU as the waker (since it's going to + * block and release the current CPU). 
+ */ + has_idle = bpf_cpumask_intersects(curr_llc_domain, idle_cpumask); + if (has_idle && + bpf_cpumask_test_cpu(cpu, p->cpus_ptr) && + !(current->flags & PF_EXITING) && + scx_bpf_dsq_nr_queued(SCX_DSQ_LOCAL_ON | cpu) == 0) { + *is_idle = true; + goto out_put_cpumask; + } + } + + /* + * Find the best idle CPU, prioritizing full idle cores in SMT systems. + */ + if (smt_enabled) { + /* + * If the task can still run on the previously used CPU and + * it's a full-idle core, keep using it. + */ + if (is_prev_llc_affine && + bpf_cpumask_test_cpu(prev_cpu, idle_smtmask) && + scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + *is_idle = true; + goto out_put_cpumask; + } + + /* + * Search for any full-idle CPU in the primary domain that + * shares the same LLC domain. + */ + cpu = scx_bpf_pick_idle_cpu(llc_mask, SCX_PICK_IDLE_CORE); + if (cpu >= 0) { + *is_idle = true; + goto out_put_cpumask; + } + + /* + * Search for any other full-idle core in the task's domain. + */ + cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, SCX_PICK_IDLE_CORE); + if (cpu >= 0) { + *is_idle = true; + goto out_put_cpumask; + } + } + + /* + * If a full-idle core can't be found (or if this is not an SMT system) + * try to re-use the same CPU, even if it's not in a full-idle core. + */ + if (is_prev_llc_affine && + scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { + cpu = prev_cpu; + *is_idle = true; + goto out_put_cpumask; + } + + /* + * Search for any idle CPU in the primary domain that shares the same + * LLC domain. + */ + cpu = scx_bpf_pick_idle_cpu(llc_mask, 0); + if (cpu >= 0) { + *is_idle = true; + goto out_put_cpumask; + } + + /* + * Search for any idle CPU in the task's scheduling domain. + */ + cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, 0); + if (cpu >= 0) { + *is_idle = true; + goto out_put_cpumask; + } + + /* + * We couldn't find any idle CPU, return the previous CPU if it is in + * the task's LLC domain, otherwise pick any other CPU in the LLC + * domain. + */ + if (is_prev_llc_affine) + cpu = prev_cpu; + else + cpu = scx_bpf_pick_any_cpu(llc_mask, 0); + +out_put_cpumask: + scx_bpf_put_cpumask(idle_cpumask); + scx_bpf_put_cpumask(idle_smtmask); + + /* + * If we couldn't find any CPU, or in case of error, return the + * previously used CPU. + */ + if (cpu < 0) + cpu = prev_cpu; + + return cpu; +} + +/* + * Pick a target CPU for a task which is being woken up. + * + * If a task is dispatched here, ops.enqueue() will be skipped: task will be + * dispatched directly to the CPU returned by this callback. + */ +s32 BPF_STRUCT_OPS(flash_select_cpu, struct task_struct *p, + s32 prev_cpu, u64 wake_flags) +{ + bool is_idle = false; + s32 cpu; + + cpu = pick_idle_cpu(p, prev_cpu, wake_flags, &is_idle); + if (is_idle) { + scx_bpf_dispatch(p, SCX_DSQ_LOCAL, SCX_SLICE_DFL, 0); + __sync_fetch_and_add(&nr_direct_dispatches, 1); + } + + return cpu; +} + +/* + * Wake up an idle CPU for task @p. + */ +static void kick_task_cpu(struct task_struct *p) +{ + s32 cpu = scx_bpf_task_cpu(p); + bool is_idle = false; + + /* + * If the task changed CPU affinity just try to kick any usable idle + * CPU. + */ + if (!bpf_cpumask_test_cpu(cpu, p->cpus_ptr)) { + cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, 0); + if (cpu >= 0) + scx_bpf_kick_cpu(cpu, 0); + return; + } + + /* + * For tasks that can run only on a single CPU, we can simply verify if + * their only allowed CPU is still idle. 
+ */ + if (p->nr_cpus_allowed == 1 || p->migration_disabled) { + if (scx_bpf_test_and_clear_cpu_idle(cpu)) + scx_bpf_kick_cpu(cpu, 0); + return; + } + + /* + * Otherwise try to kick the best idle CPU for the task. + */ + cpu = pick_idle_cpu(p, cpu, 0, &is_idle); + if (is_idle) + scx_bpf_kick_cpu(cpu, 0); +} + +/* + * Dispatch all the other tasks that were not dispatched directly in + * select_cpu(). + */ +void BPF_STRUCT_OPS(flash_enqueue, struct task_struct *p, u64 enq_flags) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + /* + * Per-CPU kthreads can be critical for system responsiveness, when + * local_kthreads is specified they are always dispatched directly + * before any other task. + */ + if (is_kthread(p) && (local_kthreads || p->nr_cpus_allowed == 1)) { + scx_bpf_dispatch(p, SCX_DSQ_LOCAL, SCX_SLICE_DFL, + enq_flags | SCX_ENQ_PREEMPT); + __sync_fetch_and_add(&nr_kthread_dispatches, 1); + return; + } + + /* + * Enqueue the task to the global DSQ. The task will be dispatched on + * the first CPU that becomes available. + */ + scx_bpf_dispatch_vtime(p, SHARED_DSQ, SCX_SLICE_DFL, + task_vtime(p, tctx), enq_flags); + __sync_fetch_and_add(&nr_shared_dispatches, 1); + + /* + * If there is an idle CPU available for the task, wake it up so it can + * consume the task immediately. + */ + kick_task_cpu(p); +} + +void BPF_STRUCT_OPS(flash_dispatch, s32 cpu, struct task_struct *prev) +{ + /* + * If the task can still run and it's holding a user-space lock, let it + * run for another round. + */ + if (prev && (prev->scx.flags & SCX_TASK_QUEUED) && + is_task_locked(prev)) { + task_unlock(prev); + task_refill_slice(prev); + return; + } + + /* + * Select a new task to run. + */ + if (scx_bpf_consume(SHARED_DSQ)) + return; + + /* + * If the current task expired its time slice and no other task wants + * to run, simply replenish its time slice and let it run for another + * round on the same CPU. + */ + if (prev && (prev->scx.flags & SCX_TASK_QUEUED)) + task_refill_slice(prev); +} + +void BPF_STRUCT_OPS(flash_running, struct task_struct *p) +{ + struct task_ctx *tctx; + + /* + * Refresh task's time slice immediately before it starts to run on its + * assigned CPU. + */ + task_refill_slice(p); + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + tctx->last_run_at = bpf_ktime_get_ns(); + + /* + * Update global vruntime. + */ + if (vtime_before(vtime_now, p->scx.dsq_vtime)) + vtime_now = p->scx.dsq_vtime; +} + +void BPF_STRUCT_OPS(flash_stopping, struct task_struct *p, bool runnable) +{ + u64 now = bpf_ktime_get_ns(), slice; + s64 delta_t; + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + + /* + * If the time slice is not fully depleted, it means that the task + * voluntarily relased the CPU, therefore update the voluntary context + * switch counter. + * + * NOTE: the sched_ext core implements sched_yield() by setting the + * time slice to 0, so we won't boost the priority of tasks that are + * explicitly calling sched_yield(). + * + * This is actually a good thing, because we want to prioritize tasks + * that are releasing the CPU, because they're doing I/O, waiting for + * input or sending output to other tasks. + * + * Tasks that are using sched_yield() don't really need the priority + * boost and when they get the chance to run again they will be + * naturally prioritized by the vruntime-based scheduling policy. + */ + if (p->scx.slice > 0) + tctx->nvcsw++; + + /* + * Update task's average runtime. 
+ */ + slice = now - tctx->last_run_at; + tctx->sum_runtime += slice; + tctx->avg_runtime = calc_avg(tctx->avg_runtime, tctx->sum_runtime); + + /* + * Update task vruntime charging the weighted used time slice. + */ + p->scx.dsq_vtime += scale_inverse_fair(p, tctx, slice); + tctx->deadline = p->scx.dsq_vtime + task_deadline(p, tctx); + + /* + * Refresh voluntary context switch metrics. + * + * Evaluate the average number of voluntary context switches per second + * using an exponentially weighted moving average, see calc_avg(). + */ + delta_t = (s64)(now - tctx->nvcsw_ts); + if (delta_t > NSEC_PER_SEC) { + u64 avg_nvcsw = tctx->nvcsw * NSEC_PER_SEC / delta_t; + + tctx->nvcsw = 0; + tctx->nvcsw_ts = now; + + /* + * Evaluate the latency weight of the task as its average rate + * of voluntary context switches (limited to to prevent + * excessive spikes). + */ + tctx->avg_nvcsw = calc_avg_clamp(tctx->avg_nvcsw, avg_nvcsw, + 0, MAX_AVG_NVCSW); + } +} + +void BPF_STRUCT_OPS(flash_runnable, struct task_struct *p, u64 enq_flags) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return; + tctx->sum_runtime = 0; +} + +void BPF_STRUCT_OPS(flash_set_cpumask, struct task_struct *p, + const struct cpumask *cpumask) +{ + s32 cpu = bpf_get_smp_processor_id(); + + task_set_domain(p, cpu, cpumask); +} + +void BPF_STRUCT_OPS(flash_enable, struct task_struct *p) +{ + u64 now = bpf_ktime_get_ns(); + struct task_ctx *tctx; + + p->scx.dsq_vtime = vtime_now; + + tctx = try_lookup_task_ctx(p); + if (!tctx) { + scx_bpf_error("incorrectly initialized task: %d (%s)", + p->pid, p->comm); + return; + } + /* + * Assume new tasks will use the minimum allowed time slice. + */ + tctx->avg_runtime = SLICE_MIN; + tctx->nvcsw_ts = now; + tctx->deadline = p->scx.dsq_vtime + task_deadline(p, tctx); +} + +s32 BPF_STRUCT_OPS(flash_init_task, struct task_struct *p, + struct scx_init_task_args *args) +{ + s32 cpu = bpf_get_smp_processor_id(); + struct task_ctx *tctx; + struct bpf_cpumask *cpumask; + + tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, + BPF_LOCAL_STORAGE_GET_F_CREATE); + if (!tctx) + return -ENOMEM; + /* + * Create task's LLC cpumask. + */ + cpumask = bpf_cpumask_create(); + if (!cpumask) + return -ENOMEM; + cpumask = bpf_kptr_xchg(&tctx->llc_mask, cpumask); + if (cpumask) + bpf_cpumask_release(cpumask); + + task_set_domain(p, cpu, p->cpus_ptr); + + return 0; +} + +static int init_cpumask(struct bpf_cpumask **cpumask) +{ + struct bpf_cpumask *mask; + int err = 0; + + /* + * Do nothing if the mask is already initialized. + */ + mask = *cpumask; + if (mask) + return 0; + /* + * Create the CPU mask. + */ + err = calloc_cpumask(cpumask); + if (!err) + mask = *cpumask; + if (!mask) + err = -ENOMEM; + + return err; +} + +SEC("syscall") +int enable_sibling_cpu(struct domain_arg *input) +{ + struct cpu_ctx *cctx; + struct bpf_cpumask *mask, **pmask; + int err = 0; + + cctx = try_lookup_cpu_ctx(input->cpu_id); + if (!cctx) + return -ENOENT; + + /* Make sure the target CPU mask is initialized */ + pmask = &cctx->llc_mask; + err = init_cpumask(pmask); + if (err) + return err; + + bpf_rcu_read_lock(); + mask = *pmask; + if (mask) + bpf_cpumask_set_cpu(input->sibling_cpu_id, mask); + bpf_rcu_read_unlock(); + + return err; +} + +s32 BPF_STRUCT_OPS_SLEEPABLE(flash_init) +{ + int err; + + /* + * Create the shared DSQ. + * + * Allocate the new DSQ id to not clash with any valid CPU id. 
+ */ + err = scx_bpf_create_dsq(SHARED_DSQ, -1); + if (err) { + scx_bpf_error("failed to create shared DSQ: %d", err); + return err; + } + + return 0; +} + +void BPF_STRUCT_OPS(flash_exit, struct scx_exit_info *ei) +{ + UEI_RECORD(uei, ei); +} + +SCX_OPS_DEFINE(flash_ops, + .select_cpu = (void *)flash_select_cpu, + .enqueue = (void *)flash_enqueue, + .dispatch = (void *)flash_dispatch, + .running = (void *)flash_running, + .stopping = (void *)flash_stopping, + .runnable = (void *)flash_runnable, + .set_cpumask = (void *)flash_set_cpumask, + .enable = (void *)flash_enable, + .init_task = (void *)flash_init_task, + .init = (void *)flash_init, + .exit = (void *)flash_exit, + .flags = SCX_OPS_ENQ_EXITING, + .timeout_ms = 5000U, + .name = "flash"); diff --git a/scheds/rust/scx_flash/src/bpf_intf.rs b/scheds/rust/scx_flash/src/bpf_intf.rs new file mode 100644 index 00000000..cf92b55e --- /dev/null +++ b/scheds/rust/scx_flash/src/bpf_intf.rs @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. +#![allow(non_upper_case_globals)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(dead_code)] + +include!(concat!(env!("OUT_DIR"), "/bpf_intf.rs")); diff --git a/scheds/rust/scx_flash/src/bpf_skel.rs b/scheds/rust/scx_flash/src/bpf_skel.rs new file mode 100644 index 00000000..25e4de0e --- /dev/null +++ b/scheds/rust/scx_flash/src/bpf_skel.rs @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +include!(concat!(env!("OUT_DIR"), "/bpf_skel.rs")); diff --git a/scheds/rust/scx_flash/src/main.rs b/scheds/rust/scx_flash/src/main.rs new file mode 100644 index 00000000..2a0d7ae8 --- /dev/null +++ b/scheds/rust/scx_flash/src/main.rs @@ -0,0 +1,334 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +mod bpf_skel; +pub use bpf_skel::*; +pub mod bpf_intf; +pub use bpf_intf::*; + +mod stats; +use std::collections::HashMap; +use std::ffi::c_int; +use std::fs::File; +use std::io::Read; +use std::mem::MaybeUninit; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::bail; +use anyhow::Context; +use anyhow::Result; +use clap::Parser; +use crossbeam::channel::RecvTimeoutError; +use libbpf_rs::skel::OpenSkel; +use libbpf_rs::skel::Skel; +use libbpf_rs::skel::SkelBuilder; +use libbpf_rs::OpenObject; +use libbpf_rs::ProgramInput; +use log::info; +use log::warn; +use scx_stats::prelude::*; +use scx_utils::build_id; +use scx_utils::import_enums; +use scx_utils::scx_enums; +use scx_utils::scx_ops_attach; +use scx_utils::scx_ops_load; +use scx_utils::scx_ops_open; +use scx_utils::set_rlimit_infinity; +use scx_utils::uei_exited; +use scx_utils::uei_report; +use scx_utils::Topology; +use scx_utils::UserExitInfo; +use stats::Metrics; + +const SCHEDULER_NAME: &'static str = "scx_flash"; + +#[derive(Debug, Parser)] +struct Opts { + /// Exit debug dump buffer length. 0 indicates default. + #[clap(long, default_value = "0")] + exit_dump_len: u32, + + /// Maximum scheduling slice duration in microseconds. 
+ #[clap(short = 's', long, default_value = "20000")] + slice_us_max: u64, + + /// Maximum time slice lag in microseconds. + /// + /// Increasing this value can help to enhance the responsiveness of interactive tasks, but it + /// can also make performance more "spikey". + #[clap(short = 'l', long, default_value = "20000")] + slice_us_lag: u64, + + /// Enable kthreads prioritization. + /// + /// Enabling this can improve system performance, but it may also introduce interactivity + /// issues or unfairness in scenarios with high kthread activity, such as heavy I/O or network + /// traffic. + #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)] + local_kthreads: bool, + + /// Enable stats monitoring with the specified interval. + #[clap(long)] + stats: Option, + + /// Run in stats monitoring mode with the specified interval. Scheduler + /// is not launched. + #[clap(long)] + monitor: Option, + + /// Enable verbose output, including libbpf details. + #[clap(short = 'v', long, action = clap::ArgAction::SetTrue)] + verbose: bool, + + /// Print scheduler version and exit. + #[clap(short = 'V', long, action = clap::ArgAction::SetTrue)] + version: bool, + + /// Show descriptions for statistics. + #[clap(long)] + help_stats: bool, +} + +fn is_smt_active() -> std::io::Result { + let mut file = File::open("/sys/devices/system/cpu/smt/active")?; + let mut contents = String::new(); + file.read_to_string(&mut contents)?; + + let smt_active: i32 = contents.trim().parse().unwrap_or(0); + + Ok(smt_active) +} + +struct Scheduler<'a> { + skel: BpfSkel<'a>, + struct_ops: Option, + stats_server: StatsServer<(), Metrics>, +} + +impl<'a> Scheduler<'a> { + fn init(opts: &'a Opts, open_object: &'a mut MaybeUninit) -> Result { + set_rlimit_infinity(); + + // Check host topology to determine if we need to enable SMT capabilities. + let smt_enabled = match is_smt_active() { + Ok(value) => value == 1, + Err(e) => bail!("Failed to read SMT status: {}", e), + }; + info!( + "{} {} {}", + SCHEDULER_NAME, + *build_id::SCX_FULL_VERSION, + if smt_enabled { "SMT on" } else { "SMT off" } + ); + + // Initialize BPF connector. + let mut skel_builder = BpfSkelBuilder::default(); + skel_builder.obj_builder.debug(opts.verbose); + let mut skel = scx_ops_open!(skel_builder, open_object, flash_ops)?; + + skel.struct_ops.flash_ops_mut().exit_dump_len = opts.exit_dump_len; + + // Override default BPF scheduling parameters. + skel.maps.rodata_data.slice_max = opts.slice_us_max * 1000; + skel.maps.rodata_data.slice_lag = opts.slice_us_lag * 1000; + skel.maps.rodata_data.local_kthreads = opts.local_kthreads; + + skel.maps.rodata_data.smt_enabled = smt_enabled; + + // Load the BPF program for validation. + let mut skel = scx_ops_load!(skel, flash_ops, uei)?; + + // Initialize CPU topology. + let topo = Topology::new().unwrap(); + + // Initialize LLC domain. + Self::init_l3_cache_domains(&mut skel, &topo)?; + + // Attach the scheduler. 
+ let struct_ops = Some(scx_ops_attach!(skel, flash_ops)?); + let stats_server = StatsServer::new(stats::server_data()).launch()?; + + Ok(Self { + skel, + struct_ops, + stats_server, + }) + } + + fn enable_sibling_cpu( + skel: &mut BpfSkel<'_>, + cpu: usize, + sibling_cpu: usize, + ) -> Result<(), u32> { + let prog = &mut skel.progs.enable_sibling_cpu; + let mut args = domain_arg { + cpu_id: cpu as c_int, + sibling_cpu_id: sibling_cpu as c_int, + }; + let input = ProgramInput { + context_in: Some(unsafe { + std::slice::from_raw_parts_mut( + &mut args as *mut _ as *mut u8, + std::mem::size_of_val(&args), + ) + }), + ..Default::default() + }; + let out = prog.test_run(input).unwrap(); + if out.return_value != 0 { + return Err(out.return_value); + } + + Ok(()) + } + + fn init_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + cache_lvl: usize, + enable_sibling_cpu_fn: &dyn Fn(&mut BpfSkel<'_>, usize, usize, usize) -> Result<(), u32>, + ) -> Result<(), std::io::Error> { + // Determine the list of CPU IDs associated to each cache node. + let mut cache_id_map: HashMap> = HashMap::new(); + for core in topo.cores().into_iter() { + for (cpu_id, cpu) in core.cpus() { + let cache_id = match cache_lvl { + 2 => cpu.l2_id(), + 3 => cpu.l3_id(), + _ => panic!("invalid cache level {}", cache_lvl), + }; + cache_id_map + .entry(cache_id) + .or_insert_with(Vec::new) + .push(*cpu_id); + } + } + + // Update the BPF cpumasks for the cache domains. + for (cache_id, cpus) in cache_id_map { + info!( + "L{} cache ID {}: sibling CPUs: {:?}", + cache_lvl, cache_id, cpus + ); + for cpu in &cpus { + for sibling_cpu in &cpus { + match enable_sibling_cpu_fn(skel, cache_lvl, *cpu, *sibling_cpu) { + Ok(()) => {} + Err(_) => { + warn!( + "L{} cache ID {}: failed to set CPU {} sibling {}", + cache_lvl, cache_id, *cpu, *sibling_cpu + ); + } + } + } + } + } + + Ok(()) + } + + fn init_l3_cache_domains( + skel: &mut BpfSkel<'_>, + topo: &Topology, + ) -> Result<(), std::io::Error> { + Self::init_cache_domains(skel, topo, 3, &|skel, _lvl, cpu, sibling_cpu| { + Self::enable_sibling_cpu(skel, cpu, sibling_cpu) + }) + } + + fn get_metrics(&self) -> Metrics { + Metrics { + nr_kthread_dispatches: self.skel.maps.bss_data.nr_kthread_dispatches, + nr_direct_dispatches: self.skel.maps.bss_data.nr_direct_dispatches, + nr_shared_dispatches: self.skel.maps.bss_data.nr_shared_dispatches, + } + } + + pub fn exited(&mut self) -> bool { + uei_exited!(&self.skel, uei) + } + + fn run(&mut self, shutdown: Arc) -> Result { + let (res_ch, req_ch) = self.stats_server.channels(); + while !shutdown.load(Ordering::Relaxed) && !self.exited() { + match req_ch.recv_timeout(Duration::from_secs(1)) { + Ok(()) => res_ch.send(self.get_metrics())?, + Err(RecvTimeoutError::Timeout) => {} + Err(e) => Err(e)?, + } + } + + self.struct_ops.take(); + uei_report!(&self.skel, uei) + } +} + +impl<'a> Drop for Scheduler<'a> { + fn drop(&mut self) { + info!("Unregister {} scheduler", SCHEDULER_NAME); + } +} + +fn main() -> Result<()> { + let opts = Opts::parse(); + + if opts.version { + println!("{} {}", SCHEDULER_NAME, *build_id::SCX_FULL_VERSION); + return Ok(()); + } + + if opts.help_stats { + stats::server_data().describe_meta(&mut std::io::stdout(), None)?; + return Ok(()); + } + + let loglevel = simplelog::LevelFilter::Info; + + let mut lcfg = simplelog::ConfigBuilder::new(); + lcfg.set_time_level(simplelog::LevelFilter::Error) + .set_location_level(simplelog::LevelFilter::Off) + .set_target_level(simplelog::LevelFilter::Off) + 
.set_thread_level(simplelog::LevelFilter::Off); + simplelog::TermLogger::init( + loglevel, + lcfg.build(), + simplelog::TerminalMode::Stderr, + simplelog::ColorChoice::Auto, + )?; + + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_clone = shutdown.clone(); + ctrlc::set_handler(move || { + shutdown_clone.store(true, Ordering::Relaxed); + }) + .context("Error setting Ctrl-C handler")?; + + if let Some(intv) = opts.monitor.or(opts.stats) { + let shutdown_copy = shutdown.clone(); + let jh = std::thread::spawn(move || { + stats::monitor(Duration::from_secs_f64(intv), shutdown_copy).unwrap() + }); + if opts.monitor.is_some() { + let _ = jh.join(); + return Ok(()); + } + } + + let mut open_object = MaybeUninit::uninit(); + loop { + let mut sched = Scheduler::init(&opts, &mut open_object)?; + if !sched.run(shutdown.clone())?.should_restart() { + break; + } + } + + Ok(()) +} diff --git a/scheds/rust/scx_flash/src/stats.rs b/scheds/rust/scx_flash/src/stats.rs new file mode 100644 index 00000000..462a51c7 --- /dev/null +++ b/scheds/rust/scx_flash/src/stats.rs @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: GPL-2.0 +// +// Copyright (c) 2024 Andrea Righi + +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2. + +use std::io::Write; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use scx_stats::prelude::*; +use scx_stats_derive::Stats; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Clone, Debug, Default, Serialize, Deserialize, Stats)] +#[stat(top)] +pub struct Metrics { + #[stat(desc = "Number of kthread direct dispatches")] + pub nr_kthread_dispatches: u64, + #[stat(desc = "Number of task direct dispatches")] + pub nr_direct_dispatches: u64, + #[stat(desc = "Number of task global dispatches")] + pub nr_shared_dispatches: u64, +} + +impl Metrics { + fn format(&self, w: &mut W) -> Result<()> { + writeln!( + w, + "[{}] dispatch -> kthread: {:<5} direct: {:<5} shared: {:<5}", + crate::SCHEDULER_NAME, + self.nr_kthread_dispatches, + self.nr_direct_dispatches, + self.nr_shared_dispatches, + )?; + Ok(()) + } + + fn delta(&self, rhs: &Self) -> Self { + Self { + nr_kthread_dispatches: self.nr_kthread_dispatches - rhs.nr_kthread_dispatches, + nr_direct_dispatches: self.nr_direct_dispatches - rhs.nr_direct_dispatches, + nr_shared_dispatches: self.nr_shared_dispatches - rhs.nr_shared_dispatches, + ..self.clone() + } + } +} + +pub fn server_data() -> StatsServerData<(), Metrics> { + let open: Box> = Box::new(move |(req_ch, res_ch)| { + req_ch.send(())?; + let mut prev = res_ch.recv()?; + + let read: Box> = Box::new(move |_args, (req_ch, res_ch)| { + req_ch.send(())?; + let cur = res_ch.recv()?; + let delta = cur.delta(&prev); + prev = cur; + delta.to_json() + }); + + Ok(read) + }); + + StatsServerData::new() + .add_meta(Metrics::meta()) + .add_ops("top", StatsOps { open, close: None }) +} + +pub fn monitor(intv: Duration, shutdown: Arc) -> Result<()> { + scx_utils::monitor_stats::( + &vec![], + intv, + || shutdown.load(Ordering::Relaxed), + |metrics| metrics.format(&mut std::io::stdout()), + ) +} diff --git a/services/scx b/services/scx index 3aeefe0d..29b91e5f 100644 --- a/services/scx +++ b/services/scx @@ -1,4 +1,4 @@ -# List of scx_schedulers: scx_bpfland scx_central scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland +# List of scx_schedulers: scx_bpfland scx_central scx_flash 
scx_lavd scx_layered scx_nest scx_qmap scx_rlfifo scx_rustland scx_rusty scx_simple scx_userland
 SCX_SCHEDULER=scx_bpfland
 
 # Set custom flags for each scheduler, below is an example of how to use
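
For reference, the deadline math added in main.bpf.c can be sketched in plain user-space C as follows. This is illustrative only and not part of the patch: log2_u64() is a simplified stand-in for the BPF helper of the same name, and the task numbers (weight, average runtime, voluntary context switch rate) are made up.

/* Illustrative sketch of task_deadline() from main.bpf.c (not part of the patch). */
#include <stdint.h>
#include <stdio.h>

static const unsigned int sched_prio_to_weight[40] = {
	88761, 71755, 56483, 46273, 36291, 29154, 23254, 18705, 14949, 11916,
	 9548,  7620,  6100,  4904,  3906,  3121,  2501,  1991,  1586,  1277,
	 1024,   820,   655,   526,   423,   335,   272,   215,   172,   137,
	  110,    87,    70,    56,    45,    36,    29,    23,    18,    15,
};

/* Simplified integer log2, standing in for the BPF-side log2_u64() helper. */
static uint64_t log2_u64(uint64_t v)
{
	uint64_t r = 0;

	while (v >>= 1)
		r++;
	return r;
}

int main(void)
{
	/* Hypothetical task: nice-0 weight, 2ms average runtime, ~40 voluntary
	 * context switches per second (already clamped to MAX_AVG_NVCSW). */
	uint64_t weight = 100, avg_runtime = 2000000, avg_nvcsw = 40;

	/* avg_run_scaled = log2(scale_inverse_fair(avg_runtime)) */
	uint64_t avg_run_scaled = log2_u64(avg_runtime * 100 / weight + 1);

	/* lat_prio = scale_up_fair(avg_nvcsw) - avg_run_scaled, floored at 0
	 * and capped to the highest latency priority (39). */
	uint64_t lat_prio = avg_nvcsw * weight / 100;
	lat_prio = lat_prio > avg_run_scaled ? lat_prio - avg_run_scaled : 0;
	if (lat_prio > 39)
		lat_prio = 39;

	/* deadline = avg_runtime * 1024 / lat_weight */
	uint64_t lat_weight = sched_prio_to_weight[40 - lat_prio - 1];
	uint64_t deadline = avg_runtime * 1024 / lat_weight;

	/* With these numbers: lat_prio = 20, lat_weight = 1277, deadline ~1.6ms. */
	printf("lat_prio=%llu lat_weight=%llu deadline=%llu ns\n",
	       (unsigned long long)lat_prio, (unsigned long long)lat_weight,
	       (unsigned long long)deadline);
	return 0;
}

Tasks that block often relative to their runtime end up with a higher latency priority, hence a larger latency weight and an earlier deadline, which is what gives interactive and audio workloads their boost under this policy.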