Introduce scx_rustland_core: a generic layer to implement user-space schedulers in Rust #161

Merged (8 commits, Feb 28, 2024)

1 change: 1 addition & 0 deletions rust/meson.build
@@ -1 +1,2 @@
subdir('scx_utils')
subdir('scx_rustland_core')
1 change: 1 addition & 0 deletions rust/scx_rustland_core/.gitignore
@@ -0,0 +1 @@
Cargo.lock
26 changes: 26 additions & 0 deletions rust/scx_rustland_core/Cargo.toml
@@ -0,0 +1,26 @@
[package]
name = "scx_rustland_core"
version = "0.1.0"
edition = "2021"
authors = ["Andrea Righi <[email protected]>"]
license = "GPL-2.0-only"
repository = "https://github.com/sched-ext/scx"
description = "Framework to implement sched_ext schedulers running in user space"

include = [
    "src/bpf/intf.h",
    "src/bpf/main.bpf.c",
    "src/bpf.rs",
]

[dependencies]
anyhow = "1.0"
libbpf-rs = "0.22.0"
libc = "0.2.137"
buddy-alloc = "0.5.1"
scx_utils = { path = "../scx_utils", version = "0.6" }

[build-dependencies]
tar = "0.4"
walkdir = "2.4"
scx_utils = { path = "../scx_utils", version = "0.6" }
1 change: 1 addition & 0 deletions rust/scx_rustland_core/LICENSE
78 changes: 78 additions & 0 deletions rust/scx_rustland_core/README.md
@@ -0,0 +1,78 @@
# Framework to implement sched_ext schedulers running in user-space

[sched_ext](https://github.com/sched-ext/scx) is a Linux kernel feature
which enables implementing kernel thread schedulers in BPF and dynamically
loading them.

This crate provides a generic layer that can be used to implement sched-ext
schedulers that run in user-space.

It provides a generic BPF abstraction that is completely agnostic of the
particular scheduling policy implemented in user-space.

Developers can use this abstraction to implement schedulers in pure Rust,
without having to deal with any kernel or BPF internals.

## API

The main BPF interface is provided by the `BpfScheduler` struct. When this
object is initialized, it takes care of registering and initializing the BPF
component.

The scheduler can then use the `BpfScheduler` instance to receive tasks (in
the form of `QueuedTask` objects) and dispatch tasks (in the form of
`DispatchedTask` objects), using the methods `dequeue_task()` and
`dispatch_task()`, respectively.

Example usage (FIFO scheduler):
```rust
struct Scheduler<'a> {
    bpf: BpfScheduler<'a>,
}

impl<'a> Scheduler<'a> {
    fn init() -> Result<Self> {
        let topo = Topology::new().expect("Failed to build host topology");
        let bpf = BpfScheduler::init(5000, topo.nr_cpus() as i32, false, false, false)?;
        Ok(Self { bpf })
    }

    fn schedule(&mut self) {
        loop {
            match self.bpf.dequeue_task() {
                Ok(Some(task)) => {
                    // task.cpu < 0 is used to notify an exiting task; in this
                    // case we can simply ignore it.
                    if task.cpu >= 0 {
                        let _ = self.bpf.dispatch_task(&DispatchedTask {
                            pid: task.pid,
                            cpu: task.cpu,
                            cpumask_cnt: task.cpumask_cnt,
                            payload: 0,
                        });
                    }
                }
                Ok(None) => {
                    // Notify the BPF component that all tasks have been dispatched.
                    self.bpf.update_tasks(Some(0), Some(0));
                    break;
                }
                Err(_) => {
                    break;
                }
            }
        }
    }
}
```

Moreover, a CPU ownership map (that keeps track of which PID runs on which
CPU) can be accessed using the method `get_cpu_pid()`. This also makes it
possible to keep track of the idle and busy CPUs, with the corresponding PIDs
associated with them.
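
As a minimal sketch, a hypothetical helper could use this map to count the
idle CPUs (assuming `get_cpu_pid()` takes a CPU id and returns the PID
currently running on it, with `0` meaning the CPU is idle):

```rust
impl<'a> Scheduler<'a> {
    // Count the CPUs that are currently idle according to the CPU ownership
    // map exposed by the BPF component (a PID of 0 is assumed to mean idle).
    fn count_idle_cpus(&self, nr_cpus: i32) -> usize {
        (0..nr_cpus)
            .filter(|&cpu| self.bpf.get_cpu_pid(cpu) == 0)
            .count()
    }
}
```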

BPF counters and statistics can be accessed using the `nr_*_mut()` methods;
in particular, `nr_queued_mut()` and `nr_scheduled_mut()` can be updated to
notify the BPF component whether the user-space scheduler still has pending
work to do.
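
For example, a hypothetical helper that publishes the scheduler's own
bookkeeping through these counters could look like this (setting both
counters to zero tells the BPF component that nothing is pending):

```rust
impl<'a> Scheduler<'a> {
    // Report pending work to the BPF component: nr_queued / nr_scheduled are
    // whatever counts the user-space scheduler tracks internally.
    fn notify_pending_work(&mut self, nr_queued: u64, nr_scheduled: u64) {
        *self.bpf.nr_queued_mut() = nr_queued;
        *self.bpf.nr_scheduled_mut() = nr_scheduled;
    }
}
```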

Lastly, the methods `exited()` and `shutdown_and_report()` can be used,
respectively, to test whether the BPF component exited, and to shut down and
report the exit message.
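
As a rough sketch, a main loop built on top of the `schedule()` method from
the FIFO example above could look like this (assuming `anyhow::Result` is in
scope and that `shutdown_and_report()` returns a `Result<()>`):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

impl<'a> Scheduler<'a> {
    // Keep scheduling until user-space requests a shutdown or the BPF
    // component exits, then report the BPF exit message.
    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<()> {
        while !shutdown.load(Ordering::Relaxed) && !self.bpf.exited() {
            self.schedule();
            std::thread::yield_now();
        }
        self.bpf.shutdown_and_report()
    }
}
```
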
1 change: 1 addition & 0 deletions rust/scx_rustland_core/bindings.h
@@ -0,0 +1 @@
#include "bpf_h/vmlinux/vmlinux.h"
1 change: 1 addition & 0 deletions rust/scx_rustland_core/bpf_h
10 changes: 10 additions & 0 deletions rust/scx_rustland_core/build.rs
@@ -0,0 +1,10 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
//
// This software may be used and distributed according to the terms of the
// GNU General Public License version 2.

use scx_utils::Builder;

fn main() {
    Builder::new().build()
}
7 changes: 7 additions & 0 deletions rust/scx_rustland_core/meson.build
@@ -0,0 +1,7 @@
custom_target('scx_rustland_core',
              output: '@PLAINNAME@.__PHONY__',
              input: 'Cargo.toml',
              command: [cargo, 'build', '--manifest-path=@INPUT@', '--target-dir=@OUTDIR@',
                        cargo_build_args],
              env: cargo_env,
              build_by_default: true)
@@ -15,7 +15,7 @@ const HEAP_SIZE: usize = 64 * 1024 * 1024; // 64M
const LEAF_SIZE: usize = 64;

#[repr(align(4096))]
pub struct AlignedHeap<const N: usize>([u8; N]);
struct AlignedHeap<const N: usize>([u8; N]);

// Statically pre-allocated memory arena.
static mut FAST_HEAP: AlignedHeap<FAST_HEAP_SIZE> = AlignedHeap([0u8; FAST_HEAP_SIZE]);
@@ -26,25 +26,25 @@ static mut HEAP: AlignedHeap<HEAP_SIZE> = AlignedHeap([0u8; HEAP_SIZE]);
// To prevent potential deadlock conditions under heavy loads, any scheduler that delegates
// scheduling decisions to user-space should avoid triggering page faults.
//
// To address this issue, replace the global allocator with a custom one (RustLandAllocator),
// To address this issue, replace the global allocator with a custom one (UserAllocator),
// designed to operate on a pre-allocated buffer. This, coupled with the memory locking achieved
// through mlockall(), prevents page faults from occurring during the execution of the user-space
// scheduler.
#[cfg_attr(not(test), global_allocator)]
pub static ALLOCATOR: RustLandAllocator = unsafe {
pub static ALLOCATOR: UserAllocator = unsafe {
let fast_param = FastAllocParam::new(FAST_HEAP.0.as_ptr(), FAST_HEAP_SIZE);
let buddy_param = BuddyAllocParam::new(HEAP.0.as_ptr(), HEAP_SIZE, LEAF_SIZE);
RustLandAllocator {
UserAllocator {
arena: NonThreadsafeAlloc::new(fast_param, buddy_param),
}
};

// Main allocator class.
pub struct RustLandAllocator {
pub arena: NonThreadsafeAlloc,
pub struct UserAllocator {
arena: NonThreadsafeAlloc,
}

impl RustLandAllocator {
impl UserAllocator {
pub fn lock_memory(&self) {
unsafe {
VM.save();
@@ -75,7 +75,7 @@ impl RustLandAllocator {
}

// Override global allocator methods.
unsafe impl GlobalAlloc for RustLandAllocator {
unsafe impl GlobalAlloc for UserAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
self.arena.alloc(layout)
}
6 changes: 6 additions & 0 deletions rust/scx_rustland_core/src/bindings.rs
@@ -0,0 +1,6 @@
#![allow(non_upper_case_globals)]
#![allow(non_camel_case_types)]
#![allow(non_snake_case)]
#![allow(dead_code)]

include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
@@ -15,13 +15,12 @@ use libbpf_rs::skel::SkelBuilder as _;

use libc::{sched_param, sched_setscheduler};

mod alloc;
use alloc::*;

use scx_utils::init_libbpf_logging;
use scx_utils::uei_exited;
use scx_utils::uei_report;

use scx_rustland_core::ALLOCATOR;

// Defined in UAPI
const SCHED_EXT: i32 = 7;

@@ -31,7 +30,7 @@ const SCHED_EXT: i32 = 7;
#[allow(dead_code)]
pub const NO_CPU: i32 = -1;

/// scx_rustland: provide high-level abstractions to interact with the BPF component.
/// High-level Rust abstraction to interact with a generic sched-ext BPF component.
///
/// Overview
/// ========
@@ -40,7 +39,7 @@ pub const NO_CPU: i32 = -1;
/// initialized it will take care of registering and initializing the BPF component.
///
/// The scheduler then can use BpfScheduler() instance to receive tasks (in the form of QueuedTask
/// object) and dispatch tasks (in the form of DispatchedTask objects), using respectively the
/// objects) and dispatch tasks (in the form of DispatchedTask objects), using respectively the
/// methods dequeue_task() and dispatch_task().
///
/// The CPU ownership map can be accessed using the method get_cpu_pid(), this also allows to keep
@@ -51,85 +50,8 @@ pub const NO_CPU: i32 = -1;
/// user-space scheduler has some pending work to do or not.
///
/// Finally the methods exited() and shutdown_and_report() can be used respectively to test
/// whether the BPF component exited, and to shutdown and report the exit message.
/// whether the BPF component exited, and to shutdown and report exit message.
///
/// Example
/// =======
///
/// Following you can find bare minimum template that can be used to implement a simple FIFO
/// scheduler using the BPF abstraction:
///
/// mod bpf_skel;
/// pub use bpf_skel::*;
/// mod bpf;
/// pub mod bpf_intf;
/// use bpf::*;
///
/// use std::thread;
///
/// use std::sync::atomic::AtomicBool;
/// use std::sync::atomic::Ordering;
/// use std::sync::Arc;
///
/// use anyhow::Result;
///
/// struct Scheduler<'a> {
/// bpf: BpfScheduler<'a>,
/// }
///
/// impl<'a> Scheduler<'a> {
/// fn init() -> Result<Self> {
/// let bpf = BpfScheduler::init(20000, false, false)?;
/// Ok(Self { bpf })
/// }
///
/// fn dispatch_tasks(&mut self) {
/// loop {
/// match self.bpf.dequeue_task() {
/// Ok(Some(task)) => {
/// if task.cpu >= 0 {
/// let _ = self.bpf.dispatch_task(
/// &DispatchedTask {
/// pid: task.pid,
/// cpu: task.cpu,
/// payload: 0,
/// }
/// );
/// }
/// }
/// Ok(None) => {
/// *self.bpf.nr_queued_mut() = 0;
/// *self.bpf.nr_scheduled_mut() = 0;
/// break;
/// }
/// Err(_) => {
/// break;
/// }
/// }
/// }
/// }
///
/// fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<()> {
/// while !shutdown.load(Ordering::Relaxed) && !self.bpf.exited() {
/// self.dispatch_tasks();
/// thread::yield_now();
/// }
///
/// Ok(())
/// }
/// }
///
/// fn main() -> Result<()> {
/// let mut sched = Scheduler::init()?;
/// let shutdown = Arc::new(AtomicBool::new(false));
/// let shutdown_clone = shutdown.clone();
/// ctrlc::set_handler(move || {
/// shutdown_clone.store(true, Ordering::Relaxed);
/// })?;
///
/// sched.run(shutdown)
/// }
///

// Task queued for scheduling from the BPF component (see bpf_intf::queued_task_ctx).
#[derive(Debug)]
@@ -241,16 +163,31 @@ impl<'cb> BpfScheduler<'cb> {
ALLOCATOR.lock_memory();

// Copy one item from the ring buffer.
//
// # Safety
//
// Each invocation of the callback will trigger the copy of exactly one QueuedTask item to
// BUF. The caller must be synchronize to ensure that multiple invocations of the callback
// are not happening at the same time, but this is implicitly guaranteed by the fact that
// the caller is a single-thread process (for now).
//
// Use of a `str` whose contents are not valid UTF-8 is undefined behavior.
fn callback(data: &[u8]) -> i32 {
unsafe {
// SAFETY: copying from the BPF ring buffer to BUF is safe, since the size of BUF
// is exactly the size of QueuedTask and the callback operates in chunks of
// QueuedTask items. It also copies exactly one QueuedTask at a time, this is
// guaranteed by the error code returned by this callback (see below). From a
// thread-safety perspective this is also correct, assuming the caller is a
// single-thread process (as it is for now).
BUF.0.copy_from_slice(data);
}

// Return an unsupported error to stop early and consume only one item.
//
// NOTE: this is quite a hack. I wish libbpf would honor stopping after the first item
// is consumed, upon returnin a non-zero positive value here, but it doesn't seem to be
// the case:
// is consumed, upon returning a non-zero positive value here, but it doesn't seem to
// be the case:
//
// https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/tools/lib/bpf/ringbuf.c?h=v6.8-rc5#n260
//
@@ -305,6 +242,23 @@ impl<'cb> BpfScheduler<'cb> {
}
}

// Update the amount of tasks that have been queued to the user-space scheduler and dispatched.
//
// This method is used to notify the BPF component if the user-space scheduler has still some
// pending actions to complete (based on the counter of queued and scheduled tasks).
//
// NOTE: do not set allow(dead_code) for this method, any scheduler must use this method at
// some point, otherwise the BPF component will keep waking-up the user-space scheduler in a
// busy loop, causing unnecessary high CPU consumption.
pub fn update_tasks(&mut self, nr_queued: Option<u64>, nr_scheduled: Option<u64>) {
if let Some(queued) = nr_queued {
self.skel.bss_mut().nr_queued = queued;
}
if let Some(scheduled) = nr_scheduled {
self.skel.bss_mut().nr_scheduled = scheduled;
}
}

// Override the default scheduler time slice (in us).
#[allow(dead_code)]
pub fn set_effective_slice_us(&mut self, slice_us: u64) {
@@ -324,11 +278,13 @@ impl<'cb> BpfScheduler<'cb> {
}

// Counter of queued tasks.
#[allow(dead_code)]
pub fn nr_queued_mut(&mut self) -> &mut u64 {
&mut self.skel.bss_mut().nr_queued
}

// Counter of scheduled tasks.
#[allow(dead_code)]
pub fn nr_scheduled_mut(&mut self) -> &mut u64 {
&mut self.skel.bss_mut().nr_scheduled
}