Allow Overriding AsyncFileReader used by ParquetExec #3051

Merged · 25 commits · Aug 9, 2022
Commits:

- db4e632 add metadata_ext to part file, etc (Cheappie, Aug 6, 2022)
- 12dee5e handle error (Cheappie, Aug 6, 2022)
- 926d25d fix compilation issues (Cheappie, Aug 6, 2022)
- 4e9eeba rename var (Cheappie, Aug 6, 2022)
- cab87e8 fix tests compilation issues (Cheappie, Aug 6, 2022)
- cecfa0a allow user to provide their own parquet reader factory (Cheappie, Aug 6, 2022)
- 124eeb9 move ThinFileReader into parquet module (Cheappie, Aug 6, 2022)
- 793d252 rename field reader to delegate (Cheappie, Aug 6, 2022)
- 8994e0c convert if else to unwrap or else (Cheappie, Aug 8, 2022)
- cd1e532 rename ThinFileReader to BoxedAsyncFileReader, add doc (Cheappie, Aug 8, 2022)
- ef4e30e hide ParquetFileMetrics (Cheappie, Aug 8, 2022)
- cfb168f derive debug (Cheappie, Aug 8, 2022)
- 8c6d702 convert metadata_ext field into Any type (Cheappie, Aug 8, 2022)
- 10c12da add builder like method instead of modifying ctor (Cheappie, Aug 8, 2022)
- e6e50c2 make `ParquetFileReaderFactory` public to let user's provide custom i… (Cheappie, Aug 8, 2022)
- 6b33ea9 imports cleanup and more docs (Cheappie, Aug 8, 2022)
- 2f2079a try fix clippy failures (Cheappie, Aug 8, 2022)
- cb96ca7 Disable where_clauses_object_safety (tustvold, Aug 8, 2022)
- e387d23 add test (Cheappie, Aug 9, 2022)
- bc66f6a extract ParquetFileReaderFactory test to integration tests (Cheappie, Aug 9, 2022)
- 334edce resolve conflicts (Cheappie, Aug 9, 2022)
- 30d664a further cleanup (Cheappie, Aug 9, 2022)
- 9c2ccc3 Merge pull request #1 from tustvold/fix-send (Cheappie, Aug 9, 2022)
- e18d26f fix: Add apache RAT license (alamb, Aug 9, 2022)
- cec773f fix send (Cheappie, Aug 9, 2022)
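
Taken together, these commits add a builder-style hook on ParquetExec rather than a new constructor argument: callers attach a custom ParquetFileReaderFactory after building the plan. A minimal sketch of the call shape, mirroring the test further down; `my_factory` is a placeholder for any implementation of the now-public trait:

// Sketch: mirrors the test added in this PR. `my_factory` stands in for any
// user-supplied implementation of the ParquetFileReaderFactory trait.
let parquet_exec = ParquetExec::new(file_scan_config, None, None)
    .with_parquet_file_reader_factory(Arc::new(my_factory));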
46 changes: 46 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -522,8 +522,13 @@ pub(crate) mod test_util {
use super::*;
use crate::test::object_store::local_unpartitioned_file;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use object_store::memory::InMemory;
use object_store::path::Path;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::io::Cursor;
use std::time::SystemTime;
use tempfile::NamedTempFile;

pub async fn store_parquet(
@@ -548,6 +553,47 @@ pub(crate) mod test_util {
let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
Ok((meta, files))
}

pub async fn store_parquet_in_memory(
batches: Vec<RecordBatch>,
) -> (Arc<dyn ObjectStore>, Vec<ObjectMeta>) {
let in_memory = InMemory::new();

let parquet_batches: Vec<(ObjectMeta, Bytes)> = batches
.into_iter()
.enumerate()
.map(|(offset, batch)| {
let mut buf = Vec::<u8>::with_capacity(32 * 1024);
let mut output = Cursor::new(&mut buf);

let mut writer = ArrowWriter::try_new(&mut output, batch.schema(), None)
.expect("creating writer");

writer.write(&batch).expect("Writing batch");
writer.close().unwrap();

let meta = ObjectMeta {
location: Path::parse(format!("file-{offset}.parquet"))
.expect("creating path"),
last_modified: chrono::DateTime::from(SystemTime::now()),
size: buf.len(),
};

(meta, Bytes::from(buf))
})
.collect();

let mut objects = Vec::with_capacity(parquet_batches.len());
for (meta, bytes) in parquet_batches {
in_memory
.put(&meta.location, bytes)
.await
.expect("put parquet file into in memory object store");
objects.push(meta);
}

(Arc::new(in_memory), objects)
}
}

#[cfg(test)]
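For orientation, a short sketch of how a test might consume the new helper; the read-back assertions are illustrative, assuming `batch` is a RecordBatch built by the surrounding test:

// Sketch: store one batch, then read the encoded file back out of the
// in-memory store; each ObjectMeta records the file's location and size.
let (object_store, metas) = store_parquet_in_memory(vec![batch]).await;
assert_eq!(metas.len(), 1);

let data = object_store
    .get(&metas[0].location)
    .await
    .expect("object was stored above")
    .bytes()
    .await
    .expect("collect bytes");
assert_eq!(data.len(), metas[0].size);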
116 changes: 109 additions & 7 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -759,20 +759,21 @@ pub async fn plan_to_parquet(

#[cfg(test)]
mod tests {
use crate::{
assert_batches_sorted_eq, assert_contains,
datasource::file_format::{parquet::ParquetFormat, FileFormat},
physical_plan::collect,
};

use super::*;
use crate::datasource::file_format::parquet::test_util::store_parquet;
use crate::datasource::file_format::parquet::test_util::{
store_parquet, store_parquet_in_memory,
};
use crate::datasource::file_format::test_util::scan_format;
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::options::CsvReadOptions;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use crate::{
assert_batches_sorted_eq, assert_contains,
datasource::file_format::{parquet::ParquetFormat, FileFormat},
physical_plan::collect,
};
use arrow::array::Float32Array;
use arrow::record_batch::RecordBatch;
use arrow::{
@@ -856,6 +857,107 @@ mod tests {
)
}

const EXPECTED_USER_DEFINED_METADATA: &str = "some-user-defined-metadata";

#[tokio::test]
async fn route_data_access_ops_to_parquet_file_reader_factory() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));

let batch = create_batch(vec![
("c1", c1.clone()),
("c2", c2.clone()),
("c3", c3.clone()),
]);

let file_schema = batch.schema().clone();
let (in_memory_object_store, parquet_files_meta) =
store_parquet_in_memory(vec![batch]).await;
let file_groups = parquet_files_meta
.into_iter()
.map(|meta| PartitionedFile {
object_meta: meta,
partition_values: vec![],
range: None,
extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
})
.collect();

// prepare the scan
let parquet_exec = ParquetExec::new(
FileScanConfig {
// just any url that doesn't point to in memory object store
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![file_groups],
file_schema,
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
},
None,
None,
)
.with_parquet_file_reader_factory(Arc::new(
InMemoryParquetFileReaderFactory(Arc::clone(&in_memory_object_store)),
));

let session_ctx = SessionContext::new();

let task_ctx = session_ctx.task_ctx();
let read = collect(Arc::new(parquet_exec), task_ctx).await.unwrap();

let expected = vec![
"+-----+----+----+",
"| c1 | c2 | c3 |",
"+-----+----+----+",
"| Foo | 1 | 10 |",
"| | 2 | 20 |",
"| bar | | |",
"+-----+----+----+",
];

assert_batches_sorted_eq!(expected, &read);
}

#[derive(Debug)]
struct InMemoryParquetFileReaderFactory(Arc<dyn ObjectStore>);
Review comments on this change:

Contributor: ❤️

Contributor: this test looks great

Cheappie (PR author): I have moved that test to integration tests, but for users' convenience I had to expose two things: ParquetFileMetrics and fn fetch_parquet_metadata(...). I have added docs describing that these items are exposed for low-level integrations and are subject to change in the near future. Should we worry about exposing impl details, or is a doc note saying these things might change enough?

Contributor: Thanks @Cheappie -- I think the comments are good enough.

In fact it is perhaps good to see what was required to be made pub to use these new APIs. Hopefully it makes downstream integrations easier.

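As a quick illustration of the second exposed item, a minimal sketch of calling fetch_parquet_metadata directly; its exact signature (object store, file ObjectMeta, optional footer size hint) is an assumption based on this discussion, not code quoted from the PR:

// Sketch only: the signature of fetch_parquet_metadata is assumed from the
// review discussion above. A low-level integration might pre-fetch footer
// metadata once and cache it, instead of re-reading it on every scan.
let metadata = fetch_parquet_metadata(store.as_ref(), &object_meta, Some(64 * 1024))
    .await
    .expect("fetch parquet footer metadata");
assert!(metadata.num_row_groups() >= 1);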

impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
fn create_reader(
&self,
partition_index: usize,
file_meta: FileMeta,
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn AsyncFileReader + Send>> {
let metadata = file_meta
.extensions
.as_ref()
.expect("has user defined metadata");
let metadata = metadata
.downcast_ref::<String>()
.expect("has string metadata");

assert_eq!(EXPECTED_USER_DEFINED_METADATA, &metadata[..]);

let parquet_file_metrics = ParquetFileMetrics::new(
partition_index,
file_meta.location().as_ref(),
metrics,
);

Ok(Box::new(ParquetFileReader {
meta: file_meta.object_meta,
store: Arc::clone(&self.0),
metadata_size_hint,
metrics: parquet_file_metrics,
}))
}
}

#[tokio::test]
async fn evolved_schema() {
let c1: ArrayRef =
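
The ParquetFileReader constructed in create_reader above is the module's own AsyncFileReader over an ObjectStore; this hunk does not reprint it. Below is a rough sketch of what such an implementation looks like, assuming the 2022-era parquet crate trait (byte ranges as Range<usize>) and a bytes_scanned counter on ParquetFileMetrics; treat it as an illustration, not the PR's exact code:

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;

impl AsyncFileReader for ParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<usize>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // Track how many bytes this scan pulls from the object store
        // (assumes ParquetFileMetrics exposes a bytes_scanned counter).
        self.metrics.bytes_scanned.add(range.end - range.start);

        // Delegate the actual read to the ObjectStore captured by the factory.
        self.store
            .get_range(&self.meta.location, range)
            .map_err(|e| ParquetError::General(format!("get_bytes error: {}", e)))
            .boxed()
    }

    fn get_metadata(
        &mut self,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        Box::pin(async move {
            // Read the footer via the helper discussed in the review thread;
            // its signature is assumed here, not quoted from the PR.
            let metadata = fetch_parquet_metadata(
                self.store.as_ref(),
                &self.meta,
                self.metadata_size_hint,
            )
            .await
            .map_err(|e| {
                ParquetError::General(format!("get_metadata error: {}", e))
            })?;
            Ok(Arc::new(metadata))
        })
    }
}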