Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Overriding AsyncFileReader used by ParquetExec #3051

Merged
merged 25 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
db4e632
add metadata_ext to part file, etc
Cheappie Aug 6, 2022
12dee5e
handle error
Cheappie Aug 6, 2022
926d25d
fix compilation issues
Cheappie Aug 6, 2022
4e9eeba
rename var
Cheappie Aug 6, 2022
cab87e8
fix tests compilation issues
Cheappie Aug 6, 2022
cecfa0a
allow user to provide their own parquet reader factory
Cheappie Aug 6, 2022
124eeb9
move ThinFileReader into parquet module
Cheappie Aug 6, 2022
793d252
rename field reader to delegate
Cheappie Aug 6, 2022
8994e0c
convert if else to unwrap or else
Cheappie Aug 8, 2022
cd1e532
rename ThinFileReader to BoxedAsyncFileReader, add doc
Cheappie Aug 8, 2022
ef4e30e
hide ParquetFileMetrics
Cheappie Aug 8, 2022
cfb168f
derive debug
Cheappie Aug 8, 2022
8c6d702
convert metadata_ext field into Any type
Cheappie Aug 8, 2022
10c12da
add builder like method instead of modifying ctor
Cheappie Aug 8, 2022
e6e50c2
make `ParquetFileReaderFactory` public to let users provide custom i…
Cheappie Aug 8, 2022
6b33ea9
imports cleanup and more docs
Cheappie Aug 8, 2022
2f2079a
try fix clippy failures
Cheappie Aug 8, 2022
cb96ca7
Disable where_clauses_object_safety
tustvold Aug 8, 2022
e387d23
add test
Cheappie Aug 9, 2022
bc66f6a
extract ParquetFileReaderFactory test to integration tests
Cheappie Aug 9, 2022
334edce
resolve conflicts
Cheappie Aug 9, 2022
30d664a
further cleanup
Cheappie Aug 9, 2022
9c2ccc3
Merge pull request #1 from tustvold/fix-send
Cheappie Aug 9, 2022
e18d26f
fix: Add apache RAT license
alamb Aug 9, 2022
cec773f
fix send
Cheappie Aug 9, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ pub(crate) mod test_util {
object_meta: meta,
partition_values: vec![],
range: None,
extensions: None,
}]];

let exec = format
Expand Down
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,11 @@ fn summarize_min_max(
}
}

pub(crate) async fn fetch_parquet_metadata(
/// Fetches parquet metadata from ObjectStore for given object
///
/// This component is subject to **change** in the near future and is exposed for low-level integrations
/// through [ParquetFileReaderFactory].
pub async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
size_hint: Option<usize>,
Expand Down
40 changes: 22 additions & 18 deletions datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,24 +183,27 @@ pub async fn pruned_partition_list<'a>(
// Note: We might avoid parsing the partition values if they are not used in any projection,
// but the cost of parsing will likely be far dominated by the time to fetch the listing from
// the object store.
Ok(Box::pin(list.try_filter_map(move |file_meta| async move {
let parsed_path = parse_partitions_for_path(
table_path,
&file_meta.location,
table_partition_cols,
)
.map(|p| {
p.iter()
.map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
.collect()
});

Ok(parsed_path.map(|partition_values| PartitionedFile {
partition_values,
object_meta: file_meta,
range: None,
}))
})))
Ok(Box::pin(list.try_filter_map(
move |object_meta| async move {
let parsed_path = parse_partitions_for_path(
table_path,
&object_meta.location,
table_partition_cols,
)
.map(|p| {
p.iter()
.map(|&pn| ScalarValue::Utf8(Some(pn.to_owned())))
.collect()
});

Ok(parsed_path.map(|partition_values| PartitionedFile {
partition_values,
object_meta,
range: None,
extensions: None,
}))
},
)))
} else {
// parse the partition values and serde them as a RecordBatch to filter them
let metas: Vec<_> = list.try_collect().await?;
Expand Down Expand Up @@ -317,6 +320,7 @@ fn batches_to_paths(batches: &[RecordBatch]) -> Result<Vec<PartitionedFile>> {
})
.collect(),
range: None,
extensions: None,
})
})
})
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datafusion_common::ScalarValue;
use futures::Stream;
use object_store::{path::Path, ObjectMeta};
use std::pin::Pin;
use std::sync::Arc;

pub use self::url::ListingTableUrl;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};
Expand Down Expand Up @@ -58,6 +59,8 @@ pub struct PartitionedFile {
pub partition_values: Vec<ScalarValue>,
/// An optional file range for a more fine-grained parallel execution
pub range: Option<FileRange>,
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}

impl PartitionedFile {
Expand All @@ -71,6 +74,7 @@ impl PartitionedFile {
},
partition_values: vec![],
range: None,
extensions: None,
}
}

Expand All @@ -84,6 +88,7 @@ impl PartitionedFile {
},
partition_values: vec![],
range: Some(FileRange { start, end }),
extensions: None,
}
}
}
Expand All @@ -94,6 +99,7 @@ impl From<ObjectMeta> for PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
extensions: None,
}
}
}
2 changes: 2 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
#![warn(missing_docs, clippy::needless_borrow)]
// TODO: Temporary workaround for https://github.com/apache/arrow-rs/issues/2372 (#3081)
#![allow(where_clauses_object_safety)]

//! [DataFusion](https://github.com/apache/arrow-datafusion)
//! is an extensible query execution framework that uses
Expand Down
15 changes: 7 additions & 8 deletions datafusion/core/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ impl ExecutionPlan for AvroExec {
#[cfg(feature = "avro")]
mod private {
use super::*;
use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{FileOpenFuture, FileOpener};
use crate::physical_plan::file_format::FileMeta;
use bytes::Buf;
use futures::StreamExt;
use object_store::{GetResult, ObjectMeta, ObjectStore};
use object_store::{GetResult, ObjectStore};

pub struct AvroConfig {
pub schema: SchemaRef,
Expand Down Expand Up @@ -185,12 +185,11 @@ mod private {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> FileOpenFuture {
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = config.open(file)?;
Ok(futures::stream::iter(reader).boxed())
Expand All @@ -201,7 +200,7 @@ mod private {
Ok(futures::stream::iter(reader).boxed())
}
}
})
}))
}
}
}
Expand Down
22 changes: 10 additions & 12 deletions datafusion/core/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@
use crate::error::{DataFusionError, Result};
use crate::execution::context::{SessionState, TaskContext};
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};

use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::csv;
use arrow::datatypes::SchemaRef;
use bytes::Buf;
use futures::{StreamExt, TryStreamExt};
use object_store::{GetResult, ObjectMeta, ObjectStore};
use object_store::{GetResult, ObjectStore};
use std::any::Any;
use std::fs;
use std::path::Path;
Expand Down Expand Up @@ -201,12 +200,11 @@ impl FileOpener for CsvOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> FileOpenFuture {
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let config = self.config.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
Ok(futures::stream::iter(config.open(file, true)).boxed())
}
Expand All @@ -222,7 +220,7 @@ impl FileOpener for CsvOpener {
.boxed())
}
}
})
}))
}
}

Expand Down
49 changes: 29 additions & 20 deletions datafusion/core/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ use arrow::{error::Result as ArrowResult, record_batch::RecordBatch};
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt, Stream, StreamExt};
use object_store::{ObjectMeta, ObjectStore};
use object_store::ObjectStore;

use datafusion_common::ScalarValue;

use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::listing::PartitionedFile;
use crate::error::Result;
use crate::execution::context::TaskContext;
use crate::physical_plan::file_format::{FileScanConfig, PartitionColumnProjector};
use crate::physical_plan::file_format::{
FileMeta, FileScanConfig, PartitionColumnProjector,
};
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, Time,
};
Expand All @@ -53,9 +55,8 @@ pub trait FileOpener: Unpin {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
range: Option<FileRange>,
) -> FileOpenFuture;
file_meta: FileMeta,
) -> Result<FileOpenFuture>;
}

/// A stream that iterates record batch by record batch, file over file.
Expand Down Expand Up @@ -205,21 +206,30 @@ impl<F: FileOpener> FileStream<F> {
loop {
match &mut self.state {
FileStreamState::Idle => {
let file = match self.file_iter.pop_front() {
let part_file = match self.file_iter.pop_front() {
Some(file) => file,
None => return Poll::Ready(None),
};

let file_meta = FileMeta {
object_meta: part_file.object_meta,
range: part_file.range,
extensions: part_file.extensions,
};

self.file_stream_metrics.time_opening.start();
let future = self.file_reader.open(
self.object_store.clone(),
file.object_meta,
file.range,
);

self.state = FileStreamState::Open {
future,
partition_values: file.partition_values,

match self.file_reader.open(self.object_store.clone(), file_meta) {
Ok(future) => {
self.state = FileStreamState::Open {
future,
partition_values: part_file.partition_values,
}
}
Err(e) => {
self.state = FileStreamState::Error;
return Poll::Ready(Some(Err(e.into())));
}
}
}
FileStreamState::Open {
Expand Down Expand Up @@ -322,12 +332,11 @@ mod tests {
fn open(
&self,
_store: Arc<dyn ObjectStore>,
_file: ObjectMeta,
_range: Option<FileRange>,
) -> FileOpenFuture {
_file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let iterator = self.records.clone().into_iter().map(Ok);
let stream = futures::stream::iter(iterator).boxed();
futures::future::ready(Ok(stream)).boxed()
Ok(futures::future::ready(Ok(stream)).boxed())
}
}

Expand Down
18 changes: 8 additions & 10 deletions datafusion/core/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
// under the License.

//! Execution plan for reading line-delimited JSON files
use arrow::json::reader::DecoderOptions;

use crate::datasource::listing::FileRange;
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::execution::context::TaskContext;
Expand All @@ -27,14 +24,16 @@ use crate::physical_plan::file_format::delimited_stream::newline_delimited_strea
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::FileMeta;
use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
use arrow::json::reader::DecoderOptions;
use arrow::{datatypes::SchemaRef, json};
use bytes::Buf;
use futures::{StreamExt, TryStreamExt};
use object_store::{GetResult, ObjectMeta, ObjectStore};
use object_store::{GetResult, ObjectStore};
use std::any::Any;
use std::fs;
use std::path::Path;
Expand Down Expand Up @@ -163,13 +162,12 @@ impl FileOpener for JsonOpener {
fn open(
&self,
store: Arc<dyn ObjectStore>,
file: ObjectMeta,
_range: Option<FileRange>,
) -> FileOpenFuture {
file_meta: FileMeta,
) -> Result<FileOpenFuture> {
let options = self.options.clone();
let schema = self.file_schema.clone();
Box::pin(async move {
match store.get(&file.location).await? {
Ok(Box::pin(async move {
match store.get(file_meta.location()).await? {
GetResult::File(file, _) => {
let reader = json::Reader::new(file, schema.clone(), options);
Ok(futures::stream::iter(reader).boxed())
Expand All @@ -188,7 +186,7 @@ impl FileOpener for JsonOpener {
.boxed())
}
}
})
}))
}
}

Expand Down
Loading