Skip to content

Commit

Permalink
feat(rust): introduce async at the top level
Browse files Browse the repository at this point in the history
  • Loading branch information
winding-lines committed Feb 19, 2023
1 parent 49f71a8 commit f2f8f7b
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 22 deletions.
2 changes: 2 additions & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ serde_json = "1"
[dependencies]
ahash.workspace = true
bitflags.workspace = true
futures = { version = "0.3.25" }
tokio = { version = "1.22.0", features = ["rt-multi-thread", "net"] }
glob = "0.3"
polars-arrow = { version = "0.27.2", path = "../polars-arrow" }
polars-core = { version = "0.27.2", path = "../polars-core", features = ["lazy", "private", "zip_with", "random"], default-features = false }
Expand Down
9 changes: 9 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/executor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use futures::future::BoxFuture;

use super::*;

// Executors evaluate the physical plan and produce DataFrames. They
Expand All @@ -9,6 +11,13 @@ use super::*;
/// physical plan until the last executor is evaluated.
pub trait Executor: Send {
    /// Runs this executor against `cache` and returns the resulting
    /// `DataFrame`, or the error that stopped evaluation.
    fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame>;

    /// Future-returning counterpart of [`Executor::execute`].
    ///
    /// The default implementation is not lazy: it calls `execute` right away,
    /// on the calling thread, and hands back a future that is already
    /// complete. The returned future is `'static`, so it borrows neither
    /// `self` nor `cache`. Implementors can override this method to supply
    /// their own future.
    fn async_execute(
        &mut self,
        cache: &mut ExecutionState,
    ) -> BoxFuture<'static, PolarsResult<DataFrame>> {
        // Evaluate eagerly; only the finished result is moved into the future.
        let finished = self.execute(cache);
        let already_done = futures::future::ready(finished);
        Box::pin(already_done)
    }
}

/// A struct with no fields.
// NOTE(review): presumably a placeholder `Executor`; its impl is not visible in this excerpt, confirm.
pub struct Dummy {}
Expand Down
63 changes: 43 additions & 20 deletions polars/polars-lazy/src/physical_plan/executors/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,26 +74,49 @@ impl Executor for UnionExec {
println!("UNION: union is run in parallel")
}

// we don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
// this might then lead to a rayon stack overflow (SO). So we take a multiple of the thread count to keep work stealing
// within bounds
let out = POOL.install(|| {
inputs
.chunks_mut(POOL.current_num_threads() * 3)
.map(|chunk| {
chunk
.into_par_iter()
.enumerate()
.map(|(idx, input)| {
let mut input = std::mem::take(input);
let mut state = state.split();
state.branch_idx += idx;
input.execute(&mut state)
})
.collect::<PolarsResult<Vec<_>>>()
})
.collect::<PolarsResult<Vec<_>>>()
});
// #[cfg(not(target = "async"))]
// // we don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
// // this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
// // within bounds
// let out = POOL.install(|| {
// inputs
// .chunks_mut(POOL.current_num_threads() * 3)
// .map(|chunk| {
// chunk
// .into_par_iter()
// .enumerate()
// .map(|(idx, input)| {
// let mut input = std::mem::take(input);
// let mut state = state.split();
// state.branch_idx += idx;
// input.execute(&mut state)
// })
// .collect::<PolarsResult<Vec<_>>>()
// })
// .collect::<PolarsResult<Vec<_>>>()
// });
let out = {
use futures::{stream, StreamExt, TryStreamExt};
use tokio::runtime::Runtime;

let rt = Runtime::new().unwrap();
rt.block_on(async {
stream::iter(inputs.chunks_mut(3))
.then(|chunk| async {
stream::iter(chunk.into_iter().enumerate())
.then(|(idx, input)| {
let mut input = std::mem::take(input);
let mut state = state.split();
state.branch_idx += idx;
input.async_execute(&mut state)
})
.try_collect::<Vec<_>>()
.await
})
.try_collect::<Vec<_>>()
.await
})
};

concat_df(out?.iter().flat_map(|dfs| dfs.iter()))
}
Expand Down
46 changes: 44 additions & 2 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f2f8f7b

Please sign in to comment.