Skip to content

Commit

Permalink
when inferring the schema of compressed CSV, decompress before newlin…
Browse files Browse the repository at this point in the history
…e-delimited chunking
  • Loading branch information
jiangzhx committed Apr 7, 2023
1 parent fd350fa commit 0bf639d
Show file tree
Hide file tree
Showing 4 changed files with 234 additions and 30 deletions.
18 changes: 18 additions & 0 deletions datafusion-examples/examples/csv_sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::error::Result;
use datafusion::prelude::*;

Expand Down Expand Up @@ -48,5 +49,22 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// query compressed CSV with specific options
let csv_options = CsvReadOptions::default()
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension("csv.gz");
let df = ctx
.read_csv(
&format!("{testdata}/csv/aggregate_test_100.csv.gz"),
csv_options,
)
.await?;
let df = df
.filter(col("c1").eq(lit("a")))?
.select_columns(&["c2", "c3"])?;

df.show().await?;

Ok(())
}
182 changes: 156 additions & 26 deletions datafusion/core/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,59 @@ impl Default for CsvFormat {
}

impl CsvFormat {
/// Return a newline delimited stream from the specified file on
/// Stream, decompressing if necessary
/// Each returned `Bytes` has a whole number of newline delimited rows
async fn read_to_delimited_chunks(
&self,
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
) -> impl Stream<Item = Result<Bytes>> {
// stream to only read as many rows as needed into memory
let stream = store
.get(&object.location)
.await
.map_err(DataFusionError::ObjectStore);
match stream {
Ok(stream) => self
.read_to_delimited_chunks_from_stream(
stream.into_stream().map_err(DataFusionError::ObjectStore),
)
.await
.map_err(DataFusionError::from)
.left_stream(),
Err(e) => {
futures::stream::once(futures::future::ready(Err(e))).right_stream()
}
}
}

async fn read_to_delimited_chunks_from_stream<
T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static,
>(
&self,
stream: T,
) -> impl Stream<Item = Result<Bytes>> {
let file_compression_type = self.file_compression_type.to_owned();
let decoder = file_compression_type.convert_stream(stream);
match decoder {
Ok(decoded_stream) => {
newline_delimited_stream(decoded_stream.map_err(|e| match e {
DataFusionError::ObjectStore(e) => e,
err => object_store::Error::Generic {
store: "read to delimited chunks failed",
source: Box::new(err),
},
}))
.map_err(DataFusionError::from)
.left_stream()
}
Err(e) => {
futures::stream::once(futures::future::ready(Err(e))).right_stream()
}
}
}

/// Set a limit in terms of records to scan to infer the schema
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
Expand Down Expand Up @@ -124,8 +177,7 @@ impl FileFormat for CsvFormat {
let mut records_to_read = self.schema_infer_max_rec.unwrap_or(usize::MAX);

for object in objects {
// stream to only read as many rows as needed into memory
let stream = read_to_delimited_chunks(store, object).await;
let stream = self.read_to_delimited_chunks(store, object).await;
let (schema, records_read) = self
.infer_schema_from_stream(records_to_read, stream)
.await?;
Expand Down Expand Up @@ -166,28 +218,6 @@ impl FileFormat for CsvFormat {
}
}

/// Return a newline delimited stream from the specified file on
/// object store
///
/// Each returned `Bytes` has a whole number of newline delimited rows
async fn read_to_delimited_chunks(
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
) -> impl Stream<Item = Result<Bytes>> {
// stream to only read as many rows as needed into memory
let stream = store
.get(&object.location)
.await
.map_err(DataFusionError::ObjectStore);

match stream {
Ok(s) => newline_delimited_stream(s.into_stream())
.map_err(|e| DataFusionError::External(Box::new(e)))
.left_stream(),
Err(e) => futures::stream::once(futures::future::ready(Err(e))).right_stream(),
}
}

impl CsvFormat {
/// Return the inferred schema reading up to records_to_read from a
/// stream of delimited chunks returning the inferred schema and the
Expand All @@ -207,7 +237,7 @@ impl CsvFormat {
while let Some(chunk) = stream.next().await.transpose()? {
let (Schema { fields, .. }, records_read) =
arrow::csv::reader::infer_reader_schema(
self.file_compression_type.convert_read(chunk.reader())?,
chunk.reader(),
self.delimiter,
Some(records_to_read),
// only consider header for first chunk
Expand Down Expand Up @@ -295,14 +325,19 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schem
mod tests {
use super::super::test_util::scan_format;
use super::*;
use crate::assert_batches_eq;
use crate::datasource::file_format::test_util::VariableStream;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::arrow_test_data;
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use rstest::*;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
Expand Down Expand Up @@ -461,6 +496,101 @@ mod tests {
Ok(())
}

#[rstest(
file_compression_type,
case(FileCompressionType::UNCOMPRESSED),
case(FileCompressionType::GZIP),
case(FileCompressionType::BZIP2),
case(FileCompressionType::XZ),
case(FileCompressionType::ZSTD)
)]
#[tokio::test]
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();

let path = Path::from("csv/aggregate_test_100.csv");
let csv = CsvFormat::default().with_has_header(true);
let records_to_read = csv.schema_infer_max_rec.unwrap_or(usize::MAX);
let store = Arc::new(integration) as Arc<dyn ObjectStore>;
let original_stream = store.get(&path).await?;

//convert original_stream to compressed_stream for next step
let compressed_stream = file_compression_type
.to_owned()
.convert_to_compress_stream(
original_stream.into_stream().map_err(DataFusionError::from),
)
.unwrap();

//prepare expected schema for assert_eq
let expected = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
Field::new("c3", DataType::Int64, true),
Field::new("c4", DataType::Int64, true),
Field::new("c5", DataType::Int64, true),
Field::new("c6", DataType::Int64, true),
Field::new("c7", DataType::Int64, true),
Field::new("c8", DataType::Int64, true),
Field::new("c9", DataType::Int64, true),
Field::new("c10", DataType::Int64, true),
Field::new("c11", DataType::Float64, true),
Field::new("c12", DataType::Float64, true),
Field::new("c13", DataType::Utf8, true),
]);

let compressed_csv =
csv.with_file_compression_type(file_compression_type.clone());

//convert compressed_stream to decoded_stream
let decoded_stream = compressed_csv
.read_to_delimited_chunks_from_stream(compressed_stream)
.await;
let (schema, records_read) = compressed_csv
.infer_schema_from_stream(records_to_read, decoded_stream)
.await?;

assert_eq!(expected, schema);
assert_eq!(100, records_read);
Ok(())
}

#[tokio::test]
async fn query_compress_csv() -> Result<()> {
let ctx = SessionContext::new();

let csv_options = CsvReadOptions::default()
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension("csv.gz");
let df = ctx
.read_csv(
&format!("{}/csv/aggregate_test_100.csv.gz", arrow_test_data()),
csv_options,
)
.await?;

let record_batch = df
.filter(col("c1").eq(lit("a")).and(col("c2").gt(lit("4"))))?
.select_columns(&["c2", "c3"])?
.collect()
.await?;
#[rustfmt::skip]
let expected = vec![
"+----+------+",
"| c2 | c3 |",
"+----+------+",
"| 5 | 36 |",
"| 5 | -31 |",
"| 5 | -101 |",
"+----+------+",
];
assert_batches_eq!(expected, &record_batch);
Ok(())
}

async fn get_exec(
state: &SessionState,
file_name: &str,
Expand Down
62 changes: 59 additions & 3 deletions datafusion/core/src/datasource/file_format/file_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,19 @@ use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
XzDecoder as AsyncXzDecoder, ZstdDecoder as AsyncZstdDecoer,
BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoer,
};

use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::MultiBzDecoder;
use datafusion_common::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;

use futures::Stream;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
Expand Down Expand Up @@ -112,7 +116,9 @@ impl FileCompressionType {
}

/// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
pub fn convert_to_compress_stream<
T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static,
>(
&self,
s: T,
) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
Expand All @@ -130,6 +136,56 @@ impl FileCompressionType {
None => Into::<DataFusionError>::into(e),
};

Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => Box::new(
ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
BZIP2 => Box::new(
ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
XZ => Box::new(
ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
ZSTD => Box::new(
ReaderStream::new(AsyncZstdEncoer::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
UNCOMPRESSED => Box::new(s),
})
}

/// Given a `Stream`, create a `Stream` which data are decompressed with `FileCompressionType`.
pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
&self,
s: T,
) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
// #[cfg(feature = "compression")]
let err_converter = |e: std::io::Error| match e
.get_ref()
.and_then(|e| e.downcast_ref::<DataFusionError>())
{
Some(_) => {
*(e.into_inner()
.unwrap()
.downcast::<DataFusionError>()
.unwrap())
}
None => Into::<DataFusionError>::into(e),
};

Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => Box::new(
Expand Down

0 comments on commit 0bf639d

Please sign in to comment.