diff --git a/scheds/rust/scx_bpfland/Cargo.toml b/scheds/rust/scx_bpfland/Cargo.toml index 9ca97de80..e363b1c3e 100644 --- a/scheds/rust/scx_bpfland/Cargo.toml +++ b/scheds/rust/scx_bpfland/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "scx_bpfland" version = "1.0.2" -authors = ["Andrea Righi ", "Canonical"] +authors = ["Andrea Righi "] edition = "2021" description = "A vruntime-based sched_ext scheduler that prioritizes interactive workloads. https://github.com/sched-ext/scx/tree/main" license = "GPL-2.0-only" diff --git a/scheds/rust/scx_bpfland/build.rs b/scheds/rust/scx_bpfland/build.rs index 8aa6cbb0c..c15be3ff5 100644 --- a/scheds/rust/scx_bpfland/build.rs +++ b/scheds/rust/scx_bpfland/build.rs @@ -1,4 +1,4 @@ -// Copyright (c) Andrea Righi +// Copyright (c) Andrea Righi // // This software may be used and distributed according to the terms of the // GNU General Public License version 2. diff --git a/scheds/rust/scx_bpfland/src/bpf/intf.h b/scheds/rust/scx_bpfland/src/bpf/intf.h index 5a48c61c8..bfd940bed 100644 --- a/scheds/rust/scx_bpfland/src/bpf/intf.h +++ b/scheds/rust/scx_bpfland/src/bpf/intf.h @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: GPL-2.0 */ /* - * Copyright (c) 2024 Andrea Righi + * Copyright (c) 2024 Andrea Righi * * This software may be used and distributed according to the terms of the GNU * General Public License version 2. @@ -15,9 +15,9 @@ #define CLAMP(val, lo, hi) MIN(MAX(val, lo), hi) enum consts { - NSEC_PER_USEC = 1000ULL, - NSEC_PER_MSEC = (1000ULL * NSEC_PER_USEC), - NSEC_PER_SEC = (1000ULL * NSEC_PER_MSEC), + NSEC_PER_USEC = 1000ULL, + NSEC_PER_MSEC = (1000ULL * NSEC_PER_USEC), + NSEC_PER_SEC = (1000ULL * NSEC_PER_MSEC), }; #ifndef __VMLINUX_H__ @@ -34,4 +34,8 @@ typedef signed long s64; typedef int pid_t; #endif /* __VMLINUX_H__ */ +struct cpu_arg { + s32 cpu_id; +}; + #endif /* __INTF_H */ diff --git a/scheds/rust/scx_bpfland/src/bpf/main.bpf.c b/scheds/rust/scx_bpfland/src/bpf/main.bpf.c index a56832fcf..8b656e443 100644 --- a/scheds/rust/scx_bpfland/src/bpf/main.bpf.c +++ b/scheds/rust/scx_bpfland/src/bpf/main.bpf.c @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: GPL-2.0 */ /* - * Copyright (c) 2024 Andrea Righi + * Copyright (c) 2024 Andrea Righi */ #include #include "intf.h" @@ -106,6 +106,12 @@ volatile u64 nr_running, nr_waiting, nr_interactive, nr_online_cpus; */ UEI_DEFINE(uei); +/* + * Mask of CPUs that the scheduler can use, until the system becomes saturated, + * at which point tasks may overflow to other available CPUs. + */ +private(BPFLAND) struct bpf_cpumask __kptr *allowed_cpumask; + /* * Mask of offline CPUs, used to properly support CPU hotplugging. */ @@ -150,9 +156,9 @@ static u64 vtime_now; */ struct task_ctx { /* - * Set to true if the task is classified as interactive. + * A temporary cpumask for calculating the allowed CPU mask. */ - bool is_interactive; + struct bpf_cpumask __kptr *cpumask; /* * Voluntary context switches metrics. @@ -160,6 +166,11 @@ struct task_ctx { u64 nvcsw; u64 nvcsw_ts; u64 avg_nvcsw; + + /* + * Set to true if the task is classified as interactive. + */ + bool is_interactive; }; /* Map that contains task-local storage. */ @@ -196,6 +207,19 @@ struct task_ctx *lookup_task_ctx(const struct task_struct *p) return tctx; } +/* + * Return true if the task is interactive, false otherwise. 
+ */ +static bool is_task_interactive(struct task_struct *p) +{ + struct task_ctx *tctx; + + tctx = try_lookup_task_ctx(p); + if (!tctx) + return false; + return tctx->is_interactive; +} + /* * Return true if the target task @p is a kernel thread. */ @@ -288,8 +312,6 @@ static inline bool vtime_before(u64 a, u64 b) */ static inline u64 task_vtime(struct task_struct *p) { - u64 vtime = p->scx.dsq_vtime; - /* * Limit the vruntime to (vtime_now - slice_ns_lag) to avoid * excessively penalizing tasks. @@ -300,7 +322,7 @@ static inline u64 task_vtime(struct task_struct *p) * * Instead, a negative slice_ns_lag can result in more consistent * performance (less spikey), smoothing the reordering of the vruntime - * scheduling and making the scheduler closer to a FIFO.    + * scheduling and making the scheduler closer to a FIFO. */ if (vtime_before(p->scx.dsq_vtime, vtime_now - slice_ns_lag)) p->scx.dsq_vtime = vtime_now - slice_ns_lag; @@ -409,8 +431,14 @@ static int dispatch_direct_cpu(struct task_struct *p, s32 cpu, u64 enq_flags) static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) { const struct cpumask *online_cpumask, *idle_smtmask, *idle_cpumask; + struct bpf_cpumask *p_mask, *allowed; + struct task_ctx *tctx; s32 cpu; + tctx = try_lookup_task_ctx(p); + if (!tctx) + return prev_cpu; + /* * For tasks that can run only on a single CPU, we can simply verify if * their only allowed CPU is idle. @@ -424,6 +452,10 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) return -ENOENT; } + allowed = allowed_cpumask; + if (!allowed) + return -ENOENT; + /* * Acquire the CPU masks to determine the online and idle CPUs in the * system. @@ -432,6 +464,18 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) idle_smtmask = scx_bpf_get_idle_smtmask(); idle_cpumask = scx_bpf_get_idle_cpumask(); + p_mask = tctx->cpumask; + if (!p_mask) { + cpu = prev_cpu; + goto out_put_cpumask; + } + + /* + * Enable the task to run in the intersection of its permitted CPUs and + * the primary scheduling domain. + */ + bpf_cpumask_and(p_mask, p->cpus_ptr, cast_mask(allowed)); + /* * Find the best idle CPU, prioritizing full idle cores in SMT systems. */ @@ -440,7 +484,7 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) * If the task can still run on the previously used CPU and * it's a full-idle core, keep using it. */ - if (bpf_cpumask_test_cpu(prev_cpu, p->cpus_ptr) && + if (bpf_cpumask_test_cpu(prev_cpu, cast_mask(p_mask)) && bpf_cpumask_test_cpu(prev_cpu, idle_smtmask) && scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { cpu = prev_cpu; @@ -450,7 +494,7 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) /* * Otherwise, search for another usable full-idle core. */ - cpu = bpf_cpumask_any_and_distribute(p->cpus_ptr, idle_smtmask); + cpu = bpf_cpumask_any_and_distribute(cast_mask(p_mask), idle_smtmask); if (bpf_cpumask_test_cpu(cpu, online_cpumask) && scx_bpf_test_and_clear_cpu_idle(cpu)) goto out_put_cpumask; @@ -460,7 +504,7 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) * If a full-idle core can't be found (or if this is not an SMT system) * try to re-use the same CPU, even if it's not in a full-idle core. 
*/ - if (bpf_cpumask_test_cpu(prev_cpu, p->cpus_ptr) && + if (bpf_cpumask_test_cpu(prev_cpu, cast_mask(p_mask)) && scx_bpf_test_and_clear_cpu_idle(prev_cpu)) { cpu = prev_cpu; goto out_put_cpumask; @@ -470,7 +514,7 @@ static s32 pick_idle_cpu(struct task_struct *p, s32 prev_cpu, u64 wake_flags) * If all the previous attempts have failed, try to use any idle CPU in * the system. */ - cpu = bpf_cpumask_any_and_distribute(p->cpus_ptr, idle_cpumask); + cpu = bpf_cpumask_any_and_distribute(cast_mask(p_mask), idle_cpumask); if (bpf_cpumask_test_cpu(cpu, online_cpumask) && scx_bpf_test_and_clear_cpu_idle(cpu)) goto out_put_cpumask; @@ -497,6 +541,19 @@ static bool is_prio_congested(void) return scx_bpf_dsq_nr_queued(prio_dsq_id) > nr_online_cpus * 4; } +s32 BPF_STRUCT_OPS(bpfland_select_cpu, struct task_struct *p, s32 prev_cpu, u64 wake_flags) +{ + s32 cpu; + + cpu = pick_idle_cpu(p, prev_cpu, wake_flags); + if (cpu >= 0 && !dispatch_direct_cpu(p, cpu, 0)) { + __sync_fetch_and_add(&nr_direct_dispatches, 1); + return cpu; + } + + return prev_cpu; +} + /* * Handle synchronous wake-up event for a task. */ @@ -513,32 +570,13 @@ static void handle_sync_wakeup(struct task_struct *p) * promote additional interactive tasks, instead we give priority to * the tasks that are already classified as interactive. */ - tctx = lookup_task_ctx(p); + tctx = try_lookup_task_ctx(p); if (!tctx) return; if (!tctx->is_interactive && !is_prio_congested()) tctx->is_interactive = true; } -s32 BPF_STRUCT_OPS(bpfland_select_cpu, struct task_struct *p, s32 prev_cpu, u64 wake_flags) -{ - s32 cpu; - - /* - * Try to prioritize newly awakened tasks. - */ - if (wake_flags & SCX_WAKE_SYNC) - handle_sync_wakeup(p); - - cpu = pick_idle_cpu(p, prev_cpu, wake_flags); - if (cpu >= 0 && !dispatch_direct_cpu(p, cpu, 0)) { - __sync_fetch_and_add(&nr_direct_dispatches, 1); - return cpu; - } - - return prev_cpu; -} - /* * Dispatch all the other tasks that were not dispatched directly in * select_cpu(). @@ -547,7 +585,14 @@ void BPF_STRUCT_OPS(bpfland_enqueue, struct task_struct *p, u64 enq_flags) { u64 vtime = task_vtime(p); u64 slice = task_slice(p); - struct task_ctx *tctx; + + /* + * If the system is saturated and we couldn't dispatch directly in + * select_cpu(), try to prioritize newly awakened tasks by immediately + * promoting them as interactive. + */ + if (enq_flags & SCX_ENQ_WAKEUP) + handle_sync_wakeup(p); /* * Always dispatch per-CPU kthreads directly on their target CPU if @@ -561,10 +606,6 @@ void BPF_STRUCT_OPS(bpfland_enqueue, struct task_struct *p, u64 enq_flags) } } - tctx = lookup_task_ctx(p); - if (!tctx) - return; - /* * Dispatch interactive tasks to the priority DSQ and regular tasks to * the shared DSQ. @@ -574,7 +615,7 @@ void BPF_STRUCT_OPS(bpfland_enqueue, struct task_struct *p, u64 enq_flags) * that can consume them) we can just dispatch them to the shared DSQ * and simply rely on the vruntime logic. 
*/ - if (tctx->is_interactive) { + if (is_task_interactive(p)) { scx_bpf_dispatch_vtime(p, prio_dsq_id, slice, vtime, enq_flags); __sync_fetch_and_add(&nr_prio_dispatches, 1); } else { @@ -726,12 +767,6 @@ void BPF_STRUCT_OPS(bpfland_dispatch, s32 cpu, struct task_struct *prev) void BPF_STRUCT_OPS(bpfland_running, struct task_struct *p) { - struct task_ctx *tctx; - - tctx = lookup_task_ctx(p); - if (!tctx) - return; - /* Update global vruntime */ if (vtime_before(vtime_now, p->scx.dsq_vtime)) vtime_now = p->scx.dsq_vtime; @@ -744,7 +779,7 @@ void BPF_STRUCT_OPS(bpfland_running, struct task_struct *p) p->scx.slice = slice_ns; /* Update CPU interactive state */ - if (tctx->is_interactive) + if (is_task_interactive(p)) __sync_fetch_and_add(&nr_interactive, 1); __sync_fetch_and_add(&nr_running, 1); @@ -885,11 +920,22 @@ void BPF_STRUCT_OPS(bpfland_cpu_offline, s32 cpu) s32 BPF_STRUCT_OPS(bpfland_init_task, struct task_struct *p, struct scx_init_task_args *args) { - if (bpf_task_storage_get(&task_ctx_stor, p, 0, - BPF_LOCAL_STORAGE_GET_F_CREATE)) - return 0; - else + struct task_ctx *tctx; + struct bpf_cpumask *cpumask; + + tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, + BPF_LOCAL_STORAGE_GET_F_CREATE); + if (!tctx) return -ENOMEM; + cpumask = bpf_cpumask_create(); + if (!cpumask) + return -ENOMEM; + + cpumask = bpf_kptr_xchg(&tctx->cpumask, cpumask); + if (cpumask) + bpf_cpumask_release(cpumask); + + return 0; } /* @@ -914,6 +960,51 @@ s32 get_nr_online_cpus(void) return cpus; } +static int init_allowed_cpus(void) +{ + struct bpf_cpumask *mask; + int err = 0; + + /* + * Do nothing if the mask is already initialized. + */ + mask = allowed_cpumask; + if (mask) + return 0; + /* + * Create the allowed CPU mask. + */ + err = calloc_cpumask(&allowed_cpumask); + if (!err) + mask = allowed_cpumask; + if (!mask) + err = -ENOMEM; + + return err; +} + +SEC("syscall") +int enable_cpu(struct cpu_arg *input) +{ + struct bpf_cpumask *mask; + int err = 0; + + /* Make sure the allowed CPU mask is initialized */ + err = init_allowed_cpus(); + if (err) + return err; + /* + * Enable the target CPU in the primary scheduling domain. + */ + bpf_rcu_read_lock(); + mask = allowed_cpumask; + if (mask) + bpf_cpumask_set_cpu(input->cpu_id, mask); + bpf_rcu_read_unlock(); + + return err; +} + s32 BPF_STRUCT_OPS_SLEEPABLE(bpfland_init) { struct bpf_cpumask *mask; @@ -966,7 +1057,8 @@ s32 BPF_STRUCT_OPS_SLEEPABLE(bpfland_init) if (err) return err; - return err; + /* Initialize the primary scheduling domain */ + return init_allowed_cpus(); } void BPF_STRUCT_OPS(bpfland_exit, struct scx_exit_info *ei) diff --git a/scheds/rust/scx_bpfland/src/bpf_intf.rs b/scheds/rust/scx_bpfland/src/bpf_intf.rs index df3142101..30808ac75 100644 --- a/scheds/rust/scx_bpfland/src/bpf_intf.rs +++ b/scheds/rust/scx_bpfland/src/bpf_intf.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 // -// Copyright (c) 2024 Andrea Righi +// Copyright (c) 2024 Andrea Righi // This software may be used and distributed according to the terms of the // GNU General Public License version 2. diff --git a/scheds/rust/scx_bpfland/src/bpf_skel.rs b/scheds/rust/scx_bpfland/src/bpf_skel.rs index 9a1010285..9491741eb 100644 --- a/scheds/rust/scx_bpfland/src/bpf_skel.rs +++ b/scheds/rust/scx_bpfland/src/bpf_skel.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 // -// Copyright (c) 2024 Andrea Righi +// Copyright (c) 2024 Andrea Righi // This software may be used and distributed according to the terms of the // GNU General Public License version 2. 
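Editor's note on the cpumask plumbing added above: both the global allowed_cpumask and the new per-task cpumask are reference-counted bpf_cpumask objects stored behind __kptr fields, so installing a mask into one of those fields has to go through bpf_kptr_xchg(). A minimal annotated sketch of the pattern used by bpfland_init_task() (the helper name is hypothetical, not part of the patch):

/*
 * Annotated sketch of the kptr ownership pattern above; task_ctx is the
 * patch's own struct, the helper itself is hypothetical.
 */
static int task_ctx_alloc_cpumask(struct task_ctx *tctx)
{
	struct bpf_cpumask *cpumask;

	/* bpf_cpumask_create() returns an owned (acquired) reference. */
	cpumask = bpf_cpumask_create();
	if (!cpumask)
		return -ENOMEM;

	/*
	 * A __kptr field can only be updated via bpf_kptr_xchg(): it stores
	 * the new pointer and hands back whatever was stored before, which
	 * must be released so the old mask is not leaked.
	 */
	cpumask = bpf_kptr_xchg(&tctx->cpumask, cpumask);
	if (cpumask)
		bpf_cpumask_release(cpumask);

	return 0;
}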
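A note on the new SEC("syscall") entry point: enable_cpu() is not attached to any scheduling hook; user space executes it on demand through BPF_PROG_TEST_RUN, passing struct cpu_arg as the program's context. A rough libbpf C sketch of that invocation follows (run_enable_cpu() is a hypothetical helper; the scheduler itself does the equivalent in Rust via libbpf-rs ProgramInput/test_run() in the main.rs changes below):

#include <bpf/bpf.h>
#include <bpf/libbpf.h>
#include "intf.h"	/* struct cpu_arg */

/* Hypothetical helper: run the enable_cpu syscall program once. */
static int run_enable_cpu(struct bpf_program *prog, int cpu)
{
	struct cpu_arg arg = { .cpu_id = cpu };
	LIBBPF_OPTS(bpf_test_run_opts, opts,
		    .ctx_in = &arg,
		    .ctx_size_in = sizeof(arg));
	int err;

	err = bpf_prog_test_run_opts(bpf_program__fd(prog), &opts);
	if (err)
		return err;	/* executing the program failed */

	return opts.retval;	/* return value of enable_cpu() itself */
}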
diff --git a/scheds/rust/scx_bpfland/src/main.rs b/scheds/rust/scx_bpfland/src/main.rs index 2220a17fe..40a5ea182 100644 --- a/scheds/rust/scx_bpfland/src/main.rs +++ b/scheds/rust/scx_bpfland/src/main.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: GPL-2.0 // -// Copyright (c) 2024 Andrea Righi . +// Copyright (c) 2024 Andrea Righi // This software may be used and distributed according to the terms of the // GNU General Public License version 2. @@ -10,31 +10,33 @@ pub use bpf_skel::*; pub mod bpf_intf; pub use bpf_intf::*; +use std::ffi::c_int; use std::fs::File; use std::io::Read; use std::mem::MaybeUninit; +use std::str; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; -use std::str; - use anyhow::bail; use anyhow::Context; use anyhow::Result; use clap::Parser; use log::info; +use log::warn; use metrics::{gauge, Gauge}; use metrics_exporter_prometheus::PrometheusBuilder; use rlimit::{getrlimit, setrlimit, Resource}; -use libbpf_rs::OpenObject; use libbpf_rs::skel::OpenSkel; use libbpf_rs::skel::Skel; use libbpf_rs::skel::SkelBuilder; +use libbpf_rs::OpenObject; +use libbpf_rs::ProgramInput; use scx_utils::build_id; use scx_utils::scx_ops_attach; @@ -46,6 +48,69 @@ use scx_utils::UserExitInfo; const SCHEDULER_NAME: &'static str = "scx_bpfland"; +#[derive(Debug, Clone)] +struct CpuMask { + mask: Vec, + num_bits: usize, +} + +impl CpuMask { + pub fn from_mask(mask: Vec, num_bits: usize) -> Self { + Self { mask, num_bits } + } + + pub fn is_cpu_set(&self, cpu: usize) -> bool { + if self.num_bits == 0 { + return true; + } + if cpu >= self.num_bits { + return false; + } + let idx = cpu / 64; + let bit = cpu % 64; + self.mask + .get(idx) + .map_or(false, |&val| val & (1 << bit) != 0) + } + + pub fn from_str(hex_str: &str) -> Result { + let hex_str = hex_str.trim_start_matches("0x"); + let num_bits = hex_str.len() * 4; + + let num_u64s = (num_bits + 63) / 64; + let padded_hex_str = format!("{:0>width$}", hex_str, width = num_u64s * 16); + + let mask = (0..num_u64s) + .rev() + .map(|i| u64::from_str_radix(&padded_hex_str[i * 16..(i + 1) * 16], 16)) + .collect::, _>>()?; + + Ok(CpuMask::from_mask(mask, num_bits)) + } + + pub fn to_string(&self) -> String { + if self.num_bits == 0 { + return "all".to_string(); + } + let mut hex_str = String::new(); + for &chunk in self.mask.iter().rev() { + hex_str.push_str(&format!("{:016x}", chunk)); + } + + // Remove leading zeros, but keep at least one digit. + hex_str = hex_str.trim_start_matches('0').to_string(); + if hex_str.is_empty() { + hex_str = "0".to_string(); + } + format!("0x{}", hex_str) + } +} + +// Custom parser function for cpumask using CpuMask's from_str method +fn parse_cpumask(hex_str: &str) -> Result { + CpuMask::from_str(hex_str) +} + /// scx_bpfland: a vruntime-based sched_ext scheduler that prioritizes interactive workloads. /// /// This scheduler is derived from scx_rustland, but it is fully implemented in BFP with minimal @@ -87,6 +152,14 @@ struct Opts { #[clap(short = 'k', long, action = clap::ArgAction::SetTrue)] local_kthreads: bool, + /// Specifies the initial set of CPUs, represented as a bitmask in hex (e.g., 0xff), that the + /// scheduler will use to dispatch tasks, until the system becomes saturated, at which point + /// tasks may overflow to other available CPUs. 
+ /// + /// (empty string = all CPUs are used for initial scheduling) + #[clap(short = 'm', long, default_value = "", value_parser = parse_cpumask)] + primary_domain: CpuMask, + /// Maximum threshold of voluntary context switch per second, used to classify interactive /// tasks (0 = disable interactive tasks classification). #[clap(short = 'c', long, default_value = "10")] @@ -166,8 +239,6 @@ struct Scheduler<'a> { skel: BpfSkel<'a>, struct_ops: Option, metrics: Metrics, - slice_ns: u64, - slice_ns_min: u64, } impl<'a> Scheduler<'a> { @@ -207,8 +278,13 @@ impl<'a> Scheduler<'a> { skel.maps.rodata_data.starvation_thresh_ns = opts.starvation_thresh_us * 1000; skel.maps.rodata_data.nvcsw_max_thresh = opts.nvcsw_max_thresh; - // Attach the scheduler. + // Load the BPF program for validation. let mut skel = scx_ops_load!(skel, bpfland_ops, uei)?; + + // Initialize primary domain CPUs. + Self::init_primary_domain(&mut skel, &opts.primary_domain)?; + + // Attach the scheduler. let struct_ops = Some(scx_ops_attach!(skel, bpfland_ops)?); // Enable Prometheus metrics. @@ -223,13 +299,46 @@ impl<'a> Scheduler<'a> { skel, struct_ops, metrics: Metrics::new(), - slice_ns: opts.slice_us * 1000, - slice_ns_min: opts.slice_us_min * 1000, }) } - fn effective_slice(&self, nr_waiting: u64) -> u64 { - std::cmp::max(self.slice_ns / (nr_waiting + 1), self.slice_ns_min) + fn enable_cpu(skel: &mut BpfSkel<'_>, cpu: usize) -> Result<(), u32> { + let prog = &mut skel.progs.enable_cpu; + let mut args = cpu_arg { + cpu_id: cpu as c_int, + }; + let input = ProgramInput { + context_in: Some(unsafe { + std::slice::from_raw_parts_mut( + &mut args as *mut _ as *mut u8, + std::mem::size_of_val(&args), + ) + }), + ..Default::default() + }; + let out = prog.test_run(input).unwrap(); + if out.return_value != 0 { + return Err(out.return_value); + } + + Ok(()) + } + + fn init_primary_domain(skel: &mut BpfSkel<'_>, primary_domain: &CpuMask) -> Result<()> { + info!("primary CPU domain = {}", primary_domain.to_string()); + + for cpu in 0..libbpf_rs::num_possible_cpus().unwrap() { + if primary_domain.is_cpu_set(cpu) { + if let Err(err) = Self::enable_cpu(skel, cpu) { + warn!( + "Failed to add CPU {} to primary domain: error {}", + cpu, err + ); + } + } + } + + Ok(()) } fn update_stats(&mut self) { @@ -253,7 +362,8 @@ impl<'a> Scheduler<'a> { .nr_waiting .set(nr_waiting as f64); self.metrics - .nvcsw_avg_thresh.set(nvcsw_avg_thresh as f64); + .nvcsw_avg_thresh + .set(nvcsw_avg_thresh as f64); self.metrics .nr_direct_dispatches .set(nr_direct_dispatches as f64); @@ -264,15 +374,13 @@ impl<'a> Scheduler<'a> { .nr_shared_dispatches .set(nr_shared_dispatches as f64); - let slice_ms = self.effective_slice(nr_waiting) as f64 / 1_000_000.0; - // Log scheduling statistics. - info!("running: {:>4}/{:<4} interactive: {:<4} wait: {:<4} | slice: {:5.2}ms | nvcsw: {:<4} | direct: {:<6} prio: {:<6} shared: {:<6}", + info!("[{}] tasks -> run: {:>2}/{:<2} int: {:<2} wait: {:<4} | nvcsw: {:<4} | dispatch -> dir: {:<5} prio: {:<5} shr: {:<5}", + SCHEDULER_NAME, nr_running, nr_cpus, nr_interactive, nr_waiting, - slice_ms, nvcsw_avg_thresh, nr_direct_dispatches, nr_prio_dispatches,
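Editor's note on the CpuMask helper added in main.rs: the primary domain is given as a hex string (-m/--primary-domain), bit N selects CPU N, masks wider than 64 bits span multiple u64 words, and the default empty string means every CPU is in the primary domain. A small usage sketch (hypothetical test code, not part of the patch):

// Hypothetical usage sketch for the CpuMask helper above.
fn cpumask_examples() -> anyhow::Result<()> {
    // "0xf3" = 0b1111_0011: CPUs 0, 1 and 4..=7 are in the primary domain.
    let mask = CpuMask::from_str("0xf3")?;
    assert!(mask.is_cpu_set(0));
    assert!(!mask.is_cpu_set(2));
    assert!(mask.is_cpu_set(7));
    assert_eq!(mask.to_string(), "0xf3");

    // Masks wider than 64 bits span multiple u64 words: bit 64 -> CPU 64.
    let wide = CpuMask::from_str("0x10000000000000000")?;
    assert!(wide.is_cpu_set(64));
    assert!(!wide.is_cpu_set(63));

    // The default empty string behaves as "all CPUs allowed".
    let all = CpuMask::from_str("")?;
    assert!(all.is_cpu_set(511));
    assert_eq!(all.to_string(), "all");

    Ok(())
}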