Commit

wip

winding-lines committed Mar 5, 2023
1 parent 67b4e96 commit fb5502b
Showing 2 changed files with 21 additions and 18 deletions.
polars/polars-io/src/parquet/async_manager.rs: 21 changes (14 additions & 7 deletions)
@@ -10,9 +10,10 @@ use futures::channel::mpsc::Sender;
 use futures::channel::oneshot;
 use once_cell::sync::Lazy;
 use polars_core::prelude::*;
-use tokio::runtime::Handle;
+use tokio::runtime::Runtime;
 
-use super::mmap::ColumnMapper;
+use super::async_impl::ParquetObjectStore;
+use super::mmap::CloudMapper;
 
 static GLOBAL_ASYNC_MANAGER: Lazy<AsyncManager> = Lazy::new(AsyncManager::default);
 
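As an aside, GLOBAL_ASYNC_MANAGER above uses once_cell's Lazy for one-time, on-first-use initialization of a process-wide singleton. A minimal sketch of that pattern, with a hypothetical counter standing in for the manager:

use once_cell::sync::Lazy;
use std::sync::Mutex;

// A process-wide singleton, initialized on first access
// (the same pattern as GLOBAL_ASYNC_MANAGER).
static GLOBAL_COUNTER: Lazy<Mutex<u64>> = Lazy::new(|| Mutex::new(0));

fn main() {
    *GLOBAL_COUNTER.lock().unwrap() += 1;
    assert_eq!(*GLOBAL_COUNTER.lock().unwrap(), 1);
}

Lazy derefs to the inner value, so call sites use the global as if it were a plain static.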
@@ -47,7 +48,7 @@ enum AsyncParquetReaderMessage {
         /// The row groups to fetch.
         row_groups: Range<usize>,
         /// The channel to send the result to.
-        tx: oneshot::Sender<PolarsResult<ColumnMapper>>,
+        tx: oneshot::Sender<PolarsResult<CloudMapper>>,
     },
 }
 
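The message above carries a oneshot::Sender so the background task can answer exactly one caller per request. A minimal sketch of that request/reply pattern, assuming the same futures and tokio crates; the Ping message and its String payload are hypothetical stand-ins for the real enum:

use futures::channel::{mpsc, oneshot};
use futures::StreamExt;

// Hypothetical stand-in for AsyncParquetReaderMessage.
enum Message {
    Ping {
        // Where the reply goes; each request gets its own channel.
        tx: oneshot::Sender<String>,
    },
}

#[tokio::main]
async fn main() {
    let (mut req_tx, mut req_rx) = mpsc::channel::<Message>(1);

    // Manager side: service requests, answering on each request's channel.
    tokio::spawn(async move {
        while let Some(Message::Ping { tx }) = req_rx.next().await {
            let _ = tx.send("pong".to_string());
        }
    });

    // Caller side: create a reply channel, send the request, await the answer.
    let (tx, rx) = oneshot::channel();
    req_tx.try_send(Message::Ping { tx }).expect("queue full");
    assert_eq!(rx.await.unwrap(), "pong");
}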
@@ -56,20 +57,22 @@ pub(crate) struct AsyncManager {
     /// The channel to communicate with the manager.
     tx: Sender<AsyncParquetReaderMessage>,
     /// A handle to the Tokio runtime running the manager.
-    handle: Handle,
+    runtime: Runtime,
+    /// Opened readers.
+    readers: PlHashMap<String, Arc<ParquetObjectStore>>,
 }
 
 impl AsyncManager {
     /// Create a new async manager.
     pub fn new() -> AsyncManager {
         use futures::stream::StreamExt;
 
         let (tx, rx) = futures::channel::mpsc::channel(1);
-        let handle = tokio::runtime::Builder::new_multi_thread()
+        let runtime = tokio::runtime::Builder::new_multi_thread()
             .enable_all()
             .build()
             .unwrap();
-        handle.spawn(async move {
+        runtime.spawn(async move {
             let mut reader = None;
             while let Some(message) = rx.next().await {
                 match message {
@@ -106,7 +109,11 @@ impl AsyncManager {
                 }
             }
         });
-        AsyncManager { tx, handle }
+        AsyncManager {
+            tx,
+            runtime,
+            readers: PlHashMap::new(),
+        }
     }
 }
 
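The handle-to-runtime rename in this commit is not purely cosmetic: a tokio Handle does not keep its runtime alive, whereas storing the Runtime in the struct ties the background task's lifetime to the manager's. A sketch of the owned-runtime pattern under that assumption; Manager and its methods are illustrative, not the polars API:

use tokio::runtime::Runtime;

/// Illustrative manager that owns its runtime, so spawned tasks
/// keep running for as long as the manager itself is alive.
struct Manager {
    runtime: Runtime,
}

impl Manager {
    fn new() -> Manager {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        // Tasks spawned here outlive `new()` because the runtime is
        // moved into the returned struct rather than dropped.
        runtime.spawn(async {
            // background work goes here
        });
        Manager { runtime }
    }

    /// Run a future to completion on the owned runtime.
    fn block_on<F: std::future::Future>(&self, fut: F) -> F::Output {
        self.runtime.block_on(fut)
    }
}

fn main() {
    let manager = Manager::new();
    let answer = manager.block_on(async { 41 + 1 });
    assert_eq!(answer, 42);
}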
polars/polars-io/src/parquet/read_impl.rs: 18 changes (7 additions & 11 deletions)
@@ -109,7 +109,7 @@ fn rg_to_dfs(
         let md = &file_metadata.row_groups[rg];
         let current_row_count = md.num_rows() as IdxSize;
 
-        if use_statistics && !read_this_row_group(predicate.as_ref(), file_metadata, schema, rg)? {
+        if use_statistics && !read_this_row_group(predicate, file_metadata, schema, rg)? {
             *previous_row_count += current_row_count;
             continue;
         }
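Dropping .as_ref() here suggests predicate now reaches rg_to_dfs in an already-borrowed form rather than as an owned Option<Arc<...>>. As a reminder of why the adapter was needed at all: Option::as_ref and as_deref lend out the contents of an owned Option, while a caller that already holds the borrowed form passes it straight through. A tiny sketch with a hypothetical keep_row_group standing in for the real check:

fn keep_row_group(predicate: Option<&str>) -> bool {
    // Stand-in for the real statistics-based check.
    predicate.is_some()
}

fn main() {
    let owned: Option<String> = Some("a > 1".to_string());
    // An owned Option must be borrowed before the call...
    assert!(keep_row_group(owned.as_deref()));
    // ...while an already-borrowed Option passes through unchanged.
    let borrowed: Option<&str> = Some("a > 1");
    assert!(keep_row_group(borrowed));
}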
@@ -363,11 +363,8 @@ impl FetchRowGroupsFromMmapReader {
 
 /// There is nothing to do when fetching a mmap-ed file.
 impl FetchRowGroups for FetchRowGroupsFromMmapReader {
-    fn fetch_row_groups(
-        &mut self,
-        _row_groups: Range<usize>,
-    ) -> PolarsResult<CloudMapper> {
-        Ok(mmap::CloudMapper::PassThrough(self.0.deref()))
+    fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> PolarsResult<CloudMapper> {
+        Ok(mmap::CloudMapper::PassThrough(self.0.deref()))
     }
 }
 
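The PassThrough arm shows the point of the FetchRowGroups trait: a mmap-ed local file lends out bytes it already has, while a cloud-backed implementation must actually download the requested row groups. A rough sketch of the shape of this abstraction with simplified types (Mapper stands in for polars' CloudMapper, and error handling is omitted):

use std::ops::Range;

// Simplified stand-in for polars' CloudMapper.
enum Mapper<'a> {
    PassThrough(&'a [u8]),
    Fetched(Vec<u8>),
}

trait FetchRowGroups {
    fn fetch_row_groups(&mut self, row_groups: Range<usize>) -> Mapper<'_>;
}

/// Local file already in memory: nothing to fetch.
struct MmapFetcher(Vec<u8>);

impl FetchRowGroups for MmapFetcher {
    fn fetch_row_groups(&mut self, _row_groups: Range<usize>) -> Mapper<'_> {
        Mapper::PassThrough(&self.0)
    }
}

/// Remote file: pretend to download the requested row groups.
struct CloudFetcher;

impl FetchRowGroups for CloudFetcher {
    fn fetch_row_groups(&mut self, row_groups: Range<usize>) -> Mapper<'_> {
        Mapper::Fetched(vec![0u8; row_groups.len()])
    }
}

fn main() {
    // The reader can hold either implementation behind the same trait object.
    let mut fetchers: Vec<Box<dyn FetchRowGroups>> =
        vec![Box::new(MmapFetcher(vec![1, 2, 3])), Box::new(CloudFetcher)];
    for f in fetchers.iter_mut() {
        let _mapper = f.fetch_row_groups(0..2);
    }
}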
@@ -377,7 +374,6 @@ pub struct BatchedParquetReader {
     row_group_fetcher: Box<dyn FetchRowGroups>,
     limit: usize,
     projection: Vec<usize>,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
     schema: ArrowSchema,
     metadata: FileMetaData,
     row_count: Option<RowCount>,
@@ -432,8 +428,8 @@ impl BatchedParquetReader {
     pub fn next_batches(&mut self, n: usize) -> PolarsResult<Option<Vec<DataFrame>>> {
         // fill up fifo stack
         if self.row_group_offset <= self.n_row_groups && self.chunks_fifo.len() < n {
-            let row_group_start = self.row_group_offset;
-            let row_group_end = std::cmp::min(self.row_group_offset + n, self.n_row_groups);
+            let row_group_start = self.row_group_offset;
+            let row_group_end = std::cmp::min(self.row_group_offset + n, self.n_row_groups);
             let store = self
                 .row_group_fetcher
                 .fetch_row_groups(row_group_start..row_group_end)?;
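next_batches fills a FIFO by fetching and decoding the next window of at most n row groups, then serves batches from the front of the queue. A simplified sketch of that batching loop, using Vec<u64> as a stand-in for decoded DataFrame chunks:

use std::collections::VecDeque;

/// Illustrative batched reader: decodes row groups in windows of `n`
/// and buffers the resulting batches in a FIFO.
struct BatchedReader {
    n_row_groups: usize,
    row_group_offset: usize,
    fifo: VecDeque<Vec<u64>>, // stand-in for DataFrame chunks
}

impl BatchedReader {
    fn next_batches(&mut self, n: usize) -> Option<Vec<Vec<u64>>> {
        // Fill the FIFO from the next window of row groups.
        if self.row_group_offset <= self.n_row_groups && self.fifo.len() < n {
            let start = self.row_group_offset;
            let end = std::cmp::min(self.row_group_offset + n, self.n_row_groups);
            for rg in start..end {
                // Real code decodes the row group into a DataFrame here.
                self.fifo.push_back(vec![rg as u64]);
            }
            self.row_group_offset += n;
        }
        // Drain up to `n` batches, oldest first.
        if self.fifo.is_empty() {
            None
        } else {
            Some((0..n).filter_map(|_| self.fifo.pop_front()).collect())
        }
    }
}

fn main() {
    let mut reader = BatchedReader {
        n_row_groups: 5,
        row_group_offset: 0,
        fifo: VecDeque::new(),
    };
    while let Some(batches) = reader.next_batches(2) {
        println!("got {} batches", batches.len());
    }
}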
@@ -473,8 +469,8 @@ impl BatchedParquetReader {
                 self.row_group_offset += n;
                 dfs
             }
-            _ => unimplemented!(),
-        };
+            _ => unimplemented!(),
+        };
 
         // TODO! this is slower than it needs to be
         // we also need to parallelize over row groups here.
