layered: Fix static configuration, and dispatch for Grouped layers #148

Merged: 3 commits, Feb 21, 2024
49 changes: 20 additions & 29 deletions scheds/rust/scx_layered/src/bpf/main.bpf.c
@@ -140,28 +140,42 @@ static struct cpumask *lookup_layer_cpumask(int idx)
}
}

struct layer *lookup_layer(int idx)
{
if (idx < 0 || idx >= nr_layers) {
scx_bpf_error("invalid layer %d", idx);
return NULL;
}
return &layers[idx];
}

static void refresh_cpumasks(int idx)
{
struct layer_cpumask_wrapper *cpumaskw;
struct layer *layer;
int cpu, total = 0;
struct layer *layer = lookup_layer(idx);

if (!layer)
return;

if (!__sync_val_compare_and_swap(&layers[idx].refresh_cpus, 1, 0))
if (!__sync_val_compare_and_swap(&layer->refresh_cpus, 1, 0))
return;

cpumaskw = bpf_map_lookup_elem(&layer_cpumasks, &idx);

bpf_rcu_read_lock();
Contributor:
What's this protecting?

Contributor Author @Byte-Lab, Feb 21, 2024:
Without the RCU region, the verifier doesn't like the cpumask kfunc arg:

; bpf_for(cpu, 0, nr_possible_cpus) {                                                                                                                                                    
441: (3e) if w1 >= w2 goto pc+23      ; frame1: R1_w=scalar(smin=smin32=0,smax=umax=smax32=umax32=31,var_off=(0x0; 0x1f)) R2_w=32 refs=9,28                                              
; if (!cpumaskw || !cpumaskw->cpumask) {                                                                                                                                                 
442: (15) if r6 == 0x0 goto pc+46     ; frame1: R6=map_value(off=0,ks=4,vs=8,imm=0) refs=9,28                                                                                            
; if (!cpumaskw || !cpumaskw->cpumask) {
443: (79) r2 = *(u64 *)(r6 +0)        ; frame1: R2_w=untrusted_ptr_or_null_bpf_cpumask(id=30,off=0,imm=0) R6=map_value(off=0,ks=4,vs=8,imm=0) refs=9,28
; if (!cpumaskw || !cpumaskw->cpumask) {
444: (15) if r2 == 0x0 goto pc+44     ; frame1: R2_w=untrusted_ptr_bpf_cpumask(off=0,imm=0) refs=9,28
; 
445: (bc) w3 = w1                     ; frame1: R1_w=scalar(id=31,smin=smin32=0,smax=umax=smax32=umax32=31,var_off=(0x0; 0x1f)) R3_w=scalar(id=31,smin=smin32=0,smax=umax=smax32=umax32=31,var_off=(0x0; 0x1f)) refs=9,28
446: (74) w3 >>= 3                    ; frame1: R3_w=scalar(smin=smin32=0,smax=umax=smax32=umax32=3,var_off=(0x0; 0x3)) refs=9,28
447: (bf) r4 = r7                     ; frame1: R4_w=map_value(off=0,ks=4,vs=8476172,smin=smin32=0,smax=umax=smax32=umax32=1055280,var_off=(0x0; 0x183f78)) R7=map_value(off=0,ks=4,vs=8476172,smin=smin32=0,smax=umax=smax32=umax32=1055280,var_off=(0x0; 0x183f78)) refs=9,28
448: (0f) r4 += r3                    ; frame1: R3_w=scalar(smin=smin32=0,smax=umax=smax32=umax32=3,var_off=(0x0; 0x3)) R4_w=map_value(off=0,ks=4,vs=8476172,smin=smin32=0,smax=umax=smax32=umax32=1055283,var_off=(0x0; 0x183f7b)) refs=9,28
449: (07) r4 += 527572                ; frame1: R4_w=map_value(off=527572,ks=4,vs=8476172,smin=smin32=0,smax=umax=smax32=umax32=1055283,var_off=(0x0; 0x183f7b)) refs=9,28
; if (*u8_ptr & (1 << (cpu % 8))) {
450: (bc) w3 = w1                     ; frame1: R1_w=scalar(id=31,smin=smin32=0,smax=umax=smax32=umax32=31,var_off=(0x0; 0x1f)) R3_w=scalar(id=31,smin=smin32=0,smax=umax=smax32=umax32=31,var_off=(0x0; 0x1f)) refs=9,28
451: (54) w3 &= 7                     ; frame1: R3_w=scalar(smin=smin32=0,smax=umax=smax32=umax32=7,var_off=(0x0; 0x7)) refs=9,28
; if (*u8_ptr & (1 << (cpu % 8))) {
452: (71) r4 = *(u8 *)(r4 +0)         ; frame1: R4_w=scalar(smin=smin32=0,smax=umax=smax32=umax32=255,var_off=(0x0; 0xff)) refs=9,28
; if (*u8_ptr & (1 << (cpu % 8))) {
453: (7c) w4 >>= w3                   ; frame1: R3_w=scalar(smin=smin32=0,smax=umax=smax32=umax32=7,var_off=(0x0; 0x7)) R4_w=scalar() refs=9,28
454: (54) w4 &= 1                     ; frame1: R4=scalar(smin=smin32=0,smax=umax=smax32=umax32=1,var_off=(0x0; 0x1)) refs=9,28
455: (16) if w4 == 0x0 goto pc+7      ; frame1: R4=scalar(smin=smin32=0,smax=umax=smax32=umax32=1,var_off=(0x0; 0x1)) refs=9,28
; bpf_cpumask_set_cpu(cpu, cpumaskw->cpumask); 
456: (85) call bpf_cpumask_set_cpu#36671
R2 must be a rcu pointer
processed 237 insns (limit 1000000) max_states_per_insn 1 total_states 24 peak_states 24 mark_read 9
-- END PROG LOAD LOG --

Otherwise, cpumaskw->cpumask could be freed out from under it on the ops.init() path. I'm kinda confused though why it's safe to pass cpumaskw->cpumask to the kfunc directly. I'd have expected us to need to put that into a local variable.

Contributor:
I see. The existing code is lacking the needed protection then. Hmm... why was it even loading?

Contributor Author (@Byte-Lab):
I believe it's because it was previously only being called on paths with interrupts disabled. ops.init() is sleepable, so it explicitly needs the extra RCU protection.
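
For context, a minimal standalone sketch of the pattern under discussion (the names cpumask_wrapper, cpumask_map, and set_cpu_protected are illustrative, not from the patch; includes and kfunc declarations omitted): a map value holds a struct bpf_cpumask kptr, and the RCU read-side critical section is what lets the loaded kptr be passed to bpf_cpumask_set_cpu() and keeps it from being freed on sleepable paths such as ops.init().

/* Sketch, assuming a single-entry array map wrapping a bpf_cpumask kptr,
 * similar in shape to layer_cpumask_wrapper in main.bpf.c. */
struct cpumask_wrapper {
	struct bpf_cpumask __kptr *cpumask;
};

struct {
	__uint(type, BPF_MAP_TYPE_ARRAY);
	__uint(max_entries, 1);
	__type(key, u32);
	__type(value, struct cpumask_wrapper);
} cpumask_map SEC(".maps");

static void set_cpu_protected(u32 cpu)
{
	struct cpumask_wrapper *w;
	u32 key = 0;

	w = bpf_map_lookup_elem(&cpumask_map, &key);
	if (!w)
		return;

	/*
	 * Outside an RCU read-side critical section the loaded kptr is
	 * untrusted_ptr_or_null_bpf_cpumask and the verifier rejects the
	 * kfunc call ("R2 must be a rcu pointer"). Inside the section the
	 * cpumask also can't be freed while in use, which matters on
	 * sleepable paths such as ops.init().
	 */
	bpf_rcu_read_lock();
	if (!w->cpumask) {
		bpf_rcu_read_unlock();
		return;
	}
	bpf_cpumask_set_cpu(cpu, w->cpumask);
	bpf_rcu_read_unlock();
}

The patch applies this pattern in refresh_cpumasks(), which after this change is also reached from the sleepable layered_init() path.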

bpf_for(cpu, 0, nr_possible_cpus) {
u8 *u8_ptr;

if ((u8_ptr = MEMBER_VPTR(layers, [idx].cpus[cpu / 8]))) {
if ((u8_ptr = &layer->cpus[cpu / 8])) {
/*
* XXX - The following test should be outside the loop
* but that makes the verifier think that
* cpumaskw->cpumask might be NULL in the loop.
*/
barrier_var(cpumaskw);
if (!cpumaskw || !cpumaskw->cpumask) {
bpf_rcu_read_unlock();
scx_bpf_error("can't happen");
return;
}
@@ -176,13 +190,7 @@ static void refresh_cpumasks(int idx)
scx_bpf_error("can't happen");
}
}

// XXX - shouldn't be necessary
layer = MEMBER_VPTR(layers, [idx]);
if (!layer) {
scx_bpf_error("can't happen");
return;
}
bpf_rcu_read_unlock();

layer->nr_cpus = total;
__sync_fetch_and_add(&layer->cpus_seq, 1);
@@ -240,15 +248,6 @@ struct task_ctx *lookup_task_ctx(struct task_struct *p)
}
}

struct layer *lookup_layer(int idx)
{
if (idx < 0 || idx >= nr_layers) {
scx_bpf_error("invalid layer %d", idx);
return NULL;
}
return &layers[idx];
}

/*
* Because the layer membership is by the default hierarchy cgroups rather than
* the CPU controller membership, we can't use ops.cgroup_move(). Let's iterate
Expand Down Expand Up @@ -506,9 +505,6 @@ void BPF_STRUCT_OPS(layered_dispatch, s32 cpu, struct task_struct *prev)
struct layer *layer = &layers[idx];
struct cpumask *layer_cpumask;

if (layer->open)
continue;

/* consume matching layers */
if (!(layer_cpumask = lookup_layer_cpumask(idx)))
return;
@@ -925,16 +921,11 @@ s32 BPF_STRUCT_OPS_SLEEPABLE(layered_init)
if (!cpumask)
return -ENOMEM;

/*
* Start all layers with full cpumask so that everything runs
* everywhere. This will soon be updated by refresh_cpumasks()
* once the scheduler starts running.
*/
bpf_cpumask_setall(cpumask);

cpumask = bpf_kptr_xchg(&cpumaskw->cpumask, cpumask);
if (cpumask)
bpf_cpumask_release(cpumask);

refresh_cpumasks(i);
}

return 0;
74 changes: 31 additions & 43 deletions scheds/rust/scx_layered/src/main.rs
@@ -841,37 +841,13 @@ impl Layer {

let nr_cpus = cpu_pool.nr_cpus;

let mut layer = Self {
Ok(Self {
name: name.into(),
kind,

nr_cpus: 0,
cpus: bitvec![0; nr_cpus],
};

match &layer.kind {
LayerKind::Confined {
cpus_range,
util_range,
}
| LayerKind::Grouped {
cpus_range,
util_range,
..
} => {
layer.resize_confined_or_grouped(
cpu_pool,
*cpus_range,
*util_range,
(0.0, 0.0),
(0.0, 0.0),
false,
)?;
}
_ => {}
}

Ok(layer)
})
}

fn grow_confined_or_grouped(
@@ -1239,17 +1215,7 @@ impl<'a> Scheduler<'a> {
}
Self::init_layers(&mut skel, &layer_specs)?;

// Attach.
let mut skel = skel.load().context("Failed to load BPF program")?;
skel.attach().context("Failed to attach BPF program")?;
let struct_ops = Some(
skel.maps_mut()
.layered()
.attach_struct_ops()
.context("Failed to attach layered struct ops")?,
);
info!("Layered Scheduler Attached");

let mut layers = vec![];
for spec in layer_specs.iter() {
layers.push(Layer::new(&mut cpu_pool, &spec.name, spec.kind.clone())?);
@@ -1258,8 +1224,8 @@
// Other stuff.
let proc_reader = procfs::ProcReader::new();

Ok(Self {
struct_ops, // should be held to keep it attached
let mut sched = Self {
struct_ops: None,
layer_specs,

sched_intv: Duration::from_secs_f64(opts.interval),
@@ -1281,7 +1247,22 @@

om_stats: OpenMetricsStats::new(),
om_format: opts.open_metrics_format,
})
};

// Initialize layers before we attach the scheduler
sched.refresh_cpumasks()?;

// Attach.
sched.skel.attach().context("Failed to attach BPF program")?;
sched.struct_ops = Some(
sched.skel.maps_mut()
.layered()
.attach_struct_ops()
.context("Failed to attach layered struct ops")?,
);
info!("Layered Scheduler Attached");

Ok(sched)
}

fn update_bpf_layer_cpumask(layer: &Layer, bpf_layer: &mut bpf_bss_types::layer) {
@@ -1295,10 +1276,7 @@
bpf_layer.refresh_cpus = 1;
}

fn step(&mut self) -> Result<()> {
let started_at = Instant::now();
self.sched_stats
.refresh(&mut self.skel, &self.proc_reader, started_at)?;
fn refresh_cpumasks(&mut self) -> Result<()> {
let mut updated = false;

for idx in 0..self.layers.len() {
@@ -1366,6 +1344,16 @@
}
}

Ok(())
}

fn step(&mut self) -> Result<()> {
let started_at = Instant::now();
self.sched_stats
.refresh(&mut self.skel, &self.proc_reader, started_at)?;

self.refresh_cpumasks()?;

self.processing_dur += Instant::now().duration_since(started_at);
Ok(())
}