Skip to content

Commit

Permalink
rusty: Use new load balancer crate
Browse files Browse the repository at this point in the history
Now that we have a new load balancer crate, let's update rusty to use
it.

Signed-off-by: David Vernet <[email protected]>
  • Loading branch information
Byte-Lab committed Feb 24, 2024
1 parent 32dbb16 commit a2e7336
Showing 1 changed file with 23 additions and 185 deletions.
208 changes: 23 additions & 185 deletions scheds/rust/scx_rusty/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use scx_utils::ravg::ravg_read;
use scx_utils::uei_exited;
use scx_utils::uei_report;
use scx_utils::Topology;
use scx_utils::LoadBalancer;

const RAVG_FRAC_BITS: u32 = bpf_intf::ravg_consts_RAVG_FRAC_BITS;
const MAX_DOMS: usize = bpf_intf::consts_MAX_DOMS as usize;
Expand Down Expand Up @@ -368,13 +369,12 @@ struct TaskInfo {
is_kworker: bool,
}

struct LoadBalancer<'a, 'b, 'c> {
struct LoadBalancerInt<'a, 'b, 'c> {
skel: &'a mut BpfSkel<'b>,
top: Arc<Topology>,
dom_group: Arc<DomainGroup>,
skip_kworkers: bool,

lb_apply_weight: bool,
infeas_threshold: f64,

tasks_by_load: Vec<Option<BTreeMap<OrderedFloat<f64>, TaskInfo>>>,
Expand All @@ -385,6 +385,7 @@ struct LoadBalancer<'a, 'b, 'c> {
doms_to_push: BTreeMap<OrderedFloat<f64>, u32>,
doms_to_pull: BTreeMap<OrderedFloat<f64>, u32>,

lb_apply_weight: bool,
nr_lb_data_errors: &'c mut u64,
}

Expand All @@ -393,7 +394,7 @@ struct LoadBalancer<'a, 'b, 'c> {
const_assert_eq!(bpf_intf::consts_LB_MAX_WEIGHT % bpf_intf::consts_LB_LOAD_BUCKETS, 0);


impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
impl<'a, 'b, 'c> LoadBalancerInt<'a, 'b, 'c> {
// If imbalance gets higher than this ratio, try to balance the loads.
const LOAD_IMBAL_HIGH_RATIO: f64 = 0.05;

Expand All @@ -417,14 +418,13 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
top: Arc<Topology>,
dom_group: Arc<DomainGroup>,
skip_kworkers: bool,
lb_apply_weight: &bool,
lb_apply_weight: bool,
nr_lb_data_errors: &'c mut u64,
) -> Self {
Self {
skel,
skip_kworkers,

lb_apply_weight: lb_apply_weight.clone(),
infeas_threshold: bpf_intf::consts_LB_MAX_WEIGHT as f64,

tasks_by_load: (0..dom_group.nr_doms()).map(|_| None).collect(),
Expand All @@ -435,6 +435,7 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
doms_to_pull: BTreeMap::new(),
doms_to_push: BTreeMap::new(),

lb_apply_weight: lb_apply_weight.clone(),
nr_lb_data_errors,

top,
Expand All @@ -455,157 +456,32 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
let min_w = 1 + (MAX_WEIGHT * bucket) / NUM_BUCKETS;
let max_w = min_w + WEIGHT_PER_BUCKET - 1;

(min_w as f64, max_w as f64)
(min_w as f64, max_w as f64)
}

fn bucket_weight(&self, bucket: u64) -> f64 {
fn bucket_weight(&self, bucket: u64) -> usize {
const WEIGHT_PER_BUCKET: f64 = bpf_intf::consts_LB_WEIGHT_PER_BUCKET as f64;
let (min_weight, _) = self.bucket_range(bucket);

// Use the mid-point of the bucket when determining weight
min_weight + (WEIGHT_PER_BUCKET / 2.0f64)
}

fn apply_infeas_threshold(&mut self,
doms_dcycles_buckets: &[f64],
infeas_thrsh: f64) {
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;

self.infeas_threshold = infeas_thrsh;
let mut global_load_sum = 0.0f64;
for dom in 0..self.dom_group.nr_doms() {
let dom_offset = (dom as u64) * NUM_BUCKETS;
let mut dom_load_sum = 0.0f64;
for i in 0..NUM_BUCKETS {
let weight = self.bucket_weight(i).min(self.infeas_threshold);
let dcycle = doms_dcycles_buckets[(dom_offset + i) as usize];

dom_load_sum += dcycle * weight;
}
self.dom_loads[dom] = dom_load_sum;
global_load_sum += dom_load_sum;
}

self.load_avg = global_load_sum / self.dom_group.nr_doms() as f64;
}

fn adjust_infeas_weights(&mut self,
bucket_dcycles: &[f64],
doms_dcycle_buckets: &[f64],
global_load_sum: f64) -> Result<()> {
// At this point we have the following data points:
//
// P : The number of cores on the system
// L : The total load sum of the system before any adjustments for
// infeasibility
// Lf: The load sum of all feasible tasks
// D : The total sum of duty cycles across all domains in the system
// Di: The duty cycle sum of all infeasible tasks
//
// We need to find a weight lambda_x such that every infeasible task in
// the system will be granted a CPU allocation equal to their duty
// cycle, and all the remaining compute capacity in the system will be
// divided fairly amongst the feasible tasks according to their load.
// Our goal is to find a value lambda_x such that every infeasible task
// is allocated its duty cycle, and the remaining compute capacity is
// shared fairly amongst the feasible tasks on the system.
//
// If L' is the load sum on the system after clamping all weights
// w_x > lambda_x to lambda_x, then lambda_x can be defined as follows:
//
// lambda_x = L' / P
//
// => L' = lambda_x * Di + Lf
// => lambda_x * P = lambda_x * Di + Lf
// => lambda_x (P - Di) = Lf
// => lambda_x = Lf / (P' - Di)
//
// Thus, need to iterate over different values of x (i.e. over buckets)
// until we find a lambda_x such that:
//
// w_x >= lambda_x >= w_x+1
//
// Once we find a lambda_x, we need to:
//
// 1. Adjust the maximum weights of any w_x > lambda_x -> lambda_x
// 2. Subtract (w_i - lambda_x) from the load sums that the buckets were
// contributing to
// 3. Re-calculate the per-domain load, and the global load average.
//
// Note that we should always find a lambda_x at this point, as we
// verified in the caller that there is at least one infeasible bucket
// in the system.
//
// All of this is described and proven in detail in the following pdf:
//
// https://drive.google.com/file/d/1fAoWUlmW-HTp6akuATVpMxpUpvWcGSAv
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
let p = self.top.nr_cpus() as f64;
let mut curr_dcycle_sum = 0.0f64;
let mut curr_load_sum = global_load_sum;
let mut lambda_x = curr_load_sum / p;

for bucket in (0..NUM_BUCKETS).filter(|bucket| !approx_eq(bucket_dcycles[*bucket as usize], 0f64)).rev() {
let weight = self.bucket_weight(bucket);
let dcycles = bucket_dcycles[bucket as usize];

if approx_ge(lambda_x, weight) {
self.apply_infeas_threshold(doms_dcycle_buckets, lambda_x);
return Ok(());
}

curr_dcycle_sum += dcycles;
curr_load_sum -= weight * dcycles;
lambda_x = curr_load_sum / (p - curr_dcycle_sum);
}

// We can fail to find an infeasible weight if the host is
// under-utilized. In this case, just fall back to using weights. If
// this is happening due to a stale system-wide util value due to the
// tuner not having run recently enough, it is a condition that should
// self-correct soon. If it is the result of the user configuring us to
// use weights even when the system is under-utilized, they were warned
// when the scheduler was launched.
self.load_avg = global_load_sum / self.dom_group.nr_doms() as f64;
Ok(())
// Use the mid-point of the bucket when determining weight
(min_weight + (WEIGHT_PER_BUCKET / 2.0f64)).floor() as usize
}

fn read_dom_loads(&mut self) -> Result<()> {
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;
let now_mono = now_monotonic();
let load_half_life = self.skel.rodata().load_half_life;
let maps = self.skel.maps();
let dom_data = maps.dom_data();
const NUM_BUCKETS: u64 = bpf_intf::consts_LB_LOAD_BUCKETS as u64;

// Sum of dcycle and load for each bucket, aggregated across domains.
let mut global_bucket_dcycle = vec![0.0f64; NUM_BUCKETS as usize];

// Global dcycle and load sums.
let mut global_dcycle_sum = 0.0f64;
let mut global_load_sum = 0.0f64;

// dcycle values stored in every bucket. Recorded here so we don't have
// to do another ravg read later when testing and adjusting for
// infeasibility.
let mut doms_dcycle_buckets = vec![0.0f64; NUM_BUCKETS as usize * self.dom_group.nr_doms()];

// Sum of dcycle for each domain. Used if we're going to do load
// balancing based on just dcycle to avoid having to do two iterations.
let mut doms_dcycle_sums = vec![0.0f64; self.dom_group.nr_doms()];

// Track maximum weight so we can test for infeasibility below.
let mut max_weight = 0.0f64;
let mut aggregator = LoadBalancer::aggregator(self.top.nr_cpus(), !self.lb_apply_weight.clone());

// Accumulate dcycle and load across all domains and buckets. If we're
// under-utilized, or there are no infeasible weights, this is
// sufficient to collect all of the data we need for load balancing.
for dom in 0..self.dom_group.nr_doms() {
let dom_key = unsafe { std::mem::transmute::<u32, [u8; 4]>(dom as u32) };

let dom_offset = dom as u64 * NUM_BUCKETS;
let mut dom_dcycle_sum = 0.0f64;
let mut dom_load_sum = 0.0f64;

if let Some(dom_ctx_map_elem) = dom_data
.lookup(&dom_key, libbpf_rs::MapFlags::ANY)
.context("Failed to lookup dom_ctx")?
Expand All @@ -630,61 +506,23 @@ impl<'a, 'b, 'c> LoadBalancer<'a, 'b, 'c> {
continue;
}

dom_dcycle_sum += duty_cycle;
global_bucket_dcycle[bucket as usize] += duty_cycle;
doms_dcycle_buckets[(dom_offset + bucket) as usize] = duty_cycle;

let weight = self.bucket_weight(bucket);
let load = weight * duty_cycle;
dom_load_sum += load;
if weight > max_weight {
max_weight = weight;
}
aggregator.record_dom_load(dom, weight, duty_cycle)?;
}

global_dcycle_sum += dom_dcycle_sum;
doms_dcycle_sums[dom] = dom_dcycle_sum;

global_load_sum += dom_load_sum;
self.dom_loads[dom] = dom_load_sum;
}
}

let balancer = aggregator.calculate();
if !self.lb_apply_weight {
// System is under-utilized, so just use dcycle instead of load.
self.load_avg = global_dcycle_sum / self.dom_group.nr_doms() as f64;
self.dom_loads = doms_dcycle_sums;
return Ok(());
}

// If the sum of duty cycle on the system is >= P, any weight w_x of a
// task that exceeds L / P is guaranteed to be infeasible. Furthermore,
// if any weight w_x == L / P then we know that task t_x can get its
// full duty cycle, as:
//
// c_x = P * (w_x * d_x) / L
// = P * (L/P * d_x) / L
// = d_x * L / L
// = d_x
//
// If there is no bucket whose weight exceeds L / P that has a nonzero
// duty cycle, then all weights are feasible and we can use the data we
// collected above without having to adjust for infeasibility.
// Otherwise, we have at least one infeasible weight.
//
// See the function header for adjust_infeas_weights() for a more
// comprehensive description of the algorithm for adjusting for
// infeasible weights.
let infeasible_thresh = global_load_sum / self.top.nr_cpus() as f64;
if approx_ge(max_weight, infeasible_thresh) {
debug!("max_weight={} infeasible_threshold= {}",
max_weight, infeasible_thresh);
return self.adjust_infeas_weights(&global_bucket_dcycle,
&doms_dcycle_buckets,
global_load_sum);
self.load_avg = balancer.global_dcycle_sum() / self.dom_group.nr_doms() as f64;
self.dom_loads = balancer.dom_dcycle_sums().to_vec();
} else {
self.load_avg = balancer.global_load_sum() / self.dom_group.nr_doms() as f64;
self.dom_loads = balancer.dom_load_sums().to_vec();
self.infeas_threshold = balancer.w_prime();
}

self.load_avg = global_load_sum / self.dom_group.nr_doms() as f64;
Ok(())
}

Expand Down Expand Up @@ -1321,12 +1159,12 @@ impl<'a> Scheduler<'a> {
let bpf_stats = self.read_bpf_stats()?;
let cpu_busy = self.get_cpu_busy()?;

let mut lb = LoadBalancer::new(
let mut lb = LoadBalancerInt::new(
&mut self.skel,
self.top.clone(),
self.dom_group.clone(),
self.dom_group.clone(),
self.balanced_kworkers,
&lb_apply_weight,
lb_apply_weight.clone(),
&mut self.nr_lb_data_errors,
);

Expand Down

0 comments on commit a2e7336

Please sign in to comment.