
Commit

when inferring the schema of compressed CSV, decompress before newline-delimited chunking
jiangzhx committed Apr 4, 2023
1 parent fd350fa commit 2e62a39
Showing 5 changed files with 218 additions and 29 deletions.
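The bug was one of ordering: schema inference previously split the raw object-store byte stream into newline-delimited chunks and only then tried to decompress each chunk, which cannot work because a compressed stream contains 0x0A bytes at arbitrary positions. This commit decompresses first and chunks afterwards. The standalone sketch below is illustrative only; it is not DataFusion code and assumes nothing beyond the `flate2` crate:

use std::io::{Read, Write};

use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;

fn main() -> std::io::Result<()> {
    let csv = b"c1,c2\na,1\nb,2\n";

    // gzip-compress the CSV bytes
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    encoder.write_all(csv)?;
    let compressed = encoder.finish()?;

    // wrong order: any 0x0A byte found in the compressed data is just part of
    // the gzip bitstream, so "rows" split here are meaningless fragments
    let bogus_rows = compressed.split(|b| *b == b'\n').count();
    println!("newline-delimited fragments in compressed bytes: {bogus_rows}");

    // right order: decompress first, then split into newline-delimited rows
    let mut decoded = String::new();
    GzDecoder::new(&compressed[..]).read_to_string(&mut decoded)?;
    println!("rows after decompressing: {}", decoded.lines().count()); // 3: header + 2 data rows
    Ok(())
}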
18 changes: 18 additions & 0 deletions datafusion-examples/examples/csv_sql.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::error::Result;
use datafusion::prelude::*;

@@ -48,5 +49,22 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// query a compressed CSV file, specifying the compression type and file extension
let csv_options = CsvReadOptions::default()
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension("csv.gz");
let df = ctx
.read_csv(
&format!("{testdata}/csv/aggregate_test_100.csv.gz"),
csv_options,
)
.await?;
let df = df
.filter(col("c1").eq(lit("a")))?
.select_columns(&["c2", "c3"])?;

df.show().await?;

Ok(())
}
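As a follow-up to the example above, the same gzip-compressed file can also be registered as a named table and queried with SQL. This variant is a sketch rather than part of the commit; the helper name `query_compressed_csv_via_sql` is hypothetical, and it assumes the same `testdata` directory used in the example.

use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::error::Result;
use datafusion::prelude::*;

// Hypothetical helper: register the compressed CSV as a table, then query it with SQL.
async fn query_compressed_csv_via_sql(ctx: &SessionContext, testdata: &str) -> Result<()> {
    ctx.register_csv(
        "aggregate_gz",
        &format!("{testdata}/csv/aggregate_test_100.csv.gz"),
        CsvReadOptions::default()
            .has_header(true)
            .file_compression_type(FileCompressionType::GZIP)
            .file_extension("csv.gz"),
    )
    .await?;

    // the table is decompressed transparently, both for schema inference and for the scan
    let df = ctx
        .sql("SELECT c2, c3 FROM aggregate_gz WHERE c1 = 'a'")
        .await?;
    df.show().await?;
    Ok(())
}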
165 changes: 140 additions & 25 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -65,6 +65,37 @@ impl Default for CsvFormat {
}

impl CsvFormat {
/// Return a stream of newline-delimited chunks read from the given byte
/// stream, decompressing first if necessary.
/// Each returned `Bytes` contains a whole number of newline-delimited rows.
async fn read_to_delimited_chunks<
T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static,
>(
&self,
stream: T,
) -> impl Stream<Item = Result<Bytes>> {
let file_compression_type = self.file_compression_type.to_owned();

// decompress lazily so that only as many rows as needed are read into memory
let decoder = file_compression_type.convert_stream(stream);
match decoder {
Ok(decoded_stream) => {
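// `newline_delimited_stream` operates on `object_store` results, so map any
// `DataFusionError` from decompression back into an `object_store::Error` here;
// errors from the chunked stream are converted back to `DataFusionError` below.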
newline_delimited_stream(decoded_stream.map_err(|e| match e {
DataFusionError::ObjectStore(e) => e,
err => object_store::Error::Generic {
store: "read to delimited chunks failed",
source: Box::new(err),
},
}))
.map_err(DataFusionError::from)
.left_stream()
}
Err(e) => {
futures::stream::once(futures::future::ready(Err(e))).right_stream()
}
}
}

/// Set a limit in terms of records to scan to infer the schema
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: Option<usize>) -> Self {
@@ -125,7 +156,13 @@ impl FileFormat for CsvFormat {

for object in objects {
// stream to only read as many rows as needed into memory
let stream = read_to_delimited_chunks(store, object).await;
let stream = store.get(&object.location).await?;

let stream = self
.read_to_delimited_chunks(
stream.into_stream().map_err(DataFusionError::from),
)
.await;
let (schema, records_read) = self
.infer_schema_from_stream(records_to_read, stream)
.await?;
@@ -166,28 +203,6 @@ impl FileFormat for CsvFormat {
}
}

/// Return a newline delimited stream from the specified file on
/// object store
///
/// Each returned `Bytes` has a whole number of newline delimited rows
async fn read_to_delimited_chunks(
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
) -> impl Stream<Item = Result<Bytes>> {
// stream to only read as many rows as needed into memory
let stream = store
.get(&object.location)
.await
.map_err(DataFusionError::ObjectStore);

match stream {
Ok(s) => newline_delimited_stream(s.into_stream())
.map_err(|e| DataFusionError::External(Box::new(e)))
.left_stream(),
Err(e) => futures::stream::once(futures::future::ready(Err(e))).right_stream(),
}
}

impl CsvFormat {
/// Return the inferred schema reading up to records_to_read from a
/// stream of delimited chunks returning the inferred schema and the
@@ -207,7 +222,7 @@ impl CsvFormat {
while let Some(chunk) = stream.next().await.transpose()? {
let (Schema { fields, .. }, records_read) =
arrow::csv::reader::infer_reader_schema(
self.file_compression_type.convert_read(chunk.reader())?,
chunk.reader(),
self.delimiter,
Some(records_to_read),
// only consider header for first chunk
@@ -295,14 +310,19 @@ fn build_schema_helper(names: Vec<String>, types: &[HashSet<DataType>]) -> Schema {
mod tests {
use super::super::test_util::scan_format;
use super::*;
use crate::assert_batches_eq;
use crate::datasource::file_format::test_util::VariableStream;
use crate::physical_plan::collect;
use crate::prelude::{SessionConfig, SessionContext};
use crate::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use crate::test_util::arrow_test_data;
use bytes::Bytes;
use chrono::DateTime;
use datafusion_common::cast::as_string_array;
use datafusion_expr::{col, lit};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use rstest::*;

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -461,6 +481,101 @@ mod tests {
Ok(())
}

#[rstest(
file_compression_type,
case(FileCompressionType::UNCOMPRESSED),
case(FileCompressionType::GZIP),
case(FileCompressionType::BZIP2),
case(FileCompressionType::XZ),
case(FileCompressionType::ZSTD)
)]
#[tokio::test]
async fn query_compress_data(
file_compression_type: FileCompressionType,
) -> Result<()> {
let integration = LocalFileSystem::new_with_prefix(arrow_test_data()).unwrap();

let path = Path::from("csv/aggregate_test_100.csv");
let csv = CsvFormat::default().with_has_header(true);
let records_to_read = csv.schema_infer_max_rec.unwrap_or(usize::MAX);
let store = Arc::new(integration) as Arc<dyn ObjectStore>;
let original_stream = store.get(&path).await?;

// convert original_stream into compressed_stream for the next step
let compressed_stream = file_compression_type
.to_owned()
.convert_to_compress_stream(
original_stream.into_stream().map_err(DataFusionError::from),
)
.unwrap();

// prepare the expected schema for the assertion below
let expected = Schema::new(vec![
Field::new("c1", DataType::Utf8, true),
Field::new("c2", DataType::Int64, true),
Field::new("c3", DataType::Int64, true),
Field::new("c4", DataType::Int64, true),
Field::new("c5", DataType::Int64, true),
Field::new("c6", DataType::Int64, true),
Field::new("c7", DataType::Int64, true),
Field::new("c8", DataType::Int64, true),
Field::new("c9", DataType::Int64, true),
Field::new("c10", DataType::Int64, true),
Field::new("c11", DataType::Float64, true),
Field::new("c12", DataType::Float64, true),
Field::new("c13", DataType::Utf8, true),
]);

let compressed_csv =
csv.with_file_compression_type(file_compression_type.clone());

// convert compressed_stream back into a decoded, newline-delimited stream
let decoded_stream = compressed_csv
.read_to_delimited_chunks(compressed_stream)
.await;
let (schema, records_read) = compressed_csv
.infer_schema_from_stream(records_to_read, decoded_stream)
.await?;

assert_eq!(expected, schema);
assert_eq!(100, records_read);
Ok(())
}

#[tokio::test]
async fn query_compress_csv() -> Result<()> {
let ctx = SessionContext::new();

let csv_options = CsvReadOptions::default()
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension("csv.gz");
let df = ctx
.read_csv(
&format!("{}/csv/aggregate_test_100.csv.gz", arrow_test_data()),
csv_options,
)
.await?;

let record_batch = df
.filter(col("c1").eq(lit("a")).and(col("c2").gt(lit("4"))))?
.select_columns(&["c2", "c3"])?
.collect()
.await?;
#[rustfmt::skip]
let expected = vec![
"+----+------+",
"| c2 | c3 |",
"+----+------+",
"| 5 | 36 |",
"| 5 | -31 |",
"| 5 | -101 |",
"+----+------+",
];
assert_batches_eq!(expected, &record_batch);
Ok(())
}

async fn get_exec(
state: &SessionState,
file_name: &str,
60 changes: 58 additions & 2 deletions datafusion/core/src/datasource/file_format/file_type.rs
@@ -25,15 +25,19 @@ use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
#[cfg(feature = "compression")]
use async_compression::tokio::bufread::{
BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
XzDecoder as AsyncXzDecoder, ZstdDecoder as AsyncZstdDecoer,
BzDecoder as AsyncBzDecoder, BzEncoder as AsyncBzEncoder,
GzipDecoder as AsyncGzDecoder, GzipEncoder as AsyncGzEncoder,
XzDecoder as AsyncXzDecoder, XzEncoder as AsyncXzEncoder,
ZstdDecoder as AsyncZstdDecoer, ZstdEncoder as AsyncZstdEncoer,
};

use bytes::Bytes;
#[cfg(feature = "compression")]
use bzip2::read::MultiBzDecoder;
use datafusion_common::parsers::CompressionTypeVariant;
#[cfg(feature = "compression")]
use flate2::read::MultiGzDecoder;

use futures::Stream;
#[cfg(feature = "compression")]
use futures::TryStreamExt;
@@ -111,6 +115,58 @@ impl FileCompressionType {
self.variant.is_compressed()
}

/// Given a `Stream`, create a `Stream` whose data is compressed with `FileCompressionType`.
pub fn convert_to_compress_stream<
T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static,
>(
&self,
s: T,
) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
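// When the `compression` feature is enabled, the async encoders surface failures
// as `std::io::Error`. If such an error wraps a `DataFusionError` (as produced by
// `StreamReader` over the input stream), unwrap and return the original error;
// otherwise convert the plain I/O error into a `DataFusionError`.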
#[cfg(feature = "compression")]
let err_converter = |e: std::io::Error| match e
.get_ref()
.and_then(|e| e.downcast_ref::<DataFusionError>())
{
Some(_) => {
*(e.into_inner()
.unwrap()
.downcast::<DataFusionError>()
.unwrap())
}
None => Into::<DataFusionError>::into(e),
};

Ok(match self.variant {
#[cfg(feature = "compression")]
GZIP => Box::new(
ReaderStream::new(AsyncGzEncoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
BZIP2 => Box::new(
ReaderStream::new(AsyncBzEncoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
XZ => Box::new(
ReaderStream::new(AsyncXzEncoder::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(feature = "compression")]
ZSTD => Box::new(
ReaderStream::new(AsyncZstdEncoer::new(StreamReader::new(s)))
.map_err(err_converter),
),
#[cfg(not(feature = "compression"))]
GZIP | BZIP2 | XZ | ZSTD => {
return Err(DataFusionError::NotImplemented(
"Compression feature is not enabled".to_owned(),
))
}
UNCOMPRESSED => Box::new(s),
})
}

/// Given a `Stream`, create a `Stream` whose data is decompressed with `FileCompressionType`.
pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
&self,
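The two conversions added and used here are symmetric, so a quick way to sanity-check them is a compress-then-decompress round trip. The sketch below is an assumed test-style helper (the name `gzip_round_trip` is hypothetical, `convert_stream` is taken to return the same boxed stream type as `convert_to_compress_stream`, and the `compression` feature must be enabled):

use bytes::Bytes;
use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::error::Result;
use futures::TryStreamExt;

// Hypothetical round-trip check: compress an in-memory stream, then decompress it
// again and confirm the original bytes come back.
async fn gzip_round_trip() -> Result<()> {
    let original = Bytes::from_static(b"c1,c2\na,1\nb,2\n");
    let input = futures::stream::iter(vec![Ok(original.clone())]);

    let compressed = FileCompressionType::GZIP.convert_to_compress_stream(input)?;
    let decompressed = FileCompressionType::GZIP.convert_stream(compressed)?;

    let chunks: Vec<Bytes> = decompressed.try_collect().await?;
    let mut decoded = Vec::new();
    for chunk in chunks {
        decoded.extend_from_slice(&chunk);
    }
    assert_eq!(&original[..], decoded.as_slice());
    Ok(())
}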
2 changes: 1 addition & 1 deletion datafusion/core/src/physical_plan/file_format/csv.rs
@@ -329,7 +329,7 @@ mod tests {
use crate::prelude::*;
use crate::test::{partitioned_csv_config, partitioned_file_groups};
use crate::test_util::{aggr_test_schema_with_missing_col, arrow_test_data};
use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
use crate::{assert_batches_eq, scalar::ScalarValue, test_util::aggr_test_schema};
use arrow::datatypes::*;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
