Skip to content

Commit

Permalink
don't cache window function groups/join_tuples fixes deadlock
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 22, 2021
1 parent 6c7b0d4 commit 619baa6
Showing 1 changed file with 15 additions and 74 deletions.
89 changes: 15 additions & 74 deletions polars/polars-lazy/src/physical_plan/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,63 +20,27 @@ pub struct WindowExpr {
impl PhysicalExpr for WindowExpr {
// Note: this was first implemented with expression evaluation, but that performed really badly.
// Therefore we chose the groupby -> apply -> self join approach.

// This first cached the groupby and the join tuples, but rayon under a mutex leads to deadlocks:
// https://github.com/rayon-rs/rayon/issues/592
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
// This method does the following:
// 1. determine groupby tuples based on the group_column
// 2. apply an aggregation function
// 3. join the results back to the original dataframe
// this stores all group values on the original df size
// 4. select the final column and return

// We create a key to store in the state cache
// assume 32 digits per ptr.
let mut key = String::with_capacity(df.width() * 32);
df.get_columns()
.iter()
.for_each(|s| key.push_str(&format!("{}", s.get_data_ptr())));

let groupby_columns = self
.group_by
.iter()
.map(|e| e.evaluate(df, state))
.collect::<Result<Vec<_>>>()?;
groupby_columns.iter().for_each(|e| {
key.push_str(e.name());
});

// 1. get the group tuples
// We keep the lock for the entire window expression because we want those to be sequential.
// They utilize enough parallelism in the groupby and join operations.
let mut groups_lock;

// We use this spin-lock because we can deadlock here. In the gb.aggregations below,
// like `sum`, `min`, etc., the work is put on a rayon threadpool, which stops this thread
// and lets other threads do the aggregation. However, rayon may also start a new window
// expression that tries to acquire the lock that is held here. Therefore we spin while
// trying to lock, and if the lock is held by another thread we release this thread.
loop {
match state.group_tuples.try_lock() {
Ok(lock) => {
groups_lock = lock;
break;
}
Err(_) => {
// thread yield could still cause a deadlock, maybe because it remained
// high priority?
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
}
let groups = match groups_lock.get_mut(&key) {
Some(groups) => std::mem::take(groups),
None => {
let mut gb = df.groupby_with_series(groupby_columns.clone(), true)?;
std::mem::take(gb.get_groups_mut())
}
};
let mut gb = df.groupby_with_series(groupby_columns.clone(), true)?;
let groups = std::mem::take(gb.get_groups_mut());

// 2. create GroupBy object and apply aggregation
let mut gb = GroupBy::new(
let gb = GroupBy::new(
df,
groupby_columns.clone(),
groups,
Expand Down Expand Up @@ -123,48 +87,25 @@ impl PhysicalExpr for WindowExpr {
.into(),
)),
}?;
// store the group tuples and drop the lock so other threads may use them
groups_lock.insert(key.clone(), std::mem::take(gb.get_groups_mut()));
drop(groups_lock);

// 3. get the join tuples and use them to take the new Series
let out_column = out.select_at_idx(out.width() - 1).unwrap();

// Same logic as above. The join algorithm also spawns new threads.
let mut join_tuples_lock;
loop {
match state.join_tuples.try_lock() {
Ok(lock) => {
join_tuples_lock = lock;
break;
}
Err(_) => {
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
}

let opt_join_tuples = match join_tuples_lock.get_mut(&key) {
Some(t) => std::mem::take(t),
None => {
if groupby_columns.len() == 1 {
// group key from right column
let right = out.select_at_idx(0).unwrap();
groupby_columns[0].hash_join_left(right)
} else {
let df_right =
DataFrame::new_no_checks(out.get_columns()[..out.width() - 1].to_vec());
let df_left = DataFrame::new_no_checks(groupby_columns);
private_left_join_multiple_keys(&df_left, &df_right)
}
}
let opt_join_tuples = if groupby_columns.len() == 1 {
// group key from right column
let right = out.select_at_idx(0).unwrap();
groupby_columns[0].hash_join_left(right)
} else {
let df_right = DataFrame::new_no_checks(out.get_columns()[..out.width() - 1].to_vec());
let df_left = DataFrame::new_no_checks(groupby_columns);
private_left_join_multiple_keys(&df_left, &df_right)
};

let mut iter = opt_join_tuples
.iter()
.map(|(_left, right)| right.map(|i| i as usize));

let mut out = unsafe { out_column.take_opt_iter_unchecked(&mut iter) };
join_tuples_lock.insert(key, opt_join_tuples);
if let Some(name) = &self.out_name {
out.rename(name.as_str());
}
Expand Down

0 comments on commit 619baa6

Please sign in to comment.