when inferring the schema of compressed CSV, decompress before newline-delimited chunking #5860

Merged · 1 commit · Apr 11, 2023
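The change in one sentence: when a CSV file is compressed, its byte stream must be decompressed before it is split into newline-delimited chunks for schema inference, because newline boundaries in the still-compressed bytes do not fall on row boundaries. A minimal sketch of that ordering outside DataFusion (an illustration, not code from this PR), assuming a local `data.csv.gz` and the `flate2` crate that `file_type.rs` below already uses behind its `compression` feature:

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};

use flate2::read::MultiGzDecoder;

fn main() -> std::io::Result<()> {
    // Hypothetical input path, for illustration only.
    let file = File::open("data.csv.gz")?;
    // 1. Decompress the raw bytes first...
    let decoder = MultiGzDecoder::new(file);
    // 2. ...then split the decompressed bytes on newlines; every line is now a
    //    complete CSV row that a schema-inference pass can read.
    let reader = BufReader::new(decoder);
    for line in reader.lines().take(5) {
        println!("{}", line?);
    }
    Ok(())
}
```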
18 changes: 18 additions & 0 deletions datafusion-examples/examples/csv_sql.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::error::Result;
use datafusion::prelude::*;

@@ -48,5 +49,22 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// query compressed CSV with specific options
let csv_options = CsvReadOptions::default()
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension("csv.gz");
let df = ctx
.read_csv(
&format!("{testdata}/csv/aggregate_test_100.csv.gz"),
csv_options,
)
.await?;
let df = df
.filter(col("c1").eq(lit("a")))?
.select_columns(&["c2", "c3"])?;

df.show().await?;

Ok(())
}
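The new example reads the compressed file through the DataFrame API. As a complementary sketch (assumptions: the same test data, a `tokio` runtime as in `csv_sql.rs`, and a table name `aggregate_test_100_gz` chosen here for illustration), the same file can also be registered and queried with SQL:

```rust
use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let testdata = datafusion::test_util::arrow_test_data();

    // Register the gzip-compressed CSV as a table...
    ctx.register_csv(
        "aggregate_test_100_gz",
        &format!("{testdata}/csv/aggregate_test_100.csv.gz"),
        CsvReadOptions::default()
            .has_header(true)
            .file_compression_type(FileCompressionType::GZIP)
            .file_extension("csv.gz"),
    )
    .await?;

    // ...then query it with SQL instead of the DataFrame API.
    let df = ctx
        .sql("SELECT c1, COUNT(*) AS cnt FROM aggregate_test_100_gz GROUP BY c1")
        .await?;
    df.show().await?;

    Ok(())
}
```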
187 changes: 161 additions & 26 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -30,6 +30,7 @@ use bytes::{Buf, Bytes};
use datafusion_common::DataFusionError;

use datafusion_physical_expr::PhysicalExpr;
use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};

@@ -65,6 +66,62 @@ impl Default for CsvFormat {
}

impl CsvFormat {
/// Return a newline delimited stream from the specified file on
/// object store, decompressing if necessary.
/// Each returned `Bytes` has a whole number of newline delimited rows.
async fn read_to_delimited_chunks(
Contributor: I feel like there must be a simpler way to express this code, but this does appear to work.

I wonder if we could use BoxStream rather than impl Stream.... 🤔

Contributor (Author): Fixed, thanks a lot.

&self,
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
) -> BoxStream<'static, Result<Bytes>> {
// stream to only read as many rows as needed into memory
let stream = store
.get(&object.location)
.await
.map_err(DataFusionError::ObjectStore);
let stream = match stream {
Ok(stream) => self
.read_to_delimited_chunks_from_stream(
stream
.into_stream()
.map_err(DataFusionError::ObjectStore)
.boxed(),
)
.await
.map_err(DataFusionError::from)
.left_stream(),
Err(e) => {
futures::stream::once(futures::future::ready(Err(e))).right_stream()
}
};
stream.boxed()
}
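
A minimal standalone sketch of the `BoxStream` pattern raised in the review comment above, assuming only the `futures` crate: boxing lets each branch build a different concrete stream type and erase it with `.boxed()`, so the function has a single nameable return type instead of an `impl Stream` that both arms must satisfy.

```rust
use futures::executor::block_on;
use futures::stream::{self, BoxStream, StreamExt};

// With `BoxStream`, each arm may produce a different concrete stream type and
// erase it via `.boxed()`; with `impl Stream`, both arms would have to be
// unified into one type (e.g. the `Either` from `left_stream`/`right_stream`).
fn numbers(ok: bool) -> BoxStream<'static, Result<i32, String>> {
    if ok {
        stream::iter(vec![Ok(1), Ok(2), Ok(3)]).boxed()
    } else {
        stream::once(async { Err("read failed".to_string()) }).boxed()
    }
}

fn main() {
    let collected: Vec<Result<i32, String>> = block_on(numbers(true).collect());
    println!("{collected:?}");
}
```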

async fn read_to_delimited_chunks_from_stream(
&self,
stream: BoxStream<'static, Result<Bytes>>,
) -> BoxStream<'static, Result<Bytes>> {
let file_compression_type = self.file_compression_type.to_owned();
let decoder = file_compression_type.convert_stream(stream);
let stream = match decoder {
Ok(decoded_stream) => {
newline_delimited_stream(decoded_stream.map_err(|e| match e {
DataFusionError::ObjectStore(e) => e,
err => object_store::Error::Generic {
store: "read to delimited chunks failed",
source: Box::new(err),
},
}))
.map_err(DataFusionError::from)
.left_stream()
}
Err(e) => {
futures::stream::once(futures::future::ready(Err(e))).right_stream()
}
};
stream.boxed()
}

/// Set a limit in terms of records to scan to infer the schema
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
@@ -124,8 +181,7 @@ impl FileFormat for CsvFormat {
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);

for object in objects {
// stream to only read as many rows as needed into memory
let stream = read_to_delimited_chunks(store, object).await;
let stream = self.read_to_delimited_chunks(store, object).await;
let (schema, records_read) = self
.infer_schema_from_stream(records_to_read, stream)
.await?;
@@ -166,28 +222,6 @@
}
}

/// Return a newline delimited stream from the specified file on
/// object store
///
/// Each returned `Bytes` has a whole number of newline delimited rows
async fn read_to_delimited_chunks(
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
) -> impl Stream<Item = Result<Bytes>> {
// stream to only read as many rows as needed into memory
let stream = store
.get(&object.location)
.await
.map_err(DataFusionError::ObjectStore);

match stream {
Ok(s) => newline_delimited_stream(s.into_stream())
.map_err(|e| DataFusionError::External(Box::new(e)))
.left_stream(),
Err(e) => futures::stream::once(futures::future::ready(Err(e))).right_stream(),
}
}

impl CsvFormat {
/// Return the inferred schema reading up to records_to_read from a
/// stream of delimited chunks returning the inferred schema and the
@@ -207,7 +241,7 @@ impl CsvFormat {
while let Some(chunk) = stream.next().await.transpose()? {
let (Schema { fields, .. }, records_read) =
arrow::csv::reader::infer_reader_schema(
self.file_compression_type.convert_read(chunk.reader())?,
chunk.reader(),
self.delimiter,
Some(records_to_read),
// only consider header for first chunk
@@ -295,14 +329,19 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
mod tests {
use super::super::test_util::scan_format;
use super::*;
use crate::assert_batches_eq;
use crate::datasource::file_format::test_util::VariableStream;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::arrow_test_data;
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use rstest::*;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -461,6 +500,102 @@ mod tests {
Ok(())
}

#[rstest(
file_compression_type,
case(FileCompressionType::UNCOMPRESSED),
case(FileCompressionType::GZIP),
case(FileCompressionType::BZIP2),
case(FileCompressionType::XZ),
case(FileCompressionType::ZSTD)
)]
#[tokio::test]
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();

let path = Path::from("csv/aggregate_test_100.csv");
let csv = CsvFormat::default().with_has_header(true);
let records_to_read = csv.schema_infer_max_rec.unwrap_or(usize::MAX);
let store = Arc::new(integration) as Arc<dyn ObjectStore>;
let original_stream = store.get(&path).await?;

// convert original_stream to compressed_stream for the next step
let compressed_stream =
file_compression_type.to_owned().convert_to_compress_stream(
original_stream
.into_stream()
.map_err(DataFusionError::from)
.boxed(),
);

// prepare expected schema for assert_eq
let expected = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
Field::new("c3", DataType::Int64, true),
Field::new("c4", DataType::Int64, true),
Field::new("c5", DataType::Int64, true),
Field::new("c6", DataType::Int64, true),
Field::new("c7", DataType::Int64, true),
Field::new("c8", DataType::Int64, true),
Field::new("c9", DataType::Int64, true),
Field::new("c10", DataType::Int64, true),
Field::new("c11", DataType::Float64, true),
Field::new("c12", DataType::Float64, true),
Field::new("c13", DataType::Utf8, true),
]);

let compressed_csv =
csv.with_file_compression_type(file_compression_type.clone());

// convert compressed_stream to decoded_stream
let decoded_stream = compressed_csv
.read_to_delimited_chunks_from_stream(compressed_stream.unwrap())
.await;
let (schema, records_read) = compressed_csv
.infer_schema_from_stream(records_to_read, decoded_stream)
.await?;

assert_eq!(expected, schema);
assert_eq!(100, records_read);
Ok(())
}

#[tokio::test]
async fn query_compress_csv() -> Result<()> {
let ctx = SessionContext::new();

let csv_options = CsvReadOptions::default()
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension("csv.gz");
let df = ctx
.read_csv(
&format!("{}/csv/aggregate_test_100.csv.gz", arrow_test_data()),
csv_options,
)
.await?;

let record_batch = df
.filter(col("c1").eq(lit("a")).and(col("c2").gt(lit("4"))))?
.select_columns(&["c2", "c3"])?
.collect()
.await?;
#[rustfmt::skip]
let expected = vec![
"+----+------+",
"| c2 | c3 |",
"+----+------+",
"| 5 | 36 |",
"| 5 | -31 |",
"| 5 | -101 |",
"+----+------+",
];
assert_batches_eq!(expected, &record_batch);
Ok(())
}

async fn get_exec(
state: &SessionState,
file_name: &str,
91 changes: 55 additions & 36 deletions datafusion/core/src/datasource/file_format/file_type.rs
@@ -25,16 +25,21 @@ use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
XzDecoder as AsyncXzDecoder, ZstdDecoder as AsyncZstdDecoer,
BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoder,
};

use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::MultiBzDecoder;
use datafusion_common::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;
use futures::Stream;

use futures::stream::BoxStream;
use futures::StreamExt;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
use std::str::FromStr;
@@ -111,53 +116,67 @@ impl FileCompressionType {
self.variant.is_compressed()
}

/// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
/// Given a `Stream`, create a `Stream` which data are compressed with `FileCompressionType`.
pub fn convert_to_compress_stream(
&self,
s: T,
) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
#[cfg(feature = "compression")]
let err_converter = |e: std::io::Error| match e
.get_ref()
.and_then(|e| e.downcast_ref::<DataFusionError>())
{
Some(_) => {
*(e.into_inner()
.unwrap()
.downcast::<DataFusionError>()
.unwrap())
s: BoxStream<'static, Result<Bytes>>,
) -> Result<BoxStream<'static, Result<Bytes>>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
#[cfg(feature = "compression")]
BZIP2 => ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
#[cfg(feature = "compression")]
XZ => ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
#[cfg(feature = "compression")]
ZSTD => ReaderStream::new(AsyncZstdEncoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
None => Into::<DataFusionError>::into(e),
};
UNCOMPRESSED => s.boxed(),
})
}

/// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
pub fn convert_stream(
&self,
s: BoxStream<'static, Result<Bytes>>,
) -> Result<BoxStream<'static, Result<Bytes>>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => Box::new(
ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
#[cfg(feature = "compression")]
BZIP2 => Box::new(
ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
#[cfg(feature = "compression")]
XZ => Box::new(
ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
XZ => ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
#[cfg(feature = "compression")]
ZSTD => Box::new(
ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
.map_err(err_converter),
),
ZSTD => ReaderStream::new(AsyncZstdDecoer::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
.boxed(),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
UNCOMPRESSED => Box::new(s),
UNCOMPRESSED => s.boxed(),
})
}
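
A standalone sketch of the encode/decode round trip the two methods above perform, under stated assumptions (the `tokio`, `tokio-util`, `futures`, `bytes`, and `async-compression` crates, the latter with its `gzip` and `tokio` features): a stream of `Bytes` is adapted to `AsyncBufRead` with `StreamReader`, pushed through a gzip encoder or decoder, and turned back into a byte stream with `ReaderStream`.

```rust
use async_compression::tokio::bufread::{GzipDecoder, GzipEncoder};
use bytes::Bytes;
use futures::{stream, TryStreamExt};
use tokio_util::io::{ReaderStream, StreamReader};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // A source stream of raw CSV bytes.
    let src = stream::iter(vec![Ok::<_, std::io::Error>(Bytes::from("c1,c2\n1,2\n"))]);

    // Compress: Stream<Bytes> -> AsyncBufRead -> gzip bytes -> Stream<Bytes>,
    // mirroring convert_to_compress_stream.
    let compressed = ReaderStream::new(GzipEncoder::new(StreamReader::new(src)));

    // Decompress the same way, mirroring convert_stream.
    let decompressed = ReaderStream::new(GzipDecoder::new(StreamReader::new(compressed)));

    // Collect and verify the round trip restored the original bytes.
    let chunks: Vec<Bytes> = decompressed.try_collect().await?;
    let mut out = Vec::new();
    for chunk in &chunks {
        out.extend_from_slice(chunk);
    }
    assert_eq!(out, b"c1,c2\n1,2\n".to_vec());
    Ok(())
}
```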
