From aea685d320113658f5eb90f0da3343ec8f5856e0 Mon Sep 17 00:00:00 2001
From: jiangzhx
Date: Tue, 4 Apr 2023 14:49:58 +0800
Subject: [PATCH] when inferring the schema of compressed CSV, decompress before newline-delimited chunking

---
 datafusion-examples/examples/csv_sql.rs        |  18 ++
 .../core/src/datasource/file_format/csv.rs     | 187 +++++++++++++++---
 .../src/datasource/file_format/file_type.rs    |  91 +++++----
 .../core/src/physical_plan/file_format/csv.rs  |   3 +-
 .../src/physical_plan/file_format/json.rs      |   3 +-
 testing                                        |   2 +-
 6 files changed, 239 insertions(+), 65 deletions(-)

diff --git a/datafusion-examples/examples/csv_sql.rs b/datafusion-examples/examples/csv_sql.rs
index ce602e0e4816..c883a2076d13 100644
--- a/datafusion-examples/examples/csv_sql.rs
+++ b/datafusion-examples/examples/csv_sql.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::datasource::file_format::file_type::FileCompressionType;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 
@@ -48,5 +49,22 @@ async fn main() -> Result<()> {
     // print the results
     df.show().await?;
 
+    // query compressed CSV with specific options
+    let csv_options = CsvReadOptions::default()
+        .has_header(true)
+        .file_compression_type(FileCompressionType::GZIP)
+        .file_extension("csv.gz");
+    let df = ctx
+        .read_csv(
+            &format!("{testdata}/csv/aggregate_test_100.csv.gz"),
+            csv_options,
+        )
+        .await?;
+    let df = df
+        .filter(col("c1").eq(lit("a")))?
+        .select_columns(&["c2", "c3"])?;
+
+    df.show().await?;
+
     Ok(())
 }
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index fcab651e3514..875a963c9d21 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -30,6 +30,7 @@ use bytes::{Buf, Bytes};
 use datafusion_common::DataFusionError;
 use datafusion_physical_expr::PhysicalExpr;
 
+use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
@@ -65,6 +66,62 @@ impl Default for CsvFormat {
 }
 
 impl CsvFormat {
+    /// Return a newline delimited stream from the specified file on the
+    /// object store, decompressing if necessary
+    /// Each returned `Bytes` has a whole number of newline delimited rows
+    async fn read_to_delimited_chunks(
+        &self,
+        store: &Arc<dyn ObjectStore>,
+        object: &ObjectMeta,
+    ) -> BoxStream<'static, Result<Bytes>> {
+        // stream to only read as many rows as needed into memory
+        let stream = store
+            .get(&object.location)
+            .await
+            .map_err(DataFusionError::ObjectStore);
+        let stream = match stream {
+            Ok(stream) => self
+                .read_to_delimited_chunks_from_stream(
+                    stream
+                        .into_stream()
+                        .map_err(DataFusionError::ObjectStore)
+                        .boxed(),
+                )
+                .await
+                .map_err(DataFusionError::from)
+                .left_stream(),
+            Err(e) => {
+                futures::stream::once(futures::future::ready(Err(e))).right_stream()
+            }
+        };
+        stream.boxed()
+    }
+
+    async fn read_to_delimited_chunks_from_stream(
+        &self,
+        stream: BoxStream<'static, Result<Bytes>>,
+    ) -> BoxStream<'static, Result<Bytes>> {
+        let file_compression_type = self.file_compression_type.to_owned();
+        let decoder = file_compression_type.convert_stream(stream);
+        let stream = match decoder {
+            Ok(decoded_stream) => {
+                newline_delimited_stream(decoded_stream.map_err(|e| match e {
+                    DataFusionError::ObjectStore(e) => e,
+                    err => object_store::Error::Generic {
+                        store: "read to delimited chunks failed",
+                        source: Box::new(err),
+                    },
+                }))
+                .map_err(DataFusionError::from)
+                .left_stream()
+            }
+            Err(e) => {
+                futures::stream::once(futures::future::ready(Err(e))).right_stream()
+            }
+        };
+        stream.boxed()
+    }
+
     /// Set a limit in terms of records to scan to infer the schema
     /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
     pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
@@ -124,8 +181,7 @@ impl FileFormat for CsvFormat {
         let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);
 
         for object in objects {
-            // stream to only read as many rows as needed into memory
-            let stream = read_to_delimited_chunks(store, object).await;
+            let stream = self.read_to_delimited_chunks(store, object).await;
             let (schema, records_read) = self
                 .infer_schema_from_stream(records_to_read, stream)
                 .await?;
@@ -166,28 +222,6 @@ impl FileFormat for CsvFormat {
     }
 }
 
-/// Return a newline delimited stream from the specified file on
-/// object store
-///
-/// Each returned `Bytes` has a whole number of newline delimited rows
-async fn read_to_delimited_chunks(
-    store: &Arc<dyn ObjectStore>,
-    object: &ObjectMeta,
-) -> impl Stream<Item = Result<Bytes>> {
-    // stream to only read as many rows as needed into memory
-    let stream = store
-        .get(&object.location)
-        .await
-        .map_err(DataFusionError::ObjectStore);
-
-    match stream {
-        Ok(s) => newline_delimited_stream(s.into_stream())
-            .map_err(|e| DataFusionError::External(Box::new(e)))
-            .left_stream(),
-        Err(e) => futures::stream::once(futures::future::ready(Err(e))).right_stream(),
-    }
-}
-
 impl CsvFormat {
     /// Return the inferred schema reading up to records_to_read from a
     /// stream of delimited chunks returning the inferred schema and the
@@ -207,7 +241,7 @@ impl CsvFormat {
         while let Some(chunk) = stream.next().await.transpose()? {
             let (Schema { fields, .. }, records_read) =
                 arrow::csv::reader::infer_reader_schema(
-                    self.file_compression_type.convert_read(chunk.reader())?,
+                    chunk.reader(),
                     self.delimiter,
                     Some(records_to_read),
                     // only consider header for first chunk
@@ -295,14 +329,19 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema
 mod tests {
     use super::super::test_util::scan_format;
     use super::*;
+    use crate::assert_batches_eq;
     use crate::datasource::file_format::test_util::VariableStream;
     use crate::physical_plan::collect;
-    use crate::prelude::{SessionConfig, SessionContext};
+    use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
+    use crate::test_util::arrow_test_data;
     use bytes::Bytes;
     use chrono::DateTime;
     use datafusion_common::cast::as_string_array;
+    use datafusion_expr::{col, lit};
     use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
     use object_store::path::Path;
+    use rstest::*;
 
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
@@ -461,6 +500,102 @@ mod tests {
         Ok(())
     }
 
+    #[rstest(
+        file_compression_type,
+        case(FileCompressionType::UNCOMPRESSED),
+        case(FileCompressionType::GZIP),
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ),
+        case(FileCompressionType::ZSTD)
+    )]
+    #[tokio::test]
+    async fn query_compress_data(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
+        let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();
+
+        let path = Path::from("csv/aggregate_test_100.csv");
+        let csv = CsvFormat::default().with_has_header(true);
+        let records_to_read = csv.schema_infer_max_rec.unwrap_or(usize::MAX);
+        let store = Arc::new(integration) as Arc<dyn ObjectStore>;
+        let original_stream = store.get(&path).await?;
+
+        //convert original_stream to compressed_stream for next step
+        let compressed_stream =
+            file_compression_type.to_owned().convert_to_compress_stream(
+                original_stream
+                    .into_stream()
+                    .map_err(DataFusionError::from)
+                    .boxed(),
+            );
+
+        //prepare expected schema for assert_eq
+        let expected = Schema::new(vec![
+            Field::new("c1", DataType::Utf8, true),
+            Field::new("c2", DataType::Int64, true),
+            Field::new("c3", DataType::Int64, true),
+            Field::new("c4", DataType::Int64, true),
+            Field::new("c5", DataType::Int64, true),
+            Field::new("c6", DataType::Int64, true),
+            Field::new("c7", DataType::Int64, true),
+            Field::new("c8", DataType::Int64, true),
+            Field::new("c9", DataType::Int64, true),
+            Field::new("c10", DataType::Int64, true),
+            Field::new("c11", DataType::Float64, true),
+            Field::new("c12", DataType::Float64, true),
+            Field::new("c13", DataType::Utf8, true),
+        ]);
+
+        let compressed_csv =
+            csv.with_file_compression_type(file_compression_type.clone());
+
+        //convert compressed_stream to decoded_stream
+        let decoded_stream = compressed_csv
+            .read_to_delimited_chunks_from_stream(compressed_stream.unwrap())
+            .await;
+        let (schema, records_read) = compressed_csv
+            .infer_schema_from_stream(records_to_read, decoded_stream)
+            .await?;
+
+        assert_eq!(expected, schema);
+        assert_eq!(100, records_read);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn query_compress_csv() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let csv_options = CsvReadOptions::default()
+            .has_header(true)
+            .file_compression_type(FileCompressionType::GZIP)
+            .file_extension("csv.gz");
+        let df = ctx
+            .read_csv(
+                &format!("{}/csv/aggregate_test_100.csv.gz", arrow_test_data()),
+                csv_options,
+            )
+            .await?;
+
+        let record_batch = df
+            .filter(col("c1").eq(lit("a")).and(col("c2").gt(lit("4"))))?
+            .select_columns(&["c2", "c3"])?
+            .collect()
+            .await?;
+        #[rustfmt::skip]
+        let expected = vec![
+            "+----+------+",
+            "| c2 | c3   |",
+            "+----+------+",
+            "| 5  | 36   |",
+            "| 5  | -31  |",
+            "| 5  | -101 |",
+            "+----+------+",
+        ];
+        assert_batches_eq!(expected, &record_batch);
+        Ok(())
+    }
+
     async fn get_exec(
         state: &SessionState,
         file_name: &str,
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index e07eb8a3d7a6..e72b1e579e4a 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -25,16 +25,21 @@ use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
 #[cfg(feature = "compression")]
 use async_compression::tokio::bufread::{
-    BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
-    XzDecoder as AsyncXzDecoder, ZstdDecoder as AsyncZstdDecoer,
+    BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
+    GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
+    XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
+    ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
 };
+
 use bytes::Bytes;
 #[cfg(feature = "compression")]
 use bzip2::read::MultiBzDecoder;
 use datafusion_common::parsers::CompressionTypeVariant;
 #[cfg(feature = "compression")]
 use flate2::read::MultiGzDecoder;
-use futures::Stream;
+
+use futures::stream::BoxStream;
+use futures::StreamExt;
 #[cfg(feature = "compression")]
 use futures::TryStreamExt;
 use std::str::FromStr;
@@ -111,53 +116,67 @@ impl FileCompressionType {
         self.variant.is_compressed()
     }
 
-    /// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
-    pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
+    /// Given a `Stream`, create a `Stream` whose data are compressed with `FileCompressionType`.
+    pub fn convert_to_compress_stream(
         &self,
-        s: T,
-    ) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
-        #[cfg(feature = "compression")]
-        let err_converter = |e: std::io::Error| match e
-            .get_ref()
-            .and_then(|e| e.downcast_ref::<DataFusionError>())
-        {
-            Some(_) => {
-                *(e.into_inner()
-                    .unwrap()
-                    .downcast::<DataFusionError>()
-                    .unwrap())
+        s: BoxStream<'static, Result<Bytes>>,
+    ) -> Result<BoxStream<'static, Result<Bytes>>> {
+        Ok(match self.variant {
+            #[cfg(feature = "compression")]
+            GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
+            #[cfg(feature = "compression")]
+            BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
+            #[cfg(feature = "compression")]
+            XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
+            #[cfg(feature = "compression")]
+            ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
+            #[cfg(not(feature = "compression"))]
+            GZIP | BZIP2 | XZ | ZSTD => {
+                return Err(DataFusionError::NotImplemented(
+                    "Compression feature is not enabled".to_owned(),
+                ))
             }
-            None => Into::<DataFusionError>::into(e),
-        };
+            UNCOMPRESSED => s.boxed(),
+        })
+    }
+    /// Given a `Stream`, create a `Stream` whose data are decompressed with `FileCompressionType`.
+    pub fn convert_stream(
+        &self,
+        s: BoxStream<'static, Result<Bytes>>,
+    ) -> Result<BoxStream<'static, Result<Bytes>>> {
         Ok(match self.variant {
             #[cfg(feature = "compression")]
-            GZIP => Box::new(
-                ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
-                    .map_err(err_converter),
-            ),
+            GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
             #[cfg(feature = "compression")]
-            BZIP2 => Box::new(
-                ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
-                    .map_err(err_converter),
-            ),
+            BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
             #[cfg(feature = "compression")]
-            XZ => Box::new(
-                ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
-                    .map_err(err_converter),
-            ),
+            XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
             #[cfg(feature = "compression")]
-            ZSTD => Box::new(
-                ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
-                    .map_err(err_converter),
-            ),
+            ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
+                .map_err(DataFusionError::from)
+                .boxed(),
             #[cfg(not(feature = "compression"))]
             GZIP | BZIP2 | XZ | ZSTD => {
                 return Err(DataFusionError::NotImplemented(
                     "Compression feature is not enabled".to_owned(),
                 ))
             }
-            UNCOMPRESSED => Box::new(s),
+            UNCOMPRESSED => s.boxed(),
         })
     }
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index d9075c84adfc..9826d32d0f7a 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -246,7 +246,8 @@ impl FileOpener for CsvOpener {
             GetResult::Stream(s) => {
                 let mut decoder = config.builder().build_decoder();
                 let s = s.map_err(DataFusionError::from);
-                let mut input = file_compression_type.convert_stream(s)?.fuse();
+                let mut input =
+                    file_compression_type.convert_stream(s.boxed())?.fuse();
                 let mut buffered = Bytes::new();
 
                 let s = futures::stream::poll_fn(move |cx| {
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index ebbae7417889..fe81932286cf 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -191,7 +191,8 @@
                     .build_decoder()?;
 
                 let s = s.map_err(DataFusionError::from);
-                let mut input = file_compression_type.convert_stream(s)?.fuse();
+                let mut input =
+                    file_compression_type.convert_stream(s.boxed())?.fuse();
                 let mut buffered = Bytes::new();
 
                 let s = stream::poll_fn(move |cx| {
diff --git a/testing b/testing
index 5bab2f264a23..47f7b56b2568 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 5bab2f264a23f5af68f69ea93d24ef1e8e77fc88
+Subproject commit 47f7b56b25683202c1fd957668e13f2abafc0f12
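
For reference, the end-to-end effect of this change is that a compressed CSV can be registered and queried without supplying an explicit schema, because inference now runs on the decompressed byte stream. A minimal sketch of that usage, assuming the `CsvReadOptions` builder methods exercised in this patch and a hypothetical local file `data/example.csv.gz`:

use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // The schema is inferred from the gzip-compressed file directly:
    // the reader decompresses the stream before newline-delimited chunking.
    let options = CsvReadOptions::default()
        .has_header(true)
        .file_compression_type(FileCompressionType::GZIP)
        .file_extension("csv.gz");
    let df = ctx
        .read_csv("data/example.csv.gz", options) // hypothetical path
        .await?;
    df.show().await?;
    Ok(())
}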