From a9d0bd2a3c40da89152214e4e41a6931d269a5c3 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Fri, 28 Oct 2022 11:06:29 +0800 Subject: [PATCH] add support for xz file compression --- datafusion-cli/Cargo.lock | 102 +++++++++++------- datafusion/core/Cargo.toml | 10 +- .../core/src/datasource/file_format/csv.rs | 2 +- .../src/datasource/file_format/file_type.rs | 88 ++++++++++++--- .../core/src/datasource/file_format/json.rs | 4 +- .../core/src/physical_plan/file_format/csv.rs | 19 ++-- .../src/physical_plan/file_format/json.rs | 16 +-- datafusion/core/src/test/mod.rs | 16 +++ datafusion/expr/src/logical_plan/plan.rs | 2 +- datafusion/sql/src/parser.rs | 5 +- 10 files changed, 186 insertions(+), 78 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 3eff2be29b4e..52888d07e044 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -23,9 +23,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "0.7.19" +version = "0.7.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4f55bd91a0978cbfd91c457a164bab8b4001c833b7f323132c0a4e1922dd44e" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" dependencies = [ "memchr", ] @@ -228,6 +228,7 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", + "xz2", ] [[package]] @@ -281,9 +282,9 @@ dependencies = [ [[package]] name = "blake3" -version = "1.3.1" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08e53fc5a564bb15bfe6fae56bd71522205f1f91893f9c0116edad6496c183f" +checksum = "895adc16c8b3273fbbc32685a7d55227705eda08c01e77704020f3491924b44b" dependencies = [ "arrayref", "arrayvec", @@ -349,9 +350,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.2.1" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" +checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" [[package]] name = "bzip2" @@ -376,9 +377,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.76" +version = "1.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76a284da2e6fe2092f2353e51713435363112dfd60030e22add80be333fb928f" +checksum = "e9f73505338f7d905b19d18738976aae232eb46b8efc15554ffc56deb5d9ebe4" dependencies = [ "jobserver", ] @@ -464,9 +465,9 @@ dependencies = [ [[package]] name = "comfy-table" -version = "6.1.2" +version = "6.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1090f39f45786ec6dc6286f8ea9c75d0a7ef0a0d3cda674cef0c3af7b307fbc2" +checksum = "e621e7e86c46fd8a14c32c6ae3cb95656621b4743a27d0cffedb831d46e7ad21" dependencies = [ "strum", "strum_macros", @@ -497,9 +498,9 @@ dependencies = [ [[package]] name = "constant_time_eq" -version = "0.1.5" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +checksum = "f3ad85c1f65dc7b37604eb0e89748faf0b9653065f2a8ef69f96a687ec1e9279" [[package]] name = "core-foundation-sys" @@ -565,9 +566,9 @@ dependencies = [ [[package]] name = "cxx" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97abf9f0eca9e52b7f81b945524e76710e6cb2366aead23b7d4fbf72e281f888" +checksum = 
"d4a41a86530d0fe7f5d9ea779916b7cadd2d4f9add748b99c2c029cbbdfaf453" dependencies = [ "cc", "cxxbridge-flags", @@ -577,9 +578,9 @@ dependencies = [ [[package]] name = "cxx-build" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cc32cc5fea1d894b77d269ddb9f192110069a8a9c1f1d441195fba90553dea3" +checksum = "06416d667ff3e3ad2df1cd8cd8afae5da26cf9cec4d0825040f88b5ca659a2f0" dependencies = [ "cc", "codespan-reporting", @@ -592,15 +593,15 @@ dependencies = [ [[package]] name = "cxxbridge-flags" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ca220e4794c934dc6b1207c3b42856ad4c302f2df1712e9f8d2eec5afaacf1f" +checksum = "820a9a2af1669deeef27cb271f476ffd196a2c4b6731336011e0ba63e2c7cf71" [[package]] name = "cxxbridge-macro" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b846f081361125bfc8dc9d3940c84e1fd83ba54bbca7b17cd29483c828be0704" +checksum = "a08a6e2fcc370a089ad3b4aaf54db3b1b4cee38ddabce5896b33eb693275f470" dependencies = [ "proc-macro2", "quote", @@ -661,6 +662,7 @@ dependencies = [ "tokio-util", "url", "uuid", + "xz2", ] [[package]] @@ -918,9 +920,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.24" +version = "1.0.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +checksum = "a8a2db397cb1c8772f31494cb8917e48cd1e64f0fa7efac59fbd741a0a8ce841" dependencies = [ "crc32fast", "miniz_oxide", @@ -1229,9 +1231,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.9.1" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg", "hashbrown 0.12.3", @@ -1410,9 +1412,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb68f22743a3fb35785f1e7f844ca5a3de2dde5bd0c0ef5b372065814699b121" +checksum = "8f9f08d8963a6c613f4b1a78f4f4a4dbfadf8e6545b2d72861731e4858b8b47f" [[package]] name = "lock_api" @@ -1453,6 +1455,17 @@ dependencies = [ "libc", ] +[[package]] +name = "lzma-sys" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fda04ab3764e6cde78b9974eec4f779acaba7c4e84b36eca3cf77c581b85d27" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "md-5" version = "0.10.5" @@ -1485,9 +1498,9 @@ checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" [[package]] name = "miniz_oxide" -version = "0.5.4" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96590ba8f175222643a85693f33d26e9c8a015f599c216509b1a6894af675d34" +checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" dependencies = [ "adler", ] @@ -1676,9 +1689,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.4.0" +version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5bf27447411e9ee3ff51186bf7a08e16c341efdde93f4d823e8844429bed7e" +checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" [[package]] name = "parking_lot" @@ -1977,9 +1990,9 @@ dependencies = [ 
[[package]] name = "rustix" -version = "0.36.1" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "812a2ec2043c4d6bc6482f5be2ab8244613cac2493d128d36c0759e52a626ab3" +checksum = "0b1fbb4dfc4eb1d390c02df47760bb19a84bb80b301ecc947ab5406394d8223e" dependencies = [ "bitflags", "errno", @@ -2104,9 +2117,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.87" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" +checksum = "020ff22c755c2ed3f8cf162dbb41a7268d934702f3ed3631656ea597e08fc3db" dependencies = [ "itoa 1.0.4", "ryu", @@ -2175,9 +2188,9 @@ dependencies = [ [[package]] name = "snap" -version = "1.0.5" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" +checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "socket2" @@ -2344,9 +2357,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.21.2" +version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" +checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3" dependencies = [ "autocfg", "bytes", @@ -2748,6 +2761,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "xz2" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388c44dc09d76f1536602ead6d325eb532f5c122f17782bd57fb47baeeb767e2" +dependencies = [ + "lzma-sys", +] + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" @@ -2769,9 +2791,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "2.0.1+zstd.1.5.2" +version = "2.0.3+zstd.1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fd07cbbc53846d9145dbffdf6dd09a7a0aa52be46741825f5c97bdd4f73f12b" +checksum = "44ccf97612ac95f3ccb89b2d7346b345e52f1c3019be4984f0455fb4ba991f8a" dependencies = [ "cc", "libc", diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 37629643529e..dc460b1206ba 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -41,12 +41,13 @@ path = "src/lib.rs" # Used to enable the avro format avro = ["apache-avro", "num-traits", "datafusion-common/avro"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions"] -default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] +default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "compression"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] # Used to enable JIT code generation jit = ["datafusion-jit", "datafusion-row/jit"] pyarrow = ["pyo3", "arrow/pyarrow", "datafusion-common/pyarrow"] +compression = ["xz2", "bzip2", "flate2", "async-compression"] regex_expressions = ["datafusion-physical-expr/regex_expressions"] # Used to enable scheduler scheduler = ["rayon"] @@ -57,10 +58,10 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } apache-avro = { version = "0.14", optional = true } arrow = { version = "27.0.0", features = ["prettyprint"] } -async-compression = { version = "0.3.14", features = 
["bzip2", "gzip", "futures-io", "tokio"] } +async-compression = { version = "0.3.14", features = ["bzip2", "gzip", "xz", "futures-io", "tokio"], optional = true } async-trait = "0.1.41" bytes = "1.1" -bzip2 = "0.4.3" +bzip2 = { version = "0.4.3", optional = true } chrono = { version = "0.4.23", default-features = false } dashmap = "5.4.0" datafusion-common = { path = "../common", version = "14.0.0", features = ["parquet", "object_store"] } @@ -70,7 +71,7 @@ datafusion-optimizer = { path = "../optimizer", version = "14.0.0" } datafusion-physical-expr = { path = "../physical-expr", version = "14.0.0" } datafusion-row = { path = "../row", version = "14.0.0" } datafusion-sql = { path = "../sql", version = "14.0.0" } -flate2 = "1.0.24" +flate2 = { version = "1.0.24", optional = true } futures = "0.3" glob = "0.3.0" hashbrown = { version = "0.13", features = ["raw"] } @@ -96,6 +97,7 @@ tokio-stream = "0.1" tokio-util = { version = "0.7.4", features = ["io"] } url = "2.2" uuid = { version = "1.0", features = ["v4"] } +xz2 = { version = "0.1", optional = true } [dev-dependencies] arrow = { version = "27.0.0", features = ["prettyprint", "dyn_cmp_dict"] } diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs index 044cdf30df63..d44f6ca51d65 100644 --- a/datafusion/core/src/datasource/file_format/csv.rs +++ b/datafusion/core/src/datasource/file_format/csv.rs @@ -127,7 +127,7 @@ impl FileFormat for CsvFormat { .await .map_err(|e| DataFusionError::External(Box::new(e)))?; - let decoder = self.file_compression_type.convert_read(data.reader()); + let decoder = self.file_compression_type.convert_read(data.reader())?; let (schema, records_read) = arrow::csv::reader::infer_reader_schema( decoder, self.delimiter, diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs index 78f6dbc9aed7..5a7b3c348eab 100644 --- a/datafusion/core/src/datasource/file_format/file_type.rs +++ b/datafusion/core/src/datasource/file_format/file_type.rs @@ -18,22 +18,29 @@ //! 
File type abstraction
 
 use crate::error::{DataFusionError, Result};
-use std::io::Error;
-
-use async_compression::tokio::bufread::{
-    BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
-};
-use bzip2::read::BzDecoder;
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use crate::datasource::file_format::json::DEFAULT_JSON_EXTENSION;
 use crate::datasource::file_format::parquet::DEFAULT_PARQUET_EXTENSION;
+#[cfg(feature = "compression")]
+use async_compression::tokio::bufread::{
+    BzDecoder as AsyncBzDecoder, GzipDecoder as AsyncGzDecoder,
+    XzDecoder as AsyncXzDecoder,
+};
 use bytes::Bytes;
+#[cfg(feature = "compression")]
+use bzip2::read::BzDecoder;
+#[cfg(feature = "compression")]
 use flate2::read::GzDecoder;
-use futures::{Stream, TryStreamExt};
+use futures::Stream;
+#[cfg(feature = "compression")]
+use futures::TryStreamExt;
 use std::str::FromStr;
+#[cfg(feature = "compression")]
 use tokio_util::io::{ReaderStream, StreamReader};
+#[cfg(feature = "compression")]
+use xz2::read::XzDecoder;
 
 /// Define each `FileType`/`FileCompressionType`'s extension
 pub trait GetExt {
@@ -48,6 +55,8 @@ pub enum FileCompressionType {
     GZIP,
     /// Bzip2-ed file
     BZIP2,
+    /// Xz-ed file (liblzma)
+    XZ,
     /// Uncompressed file
     UNCOMPRESSED,
 }
@@ -57,6 +66,7 @@ impl GetExt for FileCompressionType {
         match self {
             FileCompressionType::GZIP => ".gz".to_owned(),
             FileCompressionType::BZIP2 => ".bz2".to_owned(),
+            FileCompressionType::XZ => ".xz".to_owned(),
             FileCompressionType::UNCOMPRESSED => "".to_owned(),
         }
     }
@@ -70,6 +80,7 @@ impl FromStr for FileCompressionType {
         match s.as_str() {
             "GZIP" | "GZ" => Ok(FileCompressionType::GZIP),
             "BZIP2" | "BZ2" => Ok(FileCompressionType::BZIP2),
+            "XZ" => Ok(FileCompressionType::XZ),
             "" => Ok(FileCompressionType::UNCOMPRESSED),
             _ => Err(DataFusionError::NotImplemented(format!(
                 "Unknown FileCompressionType: {}",
@@ -85,8 +96,9 @@ impl FileCompressionType {
     pub fn convert_stream<T: Stream<Item = Result<Bytes>> + Unpin + Send + 'static>(
         &self,
         s: T,
-    ) -> Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin> {
-        let err_converter = |e: Error| match e
+    ) -> Result<Box<dyn Stream<Item = Result<Bytes>> + Send + Unpin>> {
+        #[cfg(feature = "compression")]
+        let err_converter = |e: std::io::Error| match e
             .get_ref()
             .and_then(|e| e.downcast_ref::<DataFusionError>())
         {
@@ -99,29 +111,56 @@ impl FileCompressionType {
             None => Into::<DataFusionError>::into(e),
         };
 
-        match self {
+        Ok(match self {
+            #[cfg(feature = "compression")]
             FileCompressionType::GZIP => Box::new(
                 ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
                     .map_err(err_converter),
             ),
+            #[cfg(feature = "compression")]
             FileCompressionType::BZIP2 => Box::new(
                 ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
                     .map_err(err_converter),
             ),
+            #[cfg(feature = "compression")]
+            FileCompressionType::XZ => Box::new(
+                ReaderStream::new(AsyncXzDecoder::new(StreamReader::new(s)))
+                    .map_err(err_converter),
+            ),
+            #[cfg(not(feature = "compression"))]
+            FileCompressionType::GZIP
+            | FileCompressionType::BZIP2
+            | FileCompressionType::XZ => {
+                return Err(DataFusionError::NotImplemented(
+                    "Compression feature is not enabled".to_owned(),
+                ))
+            }
             FileCompressionType::UNCOMPRESSED => Box::new(s),
-        }
+        })
     }
 
     /// Given a `Read`, create a `Read` which data are decompressed with `FileCompressionType`.
     pub fn convert_read<T: std::io::Read + Send + 'static>(
         &self,
         r: T,
-    ) -> Box<dyn std::io::Read + Send> {
-        match self {
+    ) -> Result<Box<dyn std::io::Read + Send>> {
+        Ok(match self {
+            #[cfg(feature = "compression")]
             FileCompressionType::GZIP => Box::new(GzDecoder::new(r)),
+            #[cfg(feature = "compression")]
             FileCompressionType::BZIP2 => Box::new(BzDecoder::new(r)),
+            #[cfg(feature = "compression")]
+            FileCompressionType::XZ => Box::new(XzDecoder::new(r)),
+            #[cfg(not(feature = "compression"))]
+            FileCompressionType::GZIP
+            | FileCompressionType::BZIP2
+            | FileCompressionType::XZ => {
+                return Err(DataFusionError::NotImplemented(
+                    "Compression feature is not enabled".to_owned(),
+                ))
+            }
             FileCompressionType::UNCOMPRESSED => Box::new(r),
-        }
+        })
     }
 }
@@ -205,6 +244,12 @@ mod tests {
                 .unwrap(),
             ".csv.gz"
         );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::XZ)
+                .unwrap(),
+            ".csv.xz"
+        );
         assert_eq!(
             file_type
                 .get_ext_with_compression(FileCompressionType::BZIP2)
@@ -225,6 +270,12 @@ mod tests {
                 .unwrap(),
             ".json.gz"
         );
+        assert_eq!(
+            file_type
+                .get_ext_with_compression(FileCompressionType::XZ)
+                .unwrap(),
+            ".json.xz"
+        );
         assert_eq!(
             file_type
                 .get_ext_with_compression(FileCompressionType::BZIP2)
@@ -300,7 +351,14 @@ mod tests {
             FileCompressionType::from_str("GZIP").unwrap(),
             FileCompressionType::GZIP
         );
-
+        assert_eq!(
+            FileCompressionType::from_str("xz").unwrap(),
+            FileCompressionType::XZ
+        );
+        assert_eq!(
+            FileCompressionType::from_str("XZ").unwrap(),
+            FileCompressionType::XZ
+        );
         assert_eq!(
             FileCompressionType::from_str("bz2").unwrap(),
             FileCompressionType::BZIP2
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 626331ab123d..08bb2adece85 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -103,14 +103,14 @@ impl FileFormat for JsonFormat {
         let schema = match store.get(&object.location).await? {
             GetResult::File(file, _) => {
-                let decoder = file_compression_type.convert_read(file);
+                let decoder = file_compression_type.convert_read(file)?;
                 let mut reader = BufReader::new(decoder);
                 let iter = ValueIter::new(&mut reader, None);
                 infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
             }
             r @ GetResult::Stream(_) => {
                 let data = r.bytes().await?;
-                let decoder = file_compression_type.convert_read(data.reader());
+                let decoder = file_compression_type.convert_read(data.reader())?;
                 let mut reader = BufReader::new(decoder);
                 let iter = ValueIter::new(&mut reader, None);
                 infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 4547703496c3..752cfdc26649 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -218,13 +218,13 @@ impl FileOpener for CsvOpener {
         Ok(Box::pin(async move {
             match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
-                    let decoder = file_compression_type.convert_read(file);
+                    let decoder = file_compression_type.convert_read(file)?;
                     Ok(futures::stream::iter(config.open(decoder, true)).boxed())
                 }
                 GetResult::Stream(s) => {
                     let mut first_chunk = true;
                     let s = s.map_err(Into::<DataFusionError>::into);
-                    let decoder = file_compression_type.convert_stream(s);
+                    let decoder = file_compression_type.convert_stream(s)?;
                     Ok(newline_delimited_stream(decoder)
                         .map_ok(move |bytes| {
                             let reader = config.open(bytes.reader(), first_chunk);
@@ -304,7 +304,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn csv_exec_with_projection(
@@ -358,7 +359,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn csv_exec_with_limit(
@@ -412,7 +414,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn csv_exec_with_missing_column(
@@ -466,7 +469,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn csv_exec_with_partition(
@@ -559,7 +563,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn test_chunked(file_compression_type: FileCompressionType) {
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index a2eb549205ad..f301a7fdb6a3 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -176,13 +176,13 @@ impl FileOpener for JsonOpener {
         Ok(Box::pin(async move {
             match store.get(file_meta.location()).await? {
                 GetResult::File(file, _) => {
-                    let decoder = file_compression_type.convert_read(file);
+                    let decoder = file_compression_type.convert_read(file)?;
                     let reader = json::Reader::new(decoder, schema.clone(), options);
                     Ok(futures::stream::iter(reader).boxed())
                 }
                 GetResult::Stream(s) => {
                     let s = s.map_err(Into::into);
-                    let decoder = file_compression_type.convert_stream(s);
+                    let decoder = file_compression_type.convert_stream(s)?;
 
                     Ok(newline_delimited_stream(decoder)
                         .map_ok(move |bytes| {
@@ -305,7 +305,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn nd_json_exec_file_without_projection(
@@ -377,7 +378,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn nd_json_exec_file_with_missing_column(
@@ -431,7 +433,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn nd_json_exec_file_projection(
@@ -531,7 +534,8 @@ mod tests {
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
         case(FileCompressionType::GZIP),
-        case(FileCompressionType::BZIP2)
+        case(FileCompressionType::BZIP2),
+        case(FileCompressionType::XZ)
     )]
     #[tokio::test]
     async fn test_chunked(file_compression_type: FileCompressionType) {
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index e36a46b6dd33..837610deb08f 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -33,9 +33,13 @@ use array::ArrayRef;
 use arrow::array::{self, Array, Decimal128Builder, Int32Array};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::record_batch::RecordBatch;
+#[cfg(feature = "compression")]
 use bzip2::write::BzEncoder;
+#[cfg(feature = "compression")]
 use bzip2::Compression as BzCompression;
+#[cfg(feature = "compression")]
 use flate2::write::GzEncoder;
+#[cfg(feature = "compression")]
 use flate2::Compression as GzCompression;
 use futures::{Future, FutureExt};
 use std::fs::File;
@@ -44,6 +48,8 @@ use std::io::{BufReader, BufWriter};
 use std::pin::Pin;
 use std::sync::Arc;
 use tempfile::TempDir;
+#[cfg(feature = "compression")]
+use xz2::write::XzEncoder;
 
 pub fn create_table_dual() -> Arc<dyn TableProvider> {
     let dual_schema = Arc::new(Schema::new(vec![
@@ -112,12 +118,22 @@ pub fn partitioned_file_groups(
         let encoder: Box<dyn std::io::Write + Send> =
             match file_compression_type.to_owned() {
                 FileCompressionType::UNCOMPRESSED => Box::new(file),
+                #[cfg(feature = "compression")]
                 FileCompressionType::GZIP => {
                     Box::new(GzEncoder::new(file, GzCompression::default()))
                 }
+                #[cfg(feature = "compression")]
+                FileCompressionType::XZ => Box::new(XzEncoder::new(file, 9)),
+                #[cfg(feature = "compression")]
                 FileCompressionType::BZIP2 => {
                     Box::new(BzEncoder::new(file, BzCompression::default()))
                 }
+                #[cfg(not(feature = "compression"))]
+                FileCompressionType::GZIP
+                | FileCompressionType::BZIP2
+                | FileCompressionType::XZ => {
+                    panic!("Compression is not supported in this build")
+                }
             };
 
         let writer = BufWriter::new(encoder);
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 68bfaa378a0c..d41ad797c333 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1364,7 +1364,7 @@ pub struct CreateExternalTable {
     pub if_not_exists: bool,
     /// SQL used to create the table, if available
     pub definition: Option<String>,
-    /// File compression type (GZIP, BZIP2)
+    /// File compression type (GZIP, BZIP2, XZ)
     pub file_compression_type: String,
     /// Table(provider) specific options
     pub options: HashMap<String, String>,
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index bfbe628f4d31..4744417f6225 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -65,7 +65,7 @@ pub struct CreateExternalTable {
     pub table_partition_cols: Vec<String>,
     /// Option to not error if table already exists
     pub if_not_exists: bool,
-    /// File compression type (GZIP, BZIP2)
+    /// File compression type (GZIP, BZIP2, XZ)
     pub file_compression_type: String,
     /// Table(provider) specific options
     pub options: HashMap<String, String>,
@@ -386,7 +386,7 @@ impl<'a> DFParser<'a> {
     fn parse_file_compression_type(&mut self) -> Result<String, ParserError> {
         match self.parser.next_token() {
             Token::Word(w) => parse_file_compression_type(&w.value),
-            unexpected => self.expected("one of GZIP, BZIP2", unexpected),
+            unexpected => self.expected("one of GZIP, BZIP2, XZ", unexpected),
         }
     }
 
@@ -585,6 +585,7 @@ mod tests {
         let sqls = vec![
             ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE GZIP LOCATION 'foo.csv'", "GZIP"),
             ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE BZIP2 LOCATION 'foo.csv'", "BZIP2"),
+            ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV COMPRESSION TYPE XZ LOCATION 'foo.csv'", "XZ"),
         ];
         for (sql, file_compression_type) in sqls {
             let expected = Statement::CreateExternalTable(CreateExternalTable {
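
For orientation only, and not as part of the patch itself, the sketch below shows how the new XZ variant is expected to behave end to end: compress a buffer with xz2 (the same crate the test helper above uses), then decompress it through FileCompressionType::convert_read. It assumes the `compression` feature is enabled and that the `file_type` module is publicly reachable at the path used in the diff; the helper name `roundtrip_xz` is purely illustrative.

// Illustrative sketch, not part of the patch.
use std::io::{Read, Write};
use std::str::FromStr;

use datafusion::datasource::file_format::file_type::{FileCompressionType, GetExt};
use datafusion::error::Result;
use xz2::write::XzEncoder;

fn roundtrip_xz(input: &str) -> Result<String> {
    // Compress in memory at level 9, mirroring partitioned_file_groups above.
    let mut encoder = XzEncoder::new(Vec::new(), 9);
    encoder.write_all(input.as_bytes()).unwrap();
    let compressed = encoder.finish().unwrap();

    // "XZ" now parses to FileCompressionType::XZ and maps to the ".xz" extension.
    let xz = FileCompressionType::from_str("XZ")?;
    assert_eq!(xz.get_ext(), ".xz");

    // convert_read now returns Result, so builds without the `compression`
    // feature surface DataFusionError::NotImplemented instead of failing.
    let mut reader = xz.convert_read(std::io::Cursor::new(compressed))?;
    let mut decompressed = String::new();
    reader.read_to_string(&mut decompressed).unwrap();
    Ok(decompressed)
}

At the SQL level the same support is reached through CREATE EXTERNAL TABLE ... COMPRESSION TYPE XZ, exactly as exercised by the parser test added at the end of the patch.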