diff --git a/datafusion/src/avro_to_arrow/mod.rs b/datafusion/src/avro_to_arrow/mod.rs index 531b1092e1d62..f30fbdcc0cec2 100644 --- a/datafusion/src/avro_to_arrow/mod.rs +++ b/datafusion/src/avro_to_arrow/mod.rs @@ -28,11 +28,11 @@ use crate::arrow::datatypes::Schema; use crate::error::Result; #[cfg(feature = "avro")] pub use reader::{Reader, ReaderBuilder}; -use std::io::{Read, Seek}; +use std::io::Read; #[cfg(feature = "avro")] /// Read Avro schema given a reader -pub fn read_avro_schema_from_reader(reader: &mut R) -> Result { +pub fn read_avro_schema_from_reader(reader: &mut R) -> Result { let avro_reader = avro_rs::Reader::new(reader)?; let schema = avro_reader.writer_schema(); schema::to_arrow_schema(schema) @@ -40,7 +40,7 @@ pub fn read_avro_schema_from_reader(reader: &mut R) -> Result(_: &mut R) -> Result { +pub fn read_avro_schema_from_reader(_: &mut R) -> Result { Err(crate::error::DataFusionError::NotImplemented( "cannot read avro schema without the 'avro' feature enabled".to_string(), )) diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs index bb5083d547b7b..1ca538867ccab 100644 --- a/datafusion/src/datasource/file_format/avro.rs +++ b/datafusion/src/datasource/file_format/avro.rs @@ -23,55 +23,48 @@ use arrow::datatypes::Schema; use arrow::{self, datatypes::SchemaRef}; use async_trait::async_trait; use futures::StreamExt; -use std::fs::File; -use super::PartitionedFile; -use super::{FileFormat, StringStream}; +use super::{FileFormat, PhysicalPlanConfig}; use crate::avro_to_arrow::read_avro_schema_from_reader; +use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::error::Result; -use crate::logical_plan::Expr; use crate::physical_plan::file_format::AvroExec; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; -/// Line-delimited Avro `FileFormat` implementation. -pub struct AvroFormat {} +/// Avro `FileFormat` implementation. 
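Relaxing the bound from `Read + Seek` to plain `Read` is what later allows non-seekable object-store readers to feed this function. A minimal sketch of the relaxed API, assuming the `datafusion::avro_to_arrow` path is public and the `avro` feature is enabled:

```rust
use datafusion::arrow::datatypes::Schema;
use datafusion::avro_to_arrow::read_avro_schema_from_reader;
use datafusion::error::Result;

/// Infer an Arrow schema from Avro bytes already held in memory.
fn schema_from_avro_bytes(mut bytes: &[u8]) -> Result<Schema> {
    // &[u8] implements Read but not Seek, which the relaxed bound now permits.
    read_avro_schema_from_reader(&mut bytes)
}
```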
+pub struct AvroFormat; #[async_trait] impl FileFormat for AvroFormat { - async fn infer_schema(&self, mut paths: StringStream) -> Result { + async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result { let mut schemas = vec![]; - while let Some(filename) = paths.next().await { - let mut file = File::open(filename)?; - let schema = read_avro_schema_from_reader(&mut file)?; + while let Some(obj_reader) = readers.next().await { + let mut reader = obj_reader?.sync_reader()?; + let schema = read_avro_schema_from_reader(&mut reader)?; schemas.push(schema); } let merged_schema = Schema::try_merge(schemas)?; Ok(Arc::new(merged_schema)) } - async fn infer_stats(&self, _path: &str) -> Result { + async fn infer_stats(&self, _reader: Arc) -> Result { Ok(Statistics::default()) } async fn create_physical_plan( &self, - schema: SchemaRef, - files: Vec>, - statistics: Statistics, - projection: &Option>, - batch_size: usize, - _filters: &[Expr], - limit: Option, + conf: PhysicalPlanConfig, ) -> Result> { let exec = AvroExec::new( + conf.object_store, // flattening this for now because CsvExec does not support partitioning yet - files.into_iter().flatten().map(|f| f.path).collect(), - statistics, - schema, - projection.clone(), - batch_size, - limit, + conf.files.into_iter().flatten().collect(), + conf.statistics, + conf.schema, + conf.projection, + conf.batch_size, + conf.limit, ); Ok(Arc::new(exec)) } @@ -80,8 +73,16 @@ impl FileFormat for AvroFormat { #[cfg(test)] #[cfg(feature = "avro")] mod tests { - use crate::datasource::file_format::string_stream; - use crate::physical_plan::collect; + use crate::{ + datasource::{ + file_format::PartitionedFile, + object_store::local::{ + local_file_meta, local_object_reader, local_object_reader_stream, + LocalFileSystem, + }, + }, + physical_plan::collect, + }; use super::*; use arrow::array::{ @@ -93,10 +94,10 @@ mod tests { #[tokio::test] async fn read_small_batches() -> Result<()> { let projection = None; - let exec = get_exec("alltypes_plain.avro", &projection, 2).await?; + let exec = get_exec("alltypes_plain.avro", &projection, 2, None).await?; let stream = exec.execute(0).await?; - let _ = stream + let tt_batches = stream .map(|batch| { let batch = batch.unwrap(); assert_eq!(11, batch.num_columns()); @@ -105,13 +106,27 @@ mod tests { .fold(0, |acc, _| async move { acc + 1i32 }) .await; + assert_eq!(tt_batches, 4 /* 8/2 */); + + Ok(()) + } + + #[tokio::test] + async fn read_limit() -> Result<()> { + let projection = None; + let exec = get_exec("alltypes_plain.avro", &projection, 1024, Some(1)).await?; + let batches = collect(exec).await?; + assert_eq!(1, batches.len()); + assert_eq!(11, batches[0].num_columns()); + assert_eq!(1, batches[0].num_rows()); + Ok(()) } #[tokio::test] async fn read_alltypes_plain_avro() -> Result<()> { let projection = None; - let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?; let x: Vec = exec .schema() @@ -161,7 +176,7 @@ mod tests { #[tokio::test] async fn read_bool_alltypes_plain_avro() -> Result<()> { let projection = Some(vec![1]); - let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(batches.len(), 1); @@ -189,7 +204,7 @@ mod tests { #[tokio::test] async fn read_i32_alltypes_plain_avro() -> Result<()> { let projection = Some(vec![0]); - let exec = 
get_exec("alltypes_plain.avro", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(batches.len(), 1); @@ -214,7 +229,7 @@ mod tests { #[tokio::test] async fn read_i96_alltypes_plain_avro() -> Result<()> { let projection = Some(vec![10]); - let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(batches.len(), 1); @@ -239,7 +254,7 @@ mod tests { #[tokio::test] async fn read_f32_alltypes_plain_avro() -> Result<()> { let projection = Some(vec![6]); - let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(batches.len(), 1); @@ -267,7 +282,7 @@ mod tests { #[tokio::test] async fn read_f64_alltypes_plain_avro() -> Result<()> { let projection = Some(vec![7]); - let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(batches.len(), 1); @@ -295,7 +310,7 @@ mod tests { #[tokio::test] async fn read_binary_alltypes_plain_avro() -> Result<()> { let projection = Some(vec![9]); - let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(batches.len(), 1); @@ -324,21 +339,33 @@ mod tests { file_name: &str, projection: &Option>, batch_size: usize, + limit: Option, ) -> Result> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/{}", testdata, file_name); let format = AvroFormat {}; let schema = format - .infer_schema(string_stream(vec![filename.clone()])) + .infer_schema(local_object_reader_stream(vec![filename.clone()])) .await .expect("Schema inference"); - let stats = format - .infer_stats(&filename) + let statistics = format + .infer_stats(local_object_reader(filename.clone())) .await .expect("Stats inference"); - let files = vec![vec![PartitionedFile { path: filename }]]; + let files = vec![vec![PartitionedFile { + file_meta: local_file_meta(filename.to_owned()), + }]]; let exec = format - .create_physical_plan(schema, files, stats, projection, batch_size, &[], None) + .create_physical_plan(PhysicalPlanConfig { + object_store: Arc::new(LocalFileSystem {}), + schema, + files, + statistics, + projection: projection.clone(), + batch_size, + filters: vec![], + limit, + }) .await?; Ok(exec) } @@ -349,7 +376,7 @@ mod tests { mod tests { use super::*; - use crate::datasource::file_format::string_stream; + use crate::datasource::object_store::local::local_object_reader_stream; use crate::error::DataFusionError; #[tokio::test] @@ -357,7 +384,7 @@ mod tests { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); let schema_result = AvroFormat {} - .infer_schema(string_stream(vec![filename])) + .infer_schema(local_object_reader_stream(vec![filename])) .await; assert!(matches!( schema_result, diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs index a5b555053ef76..d619227e43060 100644 --- a/datafusion/src/datasource/file_format/csv.rs +++ b/datafusion/src/datasource/file_format/csv.rs @@ -23,37 +23,65 @@ use 
arrow::datatypes::Schema; use arrow::{self, datatypes::SchemaRef}; use async_trait::async_trait; use futures::StreamExt; -use std::fs::File; -use super::PartitionedFile; -use super::{FileFormat, StringStream}; +use super::{FileFormat, PhysicalPlanConfig}; +use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::error::Result; -use crate::logical_plan::Expr; use crate::physical_plan::file_format::CsvExec; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; /// Character Separated Value `FileFormat` implementation. pub struct CsvFormat { + has_header: bool, + delimiter: u8, + schema_infer_max_rec: Option, +} + +impl Default for CsvFormat { + fn default() -> Self { + Self { + schema_infer_max_rec: None, + has_header: true, + delimiter: b',', + } + } +} + +impl CsvFormat { + /// Set a limit in terms of records to scan to infer the schema + /// - default to `None` (no limit) + pub fn with_schema_infer_max_rec(&mut self, max_rec: Option) -> &mut Self { + self.schema_infer_max_rec = max_rec; + self + } + /// Set true to indicate that the first line is a header. - pub has_header: bool, - /// The character seprating values within a row. - pub delimiter: u8, - /// If no schema was provided for the table, it will be - /// infered from the data itself, this limits the number - /// of lines used in the process. - pub schema_infer_max_rec: Option, + /// - default to true + pub fn with_has_header(&mut self, has_header: bool) -> &mut Self { + self.has_header = has_header; + self + } + + /// The character separating values within a row. + /// - default to ',' + pub fn with_delimiter(&mut self, delimiter: u8) -> &mut Self { + self.delimiter = delimiter; + self + } } #[async_trait] impl FileFormat for CsvFormat { - async fn infer_schema(&self, mut paths: StringStream) -> Result { + async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result { let mut schemas = vec![]; + let mut records_to_read = self.schema_infer_max_rec.unwrap_or(std::usize::MAX); - while let Some(fname) = paths.next().await { - let (schema, records_read) = arrow::csv::reader::infer_file_schema( - &mut File::open(fname)?, + while let Some(obj_reader) = readers.next().await { + let mut reader = obj_reader?.sync_reader()?; + let (schema, records_read) = arrow::csv::reader::infer_reader_schema( + &mut reader, self.delimiter, Some(records_to_read), self.has_header, @@ -72,30 +100,25 @@ impl FileFormat for CsvFormat { Ok(Arc::new(merged_schema)) } - async fn infer_stats(&self, _path: &str) -> Result { + async fn infer_stats(&self, _reader: Arc) -> Result { Ok(Statistics::default()) } async fn create_physical_plan( &self, - schema: SchemaRef, - files: Vec>, - statistics: Statistics, - projection: &Option>, - batch_size: usize, - _filters: &[Expr], - limit: Option, + conf: PhysicalPlanConfig, ) -> Result> { let exec = CsvExec::new( + conf.object_store, // flattening this for now because CsvExec does not support partitioning yet - files.into_iter().flatten().map(|f| f.path).collect(), - statistics, - schema, + conf.files.into_iter().flatten().collect(), + conf.statistics, + conf.schema, self.has_header, self.delimiter, - projection.clone(), - batch_size, - limit, + conf.projection, + conf.batch_size, + conf.limit, ); Ok(Arc::new(exec)) } @@ -106,16 +129,25 @@ mod tests { use arrow::array::StringArray; use super::*; - use crate::{datasource::file_format::string_stream, physical_plan::collect}; + use crate::{ + datasource::{ + file_format::{PartitionedFile, PhysicalPlanConfig}, + 
object_store::local::{ + local_file_meta, local_object_reader, local_object_reader_stream, + LocalFileSystem, + }, + }, + physical_plan::collect, + }; #[tokio::test] async fn read_small_batches() -> Result<()> { // skip column 9 that overflows the automaticly discovered column type of i64 (u64 would work) let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]); - let exec = get_exec("aggregate_test_100.csv", &projection, 2).await?; + let exec = get_exec("aggregate_test_100.csv", &projection, 2, None).await?; let stream = exec.execute(0).await?; - let tt_rows: i32 = stream + let tt_batches: i32 = stream .map(|batch| { let batch = batch.unwrap(); assert_eq!(12, batch.num_columns()); @@ -124,7 +156,7 @@ mod tests { .fold(0, |acc, _| async move { acc + 1i32 }) .await; - assert_eq!(tt_rows, 50 /* 100/2 */); + assert_eq!(tt_batches, 50 /* 100/2 */); // test metadata assert_eq!(exec.statistics().num_rows, None); @@ -133,10 +165,22 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_limit() -> Result<()> { + let projection = Some(vec![0, 1, 2, 3]); + let exec = get_exec("aggregate_test_100.csv", &projection, 1024, Some(1)).await?; + let batches = collect(exec).await?; + assert_eq!(1, batches.len()); + assert_eq!(4, batches[0].num_columns()); + assert_eq!(1, batches[0].num_rows()); + + Ok(()) + } + #[tokio::test] async fn infer_schema() -> Result<()> { let projection = None; - let exec = get_exec("aggregate_test_100.csv", &projection, 1024).await?; + let exec = get_exec("aggregate_test_100.csv", &projection, 1024, None).await?; let x: Vec = exec .schema() @@ -169,7 +213,7 @@ mod tests { #[tokio::test] async fn read_char_column() -> Result<()> { let projection = Some(vec![0]); - let exec = get_exec("aggregate_test_100.csv", &projection, 1024).await?; + let exec = get_exec("aggregate_test_100.csv", &projection, 1024, None).await?; let batches = collect(exec).await.expect("Collect batches"); @@ -196,25 +240,33 @@ mod tests { file_name: &str, projection: &Option>, batch_size: usize, + limit: Option, ) -> Result> { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/csv/{}", testdata, file_name); - let format = CsvFormat { - has_header: true, - schema_infer_max_rec: Some(1000), - delimiter: b',', - }; + let format = CsvFormat::default(); let schema = format - .infer_schema(string_stream(vec![filename.clone()])) + .infer_schema(local_object_reader_stream(vec![filename.clone()])) .await .expect("Schema inference"); - let stats = format - .infer_stats(&filename) + let statistics = format + .infer_stats(local_object_reader(filename.clone())) .await .expect("Stats inference"); - let files = vec![vec![PartitionedFile { path: filename }]]; + let files = vec![vec![PartitionedFile { + file_meta: local_file_meta(filename.to_owned()), + }]]; let exec = format - .create_physical_plan(schema, files, stats, projection, batch_size, &[], None) + .create_physical_plan(PhysicalPlanConfig { + object_store: Arc::new(LocalFileSystem {}), + schema, + files, + statistics, + projection: projection.clone(), + batch_size, + filters: vec![], + limit, + }) .await?; Ok(exec) } diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs index 7357644af1fa3..fd7f6580ab8a4 100644 --- a/datafusion/src/datasource/file_format/json.rs +++ b/datafusion/src/datasource/file_format/json.rs @@ -26,32 +26,44 @@ use arrow::json::reader::infer_json_schema_from_iterator; use arrow::json::reader::ValueIter; use async_trait::async_trait; use futures::StreamExt; 
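The builder-style `CsvFormat` setters and the reader-based `infer_schema` introduced above compose roughly as follows. The import paths are assumptions, and `local_object_reader_stream` is the test helper referenced in this diff:

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::object_store::local::local_object_reader_stream;
use datafusion::error::Result;

/// Infer the merged schema of pipe-delimited, headerless CSV files without
/// touching `std::fs` directly.
async fn csv_schema(paths: Vec<String>) -> Result<SchemaRef> {
    let mut format = CsvFormat::default();
    format
        .with_has_header(false)
        .with_delimiter(b'|')
        .with_schema_infer_max_rec(Some(100));
    format
        .infer_schema(local_object_reader_stream(paths))
        .await
}
```

Note that the setters take `&mut self`, so the format must be bound mutably before chaining.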
-use std::fs::File; -use super::PartitionedFile; -use super::{FileFormat, StringStream}; +use super::FileFormat; +use super::PhysicalPlanConfig; +use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::error::Result; -use crate::logical_plan::Expr; use crate::physical_plan::file_format::NdJsonExec; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Statistics; /// New line delimited JSON `FileFormat` implementation. pub struct JsonFormat { - /// If no schema was provided for the table, it will be - /// infered from the data itself, this limits the number - /// of lines used in the process. - pub schema_infer_max_rec: Option, + schema_infer_max_rec: Option, +} + +impl Default for JsonFormat { + fn default() -> Self { + Self { + schema_infer_max_rec: None, + } + } +} + +impl JsonFormat { + /// Set a limit in terms of records to scan to infer the schema + /// - defaults to `None` (no limit) + pub fn with_schema_infer_max_rec(&mut self, max_rec: Option) -> &mut Self { + self.schema_infer_max_rec = max_rec; + self + } } #[async_trait] impl FileFormat for JsonFormat { - async fn infer_schema(&self, mut paths: StringStream) -> Result { + async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result { let mut schemas = Vec::new(); let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX); - while let Some(file) = paths.next().await { - let file = File::open(file)?; - let mut reader = BufReader::new(file); + while let Some(obj_reader) = readers.next().await { + let mut reader = BufReader::new(obj_reader?.sync_reader()?); let iter = ValueIter::new(&mut reader, None); let schema = infer_json_schema_from_iterator(iter.take_while(|_| { let should_take = records_to_read > 0; @@ -68,28 +80,23 @@ impl FileFormat for JsonFormat { Ok(Arc::new(schema)) } - async fn infer_stats(&self, _path: &str) -> Result { + async fn infer_stats(&self, _reader: Arc) -> Result { Ok(Statistics::default()) } async fn create_physical_plan( &self, - schema: SchemaRef, - files: Vec>, - statistics: Statistics, - projection: &Option>, - batch_size: usize, - _filters: &[Expr], - limit: Option, + conf: PhysicalPlanConfig, ) -> Result> { let exec = NdJsonExec::new( + conf.object_store, // flattening this for now because NdJsonExec does not support partitioning yet - files.into_iter().flatten().map(|f| f.path).collect(), - statistics, - schema, - projection.clone(), - batch_size, - limit, + conf.files.into_iter().flatten().collect(), + conf.statistics, + conf.schema, + conf.projection, + conf.batch_size, + conf.limit, ); Ok(Arc::new(exec)) } @@ -100,15 +107,24 @@ mod tests { use arrow::array::Int64Array; use super::*; - use crate::{datasource::file_format::string_stream, physical_plan::collect}; + use crate::{ + datasource::{ + file_format::{PartitionedFile, PhysicalPlanConfig}, + object_store::local::{ + local_file_meta, local_object_reader, local_object_reader_stream, + LocalFileSystem, + }, + }, + physical_plan::collect, + }; #[tokio::test] async fn read_small_batches() -> Result<()> { let projection = None; - let exec = get_exec(&projection, 2).await?; + let exec = get_exec(&projection, 2, None).await?; let stream = exec.execute(0).await?; - let tt_rows: i32 = stream + let tt_batches: i32 = stream .map(|batch| { let batch = batch.unwrap(); assert_eq!(4, batch.num_columns()); @@ -117,7 +133,7 @@ mod tests { .fold(0, |acc, _| async move { acc + 1i32 }) .await; - assert_eq!(tt_rows, 6 /* 12/2 */); + assert_eq!(tt_batches, 6 /* 12/2 */); // test metadata 
assert_eq!(exec.statistics().num_rows, None); @@ -126,10 +142,22 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_limit() -> Result<()> { + let projection = None; + let exec = get_exec(&projection, 1024, Some(1)).await?; + let batches = collect(exec).await?; + assert_eq!(1, batches.len()); + assert_eq!(4, batches[0].num_columns()); + assert_eq!(1, batches[0].num_rows()); + + Ok(()) + } + #[tokio::test] async fn infer_schema() -> Result<()> { let projection = None; - let exec = get_exec(&projection, 1024).await?; + let exec = get_exec(&projection, 1024, None).await?; let x: Vec = exec .schema() @@ -145,7 +173,7 @@ mod tests { #[tokio::test] async fn read_int_column() -> Result<()> { let projection = Some(vec![0]); - let exec = get_exec(&projection, 1024).await?; + let exec = get_exec(&projection, 1024, None).await?; let batches = collect(exec).await.expect("Collect batches"); @@ -174,21 +202,32 @@ mod tests { async fn get_exec( projection: &Option>, batch_size: usize, + limit: Option, ) -> Result> { let filename = "tests/jsons/2.json"; - let format = JsonFormat { - schema_infer_max_rec: Some(1000), - }; + let format = JsonFormat::default(); let schema = format - .infer_schema(string_stream(vec![filename.to_owned()])) + .infer_schema(local_object_reader_stream(vec![filename.to_owned()])) .await .expect("Schema inference"); - let stats = format.infer_stats(filename).await.expect("Stats inference"); + let statistics = format + .infer_stats(local_object_reader(filename.to_owned())) + .await + .expect("Stats inference"); let files = vec![vec![PartitionedFile { - path: filename.to_owned(), + file_meta: local_file_meta(filename.to_owned()), }]]; let exec = format - .create_physical_plan(schema, files, stats, projection, batch_size, &[], None) + .create_physical_plan(PhysicalPlanConfig { + object_store: Arc::new(LocalFileSystem {}), + schema, + files, + statistics, + projection: projection.clone(), + batch_size, + filters: vec![], + limit, + }) .await?; Ok(exec) } diff --git a/datafusion/src/datasource/file_format/mod.rs b/datafusion/src/datasource/file_format/mod.rs index e08ebd615c802..b315c99b7d140 100644 --- a/datafusion/src/datasource/file_format/mod.rs +++ b/datafusion/src/datasource/file_format/mod.rs @@ -32,14 +32,29 @@ use crate::logical_plan::Expr; use crate::physical_plan::{Accumulator, ExecutionPlan, Statistics}; use async_trait::async_trait; -use futures::Stream; - -/// A stream of String that can be used accross await calls -pub type StringStream = Pin + Send + Sync>>; - -/// Convert a vector into a stream -pub fn string_stream(strings: Vec) -> StringStream { - Box::pin(futures::stream::iter(strings)) +use futures::{Stream, StreamExt}; + +use super::object_store::{FileMeta, ObjectReader, ObjectReaderStream, ObjectStore}; + +/// The configurations to be passed when creating a physical plan for +/// a given file format. 
+pub struct PhysicalPlanConfig { + /// Store from which the `files` should be fetched + pub object_store: Arc, + /// Schema before projection + pub schema: SchemaRef, + /// Partitioned fields to process in the executor + pub files: Vec>, + /// Estimated overall statistics of source plan + pub statistics: Statistics, + /// Columns on which to project the data + pub projection: Option>, + /// The maximum number of records per arrow column + pub batch_size: usize, + /// The filters that where pushed down to this execution plan + pub filters: Vec, + /// The minimum number of records required from this source plan + pub limit: Option, } /// This trait abstracts all the file format specific implementations @@ -47,27 +62,21 @@ pub fn string_stream(strings: Vec) -> StringStream { /// providers that support the the same file formats. #[async_trait] pub trait FileFormat: Send + Sync { - /// Open the files at the paths provided by iterator and infer the - /// common schema - async fn infer_schema(&self, paths: StringStream) -> Result; + /// Infer the common schema of the provided objects. The objects will usually + /// be analysed up to a given number of records or files (as specified in the + /// format config) then give the estimated common schema. This might fail if + /// the files have schemas that cannot be merged. + async fn infer_schema(&self, readers: ObjectReaderStream) -> Result; - /// Open the file at the given path and infer its statistics - async fn infer_stats(&self, path: &str) -> Result; + /// Infer the statistics for the provided object. The cost and accuracy of the + /// estimated statistics might vary greatly between file formats. + async fn infer_stats(&self, reader: Arc) -> Result; /// Take a list of files and convert it to the appropriate executor /// according to this file format. 
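For a third-party format, the reworked trait reduces to three async methods. Below is a skeletal implementation under the signatures sketched in this hunk; the module paths and exact generics are reconstructed assumptions, and the plan-creation step is deliberately left unimplemented:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::{FileFormat, PhysicalPlanConfig};
use datafusion::datasource::object_store::{ObjectReader, ObjectReaderStream};
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::{ExecutionPlan, Statistics};

/// Hypothetical format whose schema is known up front.
struct FixedSchemaFormat {
    schema: SchemaRef,
}

#[async_trait]
impl FileFormat for FixedSchemaFormat {
    async fn infer_schema(&self, _readers: ObjectReaderStream) -> Result<SchemaRef> {
        // Nothing to read: the schema is static.
        Ok(self.schema.clone())
    }

    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
        // Mirror the cheap default used by the CSV/JSON/Avro formats above.
        Ok(Statistics::default())
    }

    async fn create_physical_plan(
        &self,
        _conf: PhysicalPlanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Err(DataFusionError::NotImplemented(
            "scan not implemented in this sketch".to_string(),
        ))
    }
}
```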
- /// TODO group params into TableDescription(schema,files,stats) and - /// ScanOptions(projection,batch_size,filters) to avoid too_many_arguments - #[allow(clippy::too_many_arguments)] async fn create_physical_plan( &self, - schema: SchemaRef, - files: Vec>, - statistics: Statistics, - projection: &Option>, - batch_size: usize, - filters: &[Expr], - limit: Option, + conf: PhysicalPlanConfig, ) -> Result>; } @@ -76,12 +85,12 @@ pub trait FileFormat: Send + Sync { /// needed to read up to `limit` number of rows /// TODO fix case where `num_rows` and `total_byte_size` are not defined (stat should be None instead of Some(0)) /// TODO move back to crate::datasource::mod.rs once legacy cleaned up -pub fn get_statistics_with_limit( - all_files: &[(PartitionedFile, Statistics)], +pub async fn get_statistics_with_limit( + all_files: impl Stream>, schema: SchemaRef, limit: Option, -) -> (Vec, Statistics) { - let mut all_files = all_files.to_vec(); +) -> Result<(Vec, Statistics)> { + let mut result_files = vec![]; let mut total_byte_size = 0; let mut null_counts = vec![0; schema.fields().len()]; @@ -89,10 +98,11 @@ pub fn get_statistics_with_limit( let (mut max_values, mut min_values) = create_max_min_accs(&schema); let mut num_rows = 0; - let mut num_files = 0; let mut is_exact = true; - for (_, file_stats) in &all_files { - num_files += 1; + let mut all_files = Box::pin(all_files); + while let Some(res) = all_files.next().await { + let (file, file_stats) = res?; + result_files.push(file); is_exact &= file_stats.is_exact; num_rows += file_stats.num_rows.unwrap_or(0); total_byte_size += file_stats.total_byte_size.unwrap_or(0); @@ -128,9 +138,11 @@ pub fn get_statistics_with_limit( break; } } - if num_files < all_files.len() { + // if we still have files in the stream, it means that the limit kicked + // in and that the statistic could have been different if we processed + // the files in a different order. + if all_files.next().await.is_some() { is_exact = false; - all_files.truncate(num_files); } let column_stats = if has_statistics { @@ -151,9 +163,7 @@ pub fn get_statistics_with_limit( is_exact, }; - let files = all_files.into_iter().map(|(f, _)| f).collect(); - - (files, statistics) + Ok((result_files, statistics)) } #[derive(Debug, Clone)] @@ -162,15 +172,19 @@ pub fn get_statistics_with_limit( /// TODO move back to crate::datasource::mod.rs once legacy cleaned up pub struct PartitionedFile { /// Path for the file (e.g. URL, filesystem path, etc) - pub path: String, + pub file_meta: FileMeta, // Values of partition columns to be appended to each row // pub partition_value: Option>, // We may include row group range here for a more fine-grained parallel execution } +/// Stream of files get listed from object store +pub type PartitionedFileStream = + Pin> + Send + Sync + 'static>>; + impl std::fmt::Display for PartitionedFile { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.path) + write!(f, "{}", self.file_meta) } } diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 39aa14de0b9f0..9e5d623b7bb15 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -17,7 +17,7 @@ //! 
Parquet format abstractions -use std::fs::File; +use std::io::Read; use std::sync::Arc; use arrow::datatypes::Schema; @@ -26,17 +26,21 @@ use async_trait::async_trait; use futures::stream::StreamExt; use parquet::arrow::ArrowReader; use parquet::arrow::ParquetFileArrowReader; +use parquet::errors::ParquetError; +use parquet::errors::Result as ParquetResult; +use parquet::file::reader::ChunkReader; +use parquet::file::reader::Length; use parquet::file::serialized_reader::SerializedFileReader; use parquet::file::statistics::Statistics as ParquetStatistics; use super::FileFormat; -use super::PartitionedFile; -use super::{create_max_min_accs, get_col_stats, StringStream}; +use super::PhysicalPlanConfig; +use super::{create_max_min_accs, get_col_stats}; use crate::arrow::datatypes::{DataType, Field}; +use crate::datasource::object_store::{ObjectReader, ObjectReaderStream}; use crate::error::DataFusionError; use crate::error::Result; use crate::logical_plan::combine_filters; -use crate::logical_plan::Expr; use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator}; use crate::physical_plan::file_format::ParquetExec; use crate::physical_plan::ExecutionPlan; @@ -45,58 +49,67 @@ use crate::scalar::ScalarValue; /// The Apache Parquet `FileFormat` implementation pub struct ParquetFormat { + enable_pruning: bool, +} + +impl Default for ParquetFormat { + fn default() -> Self { + Self { + enable_pruning: true, + } + } +} + +impl ParquetFormat { /// Activate statistics based row group level pruning - pub enable_pruning: bool, + /// - defaults to true + pub fn with_enable_pruning(&mut self, enable: bool) -> &mut Self { + self.enable_pruning = enable; + self + } } #[async_trait] impl FileFormat for ParquetFormat { - async fn infer_schema(&self, mut paths: StringStream) -> Result { + async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result { // We currently get the schema information from the first file rather than do // schema merging and this is a limitation. // See https://issues.apache.org/jira/browse/ARROW-11017 - let first_file = paths + let first_file = readers .next() .await - .ok_or_else(|| DataFusionError::Plan("No data file found".to_owned()))?; - let (schema, _) = fetch_metadata(&first_file)?; + .ok_or_else(|| DataFusionError::Plan("No data file found".to_owned()))??; + let (schema, _) = fetch_metadata(first_file)?; Ok(Arc::new(schema)) } - async fn infer_stats(&self, path: &str) -> Result { - let (_, stats) = fetch_metadata(path)?; + async fn infer_stats(&self, reader: Arc) -> Result { + let (_, stats) = fetch_metadata(reader)?; Ok(stats) } async fn create_physical_plan( &self, - schema: SchemaRef, - files: Vec>, - statistics: Statistics, - projection: &Option>, - batch_size: usize, - filters: &[Expr], - limit: Option, + conf: PhysicalPlanConfig, ) -> Result> { // If enable pruning then combine the filters to build the predicate. // If disable pruning then set the predicate to None, thus readers // will not prune data based on the statistics. 
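Unlike the CSV, JSON and Avro formats, `infer_stats` here actually parses the Parquet footer, so the returned statistics come back populated. A hedged usage sketch reusing the local test helper (import paths are assumptions):

```rust
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::object_store::local::local_object_reader;
use datafusion::error::Result;
use datafusion::physical_plan::Statistics;

/// Fetch row count, byte size and per-column min/max from a local Parquet file.
async fn parquet_stats(path: String) -> Result<Statistics> {
    let format = ParquetFormat::default();
    // The reader is handed over as `Arc<dyn ObjectReader>`, so the same call works
    // for any registered object store, not just the local file system.
    format.infer_stats(local_object_reader(path)).await
}
```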
let predicate = if self.enable_pruning { - combine_filters(filters) + combine_filters(&conf.filters) } else { None }; Ok(Arc::new(ParquetExec::new( - files, - statistics, - schema, - projection.clone(), + conf.object_store, + conf.files, + conf.statistics, + conf.schema, + conf.projection, predicate, - limit - .map(|l| std::cmp::min(l, batch_size)) - .unwrap_or(batch_size), - limit, + conf.batch_size, + conf.limit, ))) } } @@ -224,9 +237,9 @@ fn summarize_min_max( } /// Read and parse the metadata of the Parquet file at location `path` -fn fetch_metadata(path: &str) -> Result<(Schema, Statistics)> { - let file = File::open(path)?; - let file_reader = Arc::new(SerializedFileReader::new(file)?); +fn fetch_metadata(object_reader: Arc) -> Result<(Schema, Statistics)> { + let obj_reader = ChunkObjectReader(object_reader); + let file_reader = Arc::new(SerializedFileReader::new(obj_reader)?); let mut arrow_reader = ParquetFileArrowReader::new(file_reader); let schema = arrow_reader.get_schema()?; let num_fields = schema.fields().len(); @@ -282,10 +295,37 @@ fn fetch_metadata(path: &str) -> Result<(Schema, Statistics)> { Ok((schema, statistics)) } +/// A wrapper around the object reader to make it implement `ChunkReader` +pub struct ChunkObjectReader(pub Arc); + +impl Length for ChunkObjectReader { + fn len(&self) -> u64 { + self.0.length() + } +} + +impl ChunkReader for ChunkObjectReader { + type T = Box; + + fn get_read(&self, start: u64, length: usize) -> ParquetResult { + self.0 + .sync_chunk_reader(start, length) + .map_err(|e| ParquetError::ArrowError(e.to_string())) + } +} + #[cfg(test)] mod tests { - use crate::datasource::file_format::string_stream; - use crate::physical_plan::collect; + use crate::{ + datasource::{ + file_format::PartitionedFile, + object_store::local::{ + local_file_meta, local_object_reader, local_object_reader_stream, + LocalFileSystem, + }, + }, + physical_plan::collect, + }; use super::*; use arrow::array::{ @@ -297,10 +337,10 @@ mod tests { #[tokio::test] async fn read_small_batches() -> Result<()> { let projection = None; - let exec = get_exec("alltypes_plain.parquet", &projection, 2).await?; + let exec = get_exec("alltypes_plain.parquet", &projection, 2, None).await?; let stream = exec.execute(0).await?; - let _ = stream + let tt_batches = stream .map(|batch| { let batch = batch.unwrap(); assert_eq!(11, batch.num_columns()); @@ -309,6 +349,8 @@ mod tests { .fold(0, |acc, _| async move { acc + 1i32 }) .await; + assert_eq!(tt_batches, 4 /* 8/2 */); + // test metadata assert_eq!(exec.statistics().num_rows, Some(8)); assert_eq!(exec.statistics().total_byte_size, Some(671)); @@ -316,10 +358,27 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_limit() -> Result<()> { + let projection = None; + let exec = get_exec("alltypes_plain.parquet", &projection, 1024, Some(1)).await?; + + // note: even if the limit is set, the executor rounds up to the batch size + assert_eq!(exec.statistics().num_rows, Some(8)); + assert_eq!(exec.statistics().total_byte_size, Some(671)); + assert!(exec.statistics().is_exact); + let batches = collect(exec).await?; + assert_eq!(1, batches.len()); + assert_eq!(11, batches[0].num_columns()); + assert_eq!(8, batches[0].num_rows()); + + Ok(()) + } + #[tokio::test] async fn read_alltypes_plain_parquet() -> Result<()> { let projection = None; - let exec = get_exec("alltypes_plain.parquet", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?; let x: Vec = exec .schema() @@ -355,7 +414,7 @@ mod 
tests { #[tokio::test] async fn read_bool_alltypes_plain_parquet() -> Result<()> { let projection = Some(vec![1]); - let exec = get_exec("alltypes_plain.parquet", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(1, batches.len()); @@ -383,7 +442,7 @@ mod tests { #[tokio::test] async fn read_i32_alltypes_plain_parquet() -> Result<()> { let projection = Some(vec![0]); - let exec = get_exec("alltypes_plain.parquet", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(1, batches.len()); @@ -408,7 +467,7 @@ mod tests { #[tokio::test] async fn read_i96_alltypes_plain_parquet() -> Result<()> { let projection = Some(vec![10]); - let exec = get_exec("alltypes_plain.parquet", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(1, batches.len()); @@ -433,7 +492,7 @@ mod tests { #[tokio::test] async fn read_f32_alltypes_plain_parquet() -> Result<()> { let projection = Some(vec![6]); - let exec = get_exec("alltypes_plain.parquet", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(1, batches.len()); @@ -461,7 +520,7 @@ mod tests { #[tokio::test] async fn read_f64_alltypes_plain_parquet() -> Result<()> { let projection = Some(vec![7]); - let exec = get_exec("alltypes_plain.parquet", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(1, batches.len()); @@ -489,7 +548,7 @@ mod tests { #[tokio::test] async fn read_binary_alltypes_plain_parquet() -> Result<()> { let projection = Some(vec![9]); - let exec = get_exec("alltypes_plain.parquet", &projection, 1024).await?; + let exec = get_exec("alltypes_plain.parquet", &projection, 1024, None).await?; let batches = collect(exec).await?; assert_eq!(1, batches.len()); @@ -518,23 +577,33 @@ mod tests { file_name: &str, projection: &Option>, batch_size: usize, + limit: Option, ) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, file_name); - let format = ParquetFormat { - enable_pruning: true, - }; + let format = ParquetFormat::default(); let schema = format - .infer_schema(string_stream(vec![filename.clone()])) + .infer_schema(local_object_reader_stream(vec![filename.clone()])) .await .expect("Schema inference"); - let stats = format - .infer_stats(&filename.clone()) + let statistics = format + .infer_stats(local_object_reader(filename.clone())) .await .expect("Stats inference"); - let files = vec![vec![PartitionedFile { path: filename }]]; + let files = vec![vec![PartitionedFile { + file_meta: local_file_meta(filename.clone()), + }]]; let exec = format - .create_physical_plan(schema, files, stats, projection, batch_size, &[], None) + .create_physical_plan(PhysicalPlanConfig { + object_store: Arc::new(LocalFileSystem {}), + schema, + files, + statistics, + projection: projection.clone(), + batch_size, + filters: vec![], + limit, + }) .await?; Ok(exec) } diff --git a/datafusion/src/datasource/listing.rs b/datafusion/src/datasource/listing.rs index cac9b36b3d51f..f4b8f7755166d 100644 --- a/datafusion/src/datasource/listing.rs +++ b/datafusion/src/datasource/listing.rs @@ -22,17 +22,20 @@ use 
std::{any::Any, sync::Arc}; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use async_trait::async_trait; -use futures::{StreamExt, TryStreamExt}; +use futures::StreamExt; use crate::{ datasource::file_format::{self, PartitionedFile}, - error::Result, + error::{DataFusionError, Result}, logical_plan::Expr, - physical_plan::{common, ExecutionPlan, Statistics}, + physical_plan::{ExecutionPlan, Statistics}, }; use super::{ - datasource::TableProviderFilterPushDown, file_format::FileFormat, TableProvider, + datasource::TableProviderFilterPushDown, + file_format::{FileFormat, PartitionedFileStream, PhysicalPlanConfig}, + object_store::{ObjectStore, ObjectStoreRegistry}, + TableProvider, }; /// Options for creating a `ListingTable` @@ -50,7 +53,7 @@ pub struct ListingOptions { /// Note that only `DataType::Utf8` is supported for the column type. /// TODO implement case where partitions.len() > 0 pub partitions: Vec, - /// Set true to try to guess statistics from the file parse it. + /// Set true to try to guess statistics from the files. /// This can add a lot of overhead as it requires files to /// be opened and partially parsed. pub collect_stat: bool, @@ -60,16 +63,23 @@ pub struct ListingOptions { } impl ListingOptions { - /// Infer the schema of the files at the given path, including the partitioning + /// Infer the schema of the files at the given uri, including the partitioning /// columns. /// /// This method will not be called by the table itself but before creating it. /// This way when creating the logical plan we can decide to resolve the schema /// locally or ask a remote service to do it (e.g a scheduler). - pub async fn infer_schema(&self, path: &str) -> Result { - let files = - futures::stream::iter(common::build_file_list(path, &self.file_extension)?); - let file_schema = self.format.infer_schema(Box::pin(files)).await?; + pub async fn infer_schema( + &self, + object_store_registry: Arc, + uri: &str, + ) -> Result { + let (object_store, path) = object_store_registry.get_by_uri(uri)?; + let file_stream = object_store + .list_file_with_suffix(path, &self.file_extension) + .await? + .map(move |file_meta| object_store.file_reader(file_meta?.sized_file)); + let file_schema = self.format.infer_schema(Box::pin(file_stream)).await?; // Add the partition columns to the file schema let mut fields = file_schema.fields().clone(); for part in &self.partitions { @@ -82,7 +92,9 @@ impl ListingOptions { /// An implementation of `TableProvider` that uses the object store /// or file system listing capability to get the list of files. pub struct ListingTable { - path: String, + // TODO pass object_store_registry to scan() instead + object_store_registry: Arc, + uri: String, schema: SchemaRef, options: ListingOptions, } @@ -90,14 +102,17 @@ pub struct ListingTable { impl ListingTable { /// Create new table that lists the FS to get the files to scan. 
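End to end, schema resolution now goes through the registry before the table is built, along the lines of the `load_table` helper in the tests below. A sketch with assumed module paths:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable};
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::error::Result;

/// Resolve the schema through the registry, then build the listing table.
async fn parquet_listing_table(uri: &str) -> Result<ListingTable> {
    let opt = ListingOptions {
        file_extension: "parquet".to_owned(),
        format: Arc::new(ParquetFormat::default()),
        partitions: vec![],
        max_partitions: 8,
        collect_stat: true,
    };
    let registry = Arc::new(ObjectStoreRegistry::new());
    // Schema resolution is explicit here and could instead be delegated to a scheduler.
    let schema = opt.infer_schema(Arc::clone(&registry), uri).await?;
    Ok(ListingTable::new(registry, uri, schema, opt))
}
```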
pub fn new( - path: impl Into, + // TODO pass object_store_registry to scan() instead + object_store_registry: Arc, + uri: impl Into, // the schema must be resolved before creating the table schema: SchemaRef, options: ListingOptions, ) -> Self { - let path: String = path.into(); + let uri: String = uri.into(); Self { - path, + object_store_registry, + uri, schema, options, } @@ -121,44 +136,24 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option, ) -> Result> { - // list files (with partitions) - let file_list = pruned_partition_list( - &self.path, - filters, - &self.options.file_extension, - &self.options.partitions, - )?; - - // collect the statistics if required by the config - let files = futures::stream::iter(file_list) - .then(|file| async { - let statistics = if self.options.collect_stat { - self.options.format.infer_stats(&file.path).await? - } else { - Statistics::default() - }; - Ok((file, statistics)) as Result<(PartitionedFile, Statistics)> - }) - .try_collect::>() + // TODO object_store_registry should be provided as param here + let (object_store, path) = self.object_store_registry.get_by_uri(&self.uri)?; + let (partitioned_file_lists, statistics) = self + .list_files_for_scan(Arc::clone(&object_store), path, filters, limit) .await?; - - let (files, statistics) = - file_format::get_statistics_with_limit(&files, self.schema(), limit); - - let partitioned_file_lists = split_files(files, self.options.max_partitions); - // create the execution plan self.options .format - .create_physical_plan( - self.schema(), - partitioned_file_lists, + .create_physical_plan(PhysicalPlanConfig { + object_store, + schema: self.schema(), + files: partitioned_file_lists, statistics, - projection, + projection: projection.clone(), batch_size, - filters, + filters: filters.to_vec(), limit, - ) + }) .await } @@ -170,23 +165,70 @@ impl TableProvider for ListingTable { } } +impl ListingTable { + async fn list_files_for_scan<'a>( + &'a self, + object_store: Arc, + path: &'a str, + filters: &'a [Expr], + limit: Option, + ) -> Result<(Vec>, Statistics)> { + // list files (with partitions) + let file_list = pruned_partition_list( + object_store.as_ref(), + path, + filters, + &self.options.file_extension, + &self.options.partitions, + ) + .await?; + + // collect the statistics if required by the config + let files = file_list.then(move |part_file| { + let object_store = object_store.clone(); + async move { + let part_file = part_file?; + let statistics = if self.options.collect_stat { + let object_reader = object_store + .file_reader(part_file.file_meta.sized_file.clone())?; + self.options.format.infer_stats(object_reader).await? 
+ } else { + Statistics::default() + }; + Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)> + } + }); + + let (files, statistics) = + file_format::get_statistics_with_limit(files, self.schema(), limit).await?; + + if files.is_empty() { + return Err(DataFusionError::Plan(format!( + "No files found at {} with file extension {}", + self.uri, self.options.file_extension, + ))); + } + + Ok((split_files(files, self.options.max_partitions), statistics)) + } +} + /// Discover the partitions on the given path and prune out files /// relative to irrelevant partitions using `filters` expressions -fn pruned_partition_list( - // registry: &ObjectStoreRegistry, +async fn pruned_partition_list( + store: &dyn ObjectStore, path: &str, _filters: &[Expr], file_extension: &str, partition_names: &[String], -) -> Result> { - let list_all = || { - Ok(common::build_file_list(path, file_extension)? - .into_iter() - .map(|f| PartitionedFile { path: f }) - .collect::>()) - }; +) -> Result { if partition_names.is_empty() { - list_all() + Ok(Box::pin( + store + .list_file_with_suffix(path, file_extension) + .await? + .map(|f| Ok(PartitionedFile { file_meta: f? })), + )) } else { todo!("use filters to prune partitions") } @@ -208,26 +250,37 @@ fn split_files( #[cfg(test)] mod tests { + use std::io::Read; + + use futures::AsyncRead; + + use crate::datasource::{ + file_format::{avro::AvroFormat, parquet::ParquetFormat}, + object_store::{ + FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore, + SizedFile, + }, + }; + use super::*; #[test] fn test_split_files() { - let files = vec![ - PartitionedFile { - path: "a".to_owned(), - }, - PartitionedFile { - path: "b".to_owned(), - }, - PartitionedFile { - path: "c".to_owned(), - }, - PartitionedFile { - path: "d".to_owned(), - }, - PartitionedFile { - path: "e".to_owned(), + let new_partitioned_file = |path: &str| PartitionedFile { + file_meta: FileMeta { + sized_file: SizedFile { + path: path.to_owned(), + size: 10, + }, + last_modified: None, }, + }; + let files = vec![ + new_partitioned_file("a"), + new_partitioned_file("b"), + new_partitioned_file("c"), + new_partitioned_file("d"), + new_partitioned_file("e"), ]; let chunks = split_files(files.clone(), 1); @@ -275,23 +328,131 @@ mod tests { Ok(()) } - // TODO add tests on listing once the ObjectStore abstraction is added + #[tokio::test] + async fn file_listings() -> Result<()> { + assert_partitioning(5, 12, 5).await?; + assert_partitioning(4, 4, 4).await?; + assert_partitioning(5, 2, 2).await?; + assert_partitioning(0, 2, 0).await.expect_err("no files"); + Ok(()) + } async fn load_table(name: &str) -> Result> { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, name); let opt = ListingOptions { file_extension: "parquet".to_owned(), - format: Arc::new(file_format::parquet::ParquetFormat { - enable_pruning: true, - }), + format: Arc::new(ParquetFormat::default()), partitions: vec![], max_partitions: 2, collect_stat: true, }; + let object_store_reg = Arc::new(ObjectStoreRegistry::new()); // here we resolve the schema locally - let schema = opt.infer_schema(&filename).await.expect("Infer schema"); - let table = ListingTable::new(&filename, schema, opt); + let schema = opt + .infer_schema(Arc::clone(&object_store_reg), &filename) + .await + .expect("Infer schema"); + let table = ListingTable::new(object_store_reg, &filename, schema, opt); Ok(Arc::new(table)) } + + async fn assert_partitioning( + files_in_folder: usize, + max_partitions: usize, + 
output_partitioning: usize, + ) -> Result<()> { + let registry = ObjectStoreRegistry::new(); + let mock_store: Arc = + Arc::new(MockObjectStore { files_in_folder }); + registry.register_store("mock".to_owned(), Arc::clone(&mock_store)); + + let format = AvroFormat {}; + + let opt = ListingOptions { + file_extension: "".to_owned(), + format: Arc::new(format), + partitions: vec![], + max_partitions, + collect_stat: true, + }; + + let object_store_reg = Arc::new(ObjectStoreRegistry::new()); + + let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]); + + let table = ListingTable::new( + object_store_reg, + "mock://bucket/key-prefix", + Arc::new(schema), + opt, + ); + + let (file_list, _) = table + .list_files_for_scan(mock_store, "bucket/key-prefix", &[], None) + .await?; + + assert_eq!(file_list.len(), output_partitioning); + + Ok(()) + } + + #[derive(Debug)] + struct MockObjectStore { + pub files_in_folder: usize, + } + + #[async_trait] + impl ObjectStore for MockObjectStore { + async fn list_file(&self, prefix: &str) -> Result { + let prefix = prefix.to_owned(); + let files = (0..self.files_in_folder).map(move |i| { + Ok(FileMeta { + sized_file: SizedFile { + path: format!("{}file{}", prefix, i), + size: 100, + }, + last_modified: None, + }) + }); + Ok(Box::pin(futures::stream::iter(files))) + } + + async fn list_dir( + &self, + _prefix: &str, + _delimiter: Option, + ) -> Result { + unimplemented!() + } + + fn file_reader(&self, _file: SizedFile) -> Result> { + Ok(Arc::new(MockObjectReader {})) + } + } + + struct MockObjectReader {} + + #[async_trait] + impl ObjectReader for MockObjectReader { + async fn chunk_reader( + &self, + _start: u64, + _length: usize, + ) -> Result> { + unimplemented!() + } + + fn sync_chunk_reader( + &self, + _start: u64, + _length: usize, + ) -> Result> { + unimplemented!() + } + + fn length(&self) -> u64 { + unimplemented!() + } + } } diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 2b27f6c8f993b..c18af1cd8f448 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -17,7 +17,8 @@ //! Object store that represents the Local File System. -use std::fs::Metadata; +use std::fs::{self, File, Metadata}; +use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; @@ -29,6 +30,8 @@ use crate::datasource::object_store::{ use crate::error::DataFusionError; use crate::error::Result; +use super::{ObjectReaderStream, SizedFile}; + #[derive(Debug)] /// Local File System as Object Store. pub struct LocalFileSystem; @@ -47,17 +50,17 @@ impl ObjectStore for LocalFileSystem { todo!() } - fn file_reader(&self, file: FileMeta) -> Result> { + fn file_reader(&self, file: SizedFile) -> Result> { Ok(Arc::new(LocalFileReader::new(file)?)) } } struct LocalFileReader { - file: FileMeta, + file: SizedFile, } impl LocalFileReader { - fn new(file: FileMeta) -> Result { + fn new(file: SizedFile) -> Result { Ok(Self { file }) } } @@ -68,8 +71,22 @@ impl ObjectReader for LocalFileReader { &self, _start: u64, _length: usize, - ) -> Result> { - todo!() + ) -> Result> { + todo!( + "implement once async file readers are available (arrow-rs#78, arrow-rs#111)" + ) + } + + fn sync_chunk_reader( + &self, + start: u64, + length: usize, + ) -> Result> { + // A new file descriptor is opened for each chunk reader. + // This okay because chunks are usually fairly large. 
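Since every call opens its own descriptor, chunk readers can be created independently (for example one per Parquet row group) without sharing seek state. A hedged sketch of reading a byte range through this API, with re-export paths assumed:

```rust
use std::io::Read;

use datafusion::datasource::object_store::local::LocalFileSystem;
use datafusion::datasource::object_store::{ObjectStore, SizedFile};
use datafusion::error::Result;

/// Read `length` bytes starting at `start` from a local file via the ObjectReader API.
fn read_range(path: &str, size: u64, start: u64, length: usize) -> Result<Vec<u8>> {
    let reader = LocalFileSystem.file_reader(SizedFile {
        path: path.to_owned(),
        size,
    })?;
    // Opens a fresh descriptor, seeks to `start`, and caps the read at `length` bytes.
    let mut chunk = reader.sync_chunk_reader(start, length)?;
    let mut buf = Vec::with_capacity(length);
    chunk.read_to_end(&mut buf)?;
    Ok(buf)
}
```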
+ let mut file = File::open(&self.file.path)?; + file.seek(SeekFrom::Start(start))?; + Ok(Box::new(file.take(length as u64))) } fn length(&self) -> u64 { @@ -80,9 +97,11 @@ impl ObjectReader for LocalFileReader { async fn list_all(prefix: String) -> Result { fn get_meta(path: String, metadata: Metadata) -> FileMeta { FileMeta { - path, + sized_file: SizedFile { + path, + size: metadata.len(), + }, last_modified: metadata.modified().map(chrono::DateTime::from).ok(), - size: metadata.len(), } } @@ -133,6 +152,30 @@ async fn list_all(prefix: String) -> Result { } } +/// Create a stream of `ObjectReader` by opening each file in the `files` vector +pub fn local_object_reader_stream(files: Vec) -> ObjectReaderStream { + Box::pin(futures::stream::iter(files).map(|f| Ok(local_object_reader(f)))) +} + +/// Helper method to convert a file location to an ObjectReader +pub fn local_object_reader(file: String) -> Arc { + LocalFileSystem + .file_reader(local_file_meta(file).sized_file) + .expect("File not found") +} + +/// Helper method to fetch the file size and date at given path and create a `FileMeta` +pub fn local_file_meta(file: String) -> FileMeta { + let metadata = fs::metadata(&file).expect("Local file metadata"); + FileMeta { + sized_file: SizedFile { + size: metadata.len(), + path: file, + }, + last_modified: metadata.modified().map(chrono::DateTime::from).ok(), + } +} + #[cfg(test)] mod tests { use super::*; @@ -163,8 +206,8 @@ mod tests { let mut files = list_all(tmp.path().to_str().unwrap().to_string()).await?; while let Some(file) = files.next().await { let file = file?; - assert_eq!(file.size, 0); - all_files.insert(file.path); + assert_eq!(file.size(), 0); + all_files.insert(file.path().to_owned()); } assert_eq!(all_files.len(), 3); diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index fd25fd43a2e7d..b16684a9db0aa 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -20,57 +20,110 @@ pub mod local; use std::collections::HashMap; -use std::fmt::Debug; +use std::fmt::{self, Debug}; +use std::io::Read; use std::pin::Pin; use std::sync::{Arc, RwLock}; use async_trait::async_trait; -use futures::{AsyncRead, Stream}; +use chrono::{DateTime, Utc}; +use futures::{AsyncRead, Stream, StreamExt}; use local::LocalFileSystem; use crate::error::{DataFusionError, Result}; -use chrono::Utc; -/// Object Reader for one file in a object store +/// Object Reader for one file in an object store. +/// +/// Note that the dynamic dispatch on the reader might +/// have some performance impacts. 
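The helpers above are also the minimal glue for handing a plain local path to the new executors. A small sketch (the path is hypothetical, and this panics like the helper it wraps if the file is missing):

```rust
use datafusion::datasource::file_format::PartitionedFile;
use datafusion::datasource::object_store::local::local_file_meta;

/// Wrap a local path into the `PartitionedFile` shape the executors now expect.
fn partitioned_file(path: &str) -> PartitionedFile {
    PartitionedFile {
        // `local_file_meta` stats the file to fill in `SizedFile { path, size }`
        // plus the last-modified timestamp.
        file_meta: local_file_meta(path.to_owned()),
    }
}
```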
#[async_trait] -pub trait ObjectReader { +pub trait ObjectReader: Send + Sync { /// Get reader for a part [start, start + length] in the file asynchronously async fn chunk_reader(&self, start: u64, length: usize) - -> Result>; + -> Result>; - /// Get length for the file + /// Get reader for a part [start, start + length] in the file + fn sync_chunk_reader( + &self, + start: u64, + length: usize, + ) -> Result>; + + /// Get reader for the entire file + fn sync_reader(&self) -> Result> { + self.sync_chunk_reader(0, self.length() as usize) + } + + /// Get the size of the file fn length(&self) -> u64; } -/// Represents a file or a prefix that may require further resolution +/// Represents a specific file or a prefix (folder) that may +/// require further resolution #[derive(Debug)] pub enum ListEntry { - /// File metadata + /// Specific file with metadata FileMeta(FileMeta), /// Prefix to be further resolved during partition discovery Prefix(String), } -/// File meta we got from object store -#[derive(Debug)] -pub struct FileMeta { - /// Path of the file +/// The path and size of the file. +#[derive(Debug, Clone)] +pub struct SizedFile { + /// Path of the file. It is relative to the current object + /// store (it does not specify the xx:// scheme). pub path: String, - /// Last time the file was modified in UTC - pub last_modified: Option>, /// File size in total pub size: u64, } -/// Stream of files get listed from object store +/// Description of a file as returned by the listing command of a +/// given object store. The resulting path is relative to the +/// object store that generated it. +#[derive(Debug, Clone)] +pub struct FileMeta { + /// The path and size of the file. + pub sized_file: SizedFile, + /// The last modification time of the file according to the + /// object store metadata. This information might be used by + /// catalog systems like Delta Lake for time travel (see + /// https://github.com/delta-io/delta/issues/192) + pub last_modified: Option>, +} + +impl FileMeta { + /// The path that describes this file. It is relative to the + /// associated object store. + pub fn path(&self) -> &str { + &self.sized_file.path + } + + /// The size of the file. + pub fn size(&self) -> u64 { + self.sized_file.size + } +} + +impl std::fmt::Display for FileMeta { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{} (size: {})", self.path(), self.size()) + } +} + +/// Stream of files listed from object store pub type FileMetaStream = Pin> + Send + Sync + 'static>>; -/// Stream of list entries get from object store +/// Stream of list entries obtained from object store pub type ListEntryStream = Pin> + Send + Sync + 'static>>; +/// Stream readers opened on a given object store +pub type ObjectReaderStream = + Pin>> + Send + Sync + 'static>>; + /// A ObjectStore abstracts access to an underlying file/object storage. /// It maps strings (e.g. 
URLs, filesystem paths, etc) to sources of bytes #[async_trait] @@ -78,6 +131,23 @@ pub trait ObjectStore: Sync + Send + Debug { /// Returns all the files in path `prefix` async fn list_file(&self, prefix: &str) -> Result; + /// Calls `list_file` with a suffix filter + async fn list_file_with_suffix( + &self, + prefix: &str, + suffix: &str, + ) -> Result { + let file_stream = self.list_file(prefix).await?; + let suffix = suffix.to_owned(); + Ok(Box::pin(file_stream.filter(move |fr| { + let has_suffix = match fr { + Ok(f) => f.path().ends_with(&suffix), + Err(_) => true, + }; + async move { has_suffix } + }))) + } + /// Returns all the files in `prefix` if the `prefix` is already a leaf dir, /// or all paths between the `prefix` and the first occurrence of the `delimiter` if it is provided. async fn list_dir( @@ -87,7 +157,7 @@ pub trait ObjectStore: Sync + Send + Debug { ) -> Result; /// Get object reader for one file - fn file_reader(&self, file: FileMeta) -> Result>; + fn file_reader(&self, file: SizedFile) -> Result>; } static LOCAL_SCHEME: &str = "file"; @@ -100,6 +170,22 @@ pub struct ObjectStoreRegistry { pub object_stores: RwLock>>, } +impl fmt::Debug for ObjectStoreRegistry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ObjectStoreRegistry") + .field( + "schemes", + &self + .object_stores + .read() + .unwrap() + .keys() + .collect::>(), + ) + .finish() + } +} + impl ObjectStoreRegistry { /// Create the registry that object stores can registered into. /// ['LocalFileSystem'] store is registered in by default to support read local files natively. @@ -130,12 +216,17 @@ impl ObjectStoreRegistry { } /// Get a suitable store for the URI based on it's scheme. For example: - /// URI with scheme file or no schema will return the default LocalFS store, - /// URI with scheme s3 will return the S3 store if it's registered. - pub fn get_by_uri(&self, uri: &str) -> Result> { - if let Some((scheme, _)) = uri.split_once(':') { + /// - URI with scheme `file://` or no schema will return the default LocalFS store + /// - URI with scheme `s3://` will return the S3 store if it's registered + /// Returns a tuple with the store and the path of the file in that store + /// (URI=scheme://path). + pub fn get_by_uri<'a>( + &self, + uri: &'a str, + ) -> Result<(Arc, &'a str)> { + if let Some((scheme, path)) = uri.split_once("://") { let stores = self.object_stores.read().unwrap(); - stores + let store = stores .get(&*scheme.to_lowercase()) .map(Clone::clone) .ok_or_else(|| { @@ -143,9 +234,10 @@ impl ObjectStoreRegistry { "No suitable object store found for {}", scheme )) - }) + })?; + Ok((store, path)) } else { - Ok(Arc::new(LocalFileSystem)) + Ok((Arc::new(LocalFileSystem), uri)) } } } diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs index fd50f18bf1f09..23d912803250e 100644 --- a/datafusion/src/physical_plan/file_format/avro.rs +++ b/datafusion/src/physical_plan/file_format/avro.rs @@ -16,6 +16,8 @@ // under the License. //! 
Execution plan for reading line-delimited Avro files +use crate::datasource::file_format::PartitionedFile; +use crate::datasource::object_store::ObjectStore; use crate::error::{DataFusionError, Result}; #[cfg(feature = "avro")] use crate::physical_plan::RecordBatchStream; @@ -32,7 +34,6 @@ use std::any::Any; use std::sync::Arc; #[cfg(feature = "avro")] use std::{ - fs::File, io::Read, pin::Pin, task::{Context, Poll}, @@ -41,7 +42,8 @@ use std::{ /// Execution plan for scanning Avro data source #[derive(Debug, Clone)] pub struct AvroExec { - files: Vec, + object_store: Arc, + files: Vec, statistics: Statistics, schema: SchemaRef, projection: Option>, @@ -54,7 +56,8 @@ impl AvroExec { /// Create a new JSON reader execution plan provided file list and schema /// TODO: support partitiond file list (Vec>) pub fn new( - files: Vec, + object_store: Arc, + files: Vec, statistics: Statistics, schema: SchemaRef, projection: Option>, @@ -69,6 +72,7 @@ impl AvroExec { }; Self { + object_store, files, statistics, schema, @@ -121,21 +125,26 @@ impl ExecutionPlan for AvroExec { #[cfg(feature = "avro")] async fn execute(&self, partition: usize) -> Result { - let mut builder = crate::avro_to_arrow::ReaderBuilder::new() - .with_schema(self.schema.clone()) - .with_batch_size(self.batch_size); - if let Some(proj) = &self.projection { - builder = builder.with_projection( - proj.iter() - .map(|col_idx| self.schema.field(*col_idx).name()) - .cloned() - .collect(), - ); - } - - let file = File::open(&self.files[partition])?; - - Ok(Box::pin(AvroStream::new(builder.build(file)?, self.limit))) + let file = self + .object_store + .file_reader(self.files[partition].file_meta.sized_file.clone())? + .sync_reader()?; + + let proj = self.projection.as_ref().map(|p| { + p.iter() + .map(|col_idx| self.schema.field(*col_idx).name()) + .cloned() + .collect() + }); + + let avro_reader = crate::avro_to_arrow::Reader::try_new( + file, + self.schema(), + self.batch_size, + proj, + )?; + + Ok(Box::pin(AvroStream::new(avro_reader, self.limit))) } fn fmt_as( @@ -147,10 +156,14 @@ impl ExecutionPlan for AvroExec { DisplayFormatType::Default => { write!( f, - "AvroExec: batch_size={}, limit={:?}, partitions=[{}]", + "AvroExec: batch_size={}, limit={:?}, files=[{}]", self.batch_size, self.limit, - self.files.join(", ") + self.files + .iter() + .map(|f| f.file_meta.path()) + .collect::>() + .join(", ") ) } } @@ -229,6 +242,10 @@ impl RecordBatchStream for AvroStream<'_, R> { #[cfg(feature = "avro")] mod tests { + use crate::datasource::object_store::local::{ + local_file_meta, local_object_reader_stream, LocalFileSystem, + }; + use super::*; #[tokio::test] @@ -240,10 +257,13 @@ mod tests { let testdata = crate::test_util::arrow_test_data(); let filename = format!("{}/avro/alltypes_plain.avro", testdata); let avro_exec = AvroExec::new( - vec![filename.clone()], + Arc::new(LocalFileSystem {}), + vec![PartitionedFile { + file_meta: local_file_meta(filename.clone()), + }], Statistics::default(), AvroFormat {} - .infer_schema(Box::pin(futures::stream::once(async { filename }))) + .infer_schema(local_object_reader_stream(vec![filename])) .await?, Some(vec![0, 1, 2]), 1024, diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index df3d74761564b..df6022cc44aab 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -17,6 +17,8 @@ //! 
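The `execute()` change above is the template that the CSV, JSON and Parquet operators below follow: instead of `File::open`, each partition is opened through the object store. A sketch of that shared pattern, using the local-store helpers from the test modules and a hypothetical path:

```rust
use std::io::Read;
use std::sync::Arc;

use datafusion::datasource::object_store::local::{local_file_meta, LocalFileSystem};
use datafusion::datasource::object_store::ObjectStore;
use datafusion::error::Result;

fn open_partition() -> Result<()> {
    let object_store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem {});

    // local_file_meta is the test helper that builds a FileMeta (presumably by
    // stat-ing the file) for a local path; the path here is hypothetical.
    let file_meta = local_file_meta("/tmp/part-0.csv".to_string());

    // Instead of File::open(path), ask the store for a reader over the whole file...
    let mut reader = object_store
        .file_reader(file_meta.sized_file)?
        .sync_reader()?;

    // ...and hand it to the format decoder (here we just peek at a few bytes).
    let mut buf = [0u8; 4];
    reader.read_exact(&mut buf)?;
    Ok(())
}
```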
Execution plan for reading CSV files +use crate::datasource::file_format::PartitionedFile; +use crate::datasource::object_store::ObjectStore; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, @@ -28,7 +30,6 @@ use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use futures::Stream; use std::any::Any; -use std::fs::File; use std::io::Read; use std::pin::Pin; use std::sync::Arc; @@ -39,8 +40,9 @@ use async_trait::async_trait; /// Execution plan for scanning a CSV file #[derive(Debug, Clone)] pub struct CsvExec { + object_store: Arc, /// List of data files - files: Vec, + files: Vec, /// Schema representing the CSV file schema: SchemaRef, /// Provided statistics @@ -64,7 +66,8 @@ impl CsvExec { /// TODO: support partitiond file list (Vec>) #[allow(clippy::too_many_arguments)] pub fn new( - files: Vec, + object_store: Arc, + files: Vec, statistics: Statistics, schema: SchemaRef, has_header: bool, @@ -81,6 +84,7 @@ impl CsvExec { }; Self { + object_store, files, schema, statistics, @@ -131,8 +135,13 @@ impl ExecutionPlan for CsvExec { } async fn execute(&self, partition: usize) -> Result { - Ok(Box::pin(CsvStream::try_new( - &self.files[partition], + let file = self + .object_store + .file_reader(self.files[partition].file_meta.sized_file.clone())? + .sync_reader()?; + + Ok(Box::pin(CsvStream::try_new_from_reader( + file, self.schema.clone(), self.has_header, self.delimiter, @@ -151,11 +160,15 @@ impl ExecutionPlan for CsvExec { DisplayFormatType::Default => { write!( f, - "CsvExec: has_header={}, batch_size={}, limit={:?}, partitions=[{}]", + "CsvExec: has_header={}, batch_size={}, limit={:?}, files=[{}]", self.has_header, self.batch_size, self.limit, - self.files.join(", ") + self.files + .iter() + .map(|f| f.file_meta.path()) + .collect::>() + .join(", ") ) } } @@ -171,23 +184,7 @@ struct CsvStream { /// Arrow CSV reader reader: csv::Reader, } -impl CsvStream { - /// Create an iterator for a CSV file - pub fn try_new( - filename: &str, - schema: SchemaRef, - has_header: bool, - delimiter: Option, - projection: &Option>, - batch_size: usize, - limit: Option, - ) -> Result { - let file = File::open(filename)?; - Self::try_new_from_reader( - file, schema, has_header, delimiter, projection, batch_size, limit, - ) - } -} + impl CsvStream { /// Create an iterator for a reader pub fn try_new_from_reader( @@ -237,7 +234,10 @@ impl RecordBatchStream for CsvStream { #[cfg(test)] mod tests { use super::*; - use crate::test::aggr_test_schema; + use crate::{ + datasource::object_store::local::{local_file_meta, LocalFileSystem}, + test::aggr_test_schema, + }; use futures::StreamExt; #[tokio::test] @@ -247,7 +247,10 @@ mod tests { let filename = "aggregate_test_100.csv"; let path = format!("{}/csv/{}", testdata, filename); let csv = CsvExec::new( - vec![path], + Arc::new(LocalFileSystem {}), + vec![PartitionedFile { + file_meta: local_file_meta(path), + }], Statistics::default(), schema, true, @@ -277,7 +280,10 @@ mod tests { let filename = "aggregate_test_100.csv"; let path = format!("{}/csv/{}", testdata, filename); let csv = CsvExec::new( - vec![path], + Arc::new(LocalFileSystem {}), + vec![PartitionedFile { + file_meta: local_file_meta(path), + }], Statistics::default(), schema, true, diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs index 69be9d2e7a9f9..090fd3f59ef35 100644 --- 
a/datafusion/src/physical_plan/file_format/json.rs +++ b/datafusion/src/physical_plan/file_format/json.rs @@ -19,6 +19,8 @@ use async_trait::async_trait; use futures::Stream; +use crate::datasource::file_format::PartitionedFile; +use crate::datasource::object_store::ObjectStore; use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream, @@ -31,7 +33,6 @@ use arrow::{ record_batch::RecordBatch, }; use std::any::Any; -use std::fs::File; use std::{ io::Read, pin::Pin, @@ -42,7 +43,8 @@ use std::{ /// Execution plan for scanning NdJson data source #[derive(Debug, Clone)] pub struct NdJsonExec { - files: Vec, + object_store: Arc, + files: Vec, statistics: Statistics, schema: SchemaRef, projection: Option>, @@ -55,7 +57,8 @@ impl NdJsonExec { /// Create a new JSON reader execution plan provided file list and schema /// TODO: support partitiond file list (Vec>) pub fn new( - files: Vec, + object_store: Arc, + files: Vec, statistics: Statistics, schema: SchemaRef, projection: Option>, @@ -70,6 +73,7 @@ impl NdJsonExec { }; Self { + object_store, files, statistics, schema, @@ -114,24 +118,21 @@ impl ExecutionPlan for NdJsonExec { } async fn execute(&self, partition: usize) -> Result { - let mut builder = json::ReaderBuilder::new() - .with_schema(self.schema.clone()) - .with_batch_size(self.batch_size); - if let Some(proj) = &self.projection { - builder = builder.with_projection( - proj.iter() - .map(|col_idx| self.schema.field(*col_idx).name()) - .cloned() - .collect(), - ); - } + let proj = self.projection.as_ref().map(|p| { + p.iter() + .map(|col_idx| self.schema.field(*col_idx).name()) + .cloned() + .collect() + }); + + let file = self + .object_store + .file_reader(self.files[partition].file_meta.sized_file.clone())? 
+ .sync_reader()?; - let file = File::open(&self.files[partition])?; + let json_reader = json::Reader::new(file, self.schema(), self.batch_size, proj); - Ok(Box::pin(NdJsonStream::new( - builder.build(file)?, - self.limit, - ))) + Ok(Box::pin(NdJsonStream::new(json_reader, self.limit))) } fn fmt_as( @@ -143,10 +144,14 @@ impl ExecutionPlan for NdJsonExec { DisplayFormatType::Default => { write!( f, - "JsonExec: batch_size={}, limit={:?}, partitions=[{}]", + "JsonExec: batch_size={}, limit={:?}, files=[{}]", self.batch_size, self.limit, - self.files.join(", ") + self.files + .iter() + .map(|f| f.file_meta.path()) + .collect::>() + .join(", ") ) } } @@ -221,18 +226,21 @@ impl RecordBatchStream for NdJsonStream { mod tests { use futures::StreamExt; - use crate::datasource::file_format::{json::JsonFormat, FileFormat}; + use crate::datasource::{ + file_format::{json::JsonFormat, FileFormat}, + object_store::local::{ + local_file_meta, local_object_reader_stream, LocalFileSystem, + }, + }; use super::*; const TEST_DATA_BASE: &str = "tests/jsons"; async fn infer_schema(path: String) -> Result { - JsonFormat { - schema_infer_max_rec: None, - } - .infer_schema(Box::pin(futures::stream::once(async { path }))) - .await + JsonFormat::default() + .infer_schema(local_object_reader_stream(vec![path])) + .await } #[tokio::test] @@ -240,7 +248,10 @@ mod tests { use arrow::datatypes::DataType; let path = format!("{}/1.json", TEST_DATA_BASE); let exec = NdJsonExec::new( - vec![path.clone()], + Arc::new(LocalFileSystem {}), + vec![PartitionedFile { + file_meta: local_file_meta(path.clone()), + }], Default::default(), infer_schema(path).await?, None, @@ -292,7 +303,10 @@ mod tests { async fn nd_json_exec_file_projection() -> Result<()> { let path = format!("{}/1.json", TEST_DATA_BASE); let exec = NdJsonExec::new( - vec![path.clone()], + Arc::new(LocalFileSystem {}), + vec![PartitionedFile { + file_meta: local_file_meta(path.clone()), + }], Default::default(), infer_schema(path).await?, Some(vec![0, 2]), diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 615e52b544748..3d73d0550fa08 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -18,10 +18,11 @@ //! Execution plan for reading Parquet files use std::fmt; -use std::fs::File; use std::sync::Arc; use std::{any::Any, convert::TryInto}; +use crate::datasource::file_format::parquet::ChunkObjectReader; +use crate::datasource::object_store::ObjectStore; use crate::{ error::{DataFusionError, Result}, logical_plan::{Column, Expr}, @@ -63,10 +64,11 @@ use crate::datasource::file_format::{FilePartition, PartitionedFile}; /// Execution plan for scanning one or more Parquet partitions #[derive(Debug, Clone)] pub struct ParquetExec { + object_store: Arc, /// Parquet partitions to read - pub partitions: Vec, + partitions: Vec, /// Schema after projection is applied - pub schema: SchemaRef, + schema: SchemaRef, /// Projection for which columns to load projection: Vec, /// Batch size @@ -108,8 +110,11 @@ struct ParquetFileMetrics { } impl ParquetExec { - /// Create a new Parquet reader execution plan provided file list and schema + /// Create a new Parquet reader execution plan provided file list and schema. + /// Even if `limit` is set, ParquetExec rounds up the number of records to the next `batch_size`. 
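Schema inference follows the same migration as the readers: `FileFormat::infer_schema` now consumes an `ObjectReaderStream`, as exercised by the JSON tests above and the Parquet test below. A standalone sketch with a hypothetical path, using the local-store helper from the test modules:

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::{json::JsonFormat, FileFormat};
use datafusion::datasource::object_store::local::local_object_reader_stream;
use datafusion::error::Result;

// The format is fed a stream of object readers rather than a stream of
// path strings; schemas of the individual files are merged by the format.
async fn infer(path: String) -> Result<SchemaRef> {
    JsonFormat::default()
        .infer_schema(local_object_reader_stream(vec![path]))
        .await
}
```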
+ #[allow(clippy::too_many_arguments)] pub fn new( + object_store: Arc, files: Vec>, statistics: Statistics, schema: SchemaRef, @@ -156,6 +161,7 @@ impl ParquetExec { Self::project(&projection, schema, statistics); Self { + object_store, partitions, schema: projected_schema, projection, @@ -283,9 +289,11 @@ impl ExecutionPlan for ParquetExec { let predicate_builder = self.predicate_builder.clone(); let batch_size = self.batch_size; let limit = self.limit; + let object_store = Arc::clone(&self.object_store); task::spawn_blocking(move || { if let Err(e) = read_partition( + object_store.as_ref(), partition_index, partition, metrics, @@ -462,6 +470,7 @@ fn build_row_group_predicate( #[allow(clippy::too_many_arguments)] fn read_partition( + object_store: &dyn ObjectStore, partition_index: usize, partition: ParquetPartition, metrics: ExecutionPlanMetricsSet, @@ -474,10 +483,15 @@ fn read_partition( let mut total_rows = 0; let all_files = partition.file_partition.files; 'outer: for partitioned_file in all_files { - let file_metrics = - ParquetFileMetrics::new(partition_index, &*partitioned_file.path, &metrics); - let file = File::open(partitioned_file.path.as_str())?; - let mut file_reader = SerializedFileReader::new(file)?; + let file_metrics = ParquetFileMetrics::new( + partition_index, + &*partitioned_file.file_meta.path(), + &metrics, + ); + let object_reader = + object_store.file_reader(partitioned_file.file_meta.sized_file.clone())?; + let mut file_reader = + SerializedFileReader::new(ChunkObjectReader(object_reader))?; if let Some(predicate_builder) = predicate_builder { let row_group_predicate = build_row_group_predicate( predicate_builder, @@ -526,7 +540,12 @@ fn read_partition( #[cfg(test)] mod tests { - use crate::datasource::file_format::{parquet::ParquetFormat, FileFormat}; + use crate::datasource::{ + file_format::{parquet::ParquetFormat, FileFormat}, + object_store::local::{ + local_file_meta, local_object_reader_stream, LocalFileSystem, + }, + }; use super::*; use arrow::datatypes::{DataType, Field}; @@ -542,15 +561,14 @@ mod tests { let testdata = crate::test_util::parquet_test_data(); let filename = format!("{}/alltypes_plain.parquet", testdata); let parquet_exec = ParquetExec::new( + Arc::new(LocalFileSystem {}), vec![vec![PartitionedFile { - path: filename.clone(), + file_meta: local_file_meta(filename.clone()), }]], Statistics::default(), - ParquetFormat { - enable_pruning: true, - } - .infer_schema(Box::pin(futures::stream::once(async { filename }))) - .await?, + ParquetFormat::default() + .infer_schema(local_object_reader_stream(vec![filename])) + .await?, Some(vec![0, 1, 2]), None, 1024,
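For completeness, a sketch of the file-opening path that `read_partition` now takes. It assumes `ChunkObjectReader` and the object store module are publicly reachable at the paths used by the crate-internal imports, and that the example crate depends on `parquet` directly, as DataFusion itself does.

```rust
use datafusion::datasource::file_format::parquet::ChunkObjectReader;
use datafusion::datasource::object_store::{ObjectStore, SizedFile};
use datafusion::error::Result;
use parquet::file::serialized_reader::SerializedFileReader;

// The object reader is wrapped in ChunkObjectReader, which adapts it to the
// parquet crate's ChunkReader trait so SerializedFileReader can perform the
// ranged reads it needs; `store` and `sized_file` are assumed to come from
// the planning phase.
fn open_parquet(
    store: &dyn ObjectStore,
    sized_file: SizedFile,
) -> Result<SerializedFileReader<ChunkObjectReader>> {
    let object_reader = store.file_reader(sized_file)?;
    Ok(SerializedFileReader::new(ChunkObjectReader(object_reader))?)
}
```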