diff --git a/Cargo.toml b/Cargo.toml index 0d120c8db58..26dd1acc388 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,8 +74,7 @@ futures = { version = "0.3", optional = true } async-stream = { version = "0.3.2", optional = true } # parquet support -#parquet2 = { version = "0.14.0", optional = true, default_features = false } -parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "delay_dict", optional = true, default_features = false } +parquet2 = { version = "0.15.0", optional = true, default_features = false, features = ["async"] } # avro support avro-schema = { version = "0.3", optional = true } diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index 51fb0fc5cd8..cce4875a4de 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -59,7 +59,7 @@ fn add_benchmark(c: &mut Criterion) { let buffer = to_buffer(size, true, true, false, false); let a = format!("read ts dict 2^{}", i); - c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 11).unwrap())); + c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 11).unwrap())); let a = format!("read utf8 2^{}", i); c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap())); diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index d86b3d4615d..b16d1a6e83d 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -31,12 +31,14 @@ pub fn get_page_iterator( reader: R, pages_filter: Option, buffer: Vec, + max_header_size: usize, ) -> Result> { Ok(_get_page_iterator( column_metadata, reader, pages_filter, buffer, + max_header_size, )?) } diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 59fba886a65..d2e7f0059d3 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -180,11 +180,13 @@ pub fn to_deserializer<'a>( let (columns, types): (Vec<_>, Vec<_>) = columns .into_iter() .map(|(column_meta, chunk)| { + let len = chunk.len(); let pages = PageReader::new( std::io::Cursor::new(chunk), column_meta, std::sync::Arc::new(|_, _| true), vec![], + len * 2 + 1024, ); ( BasicDecompressor::new(pages, vec![]), diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 9f16ea9fb01..3b8f48e9938 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -21,7 +21,7 @@ use crate::types::NativeType; use parquet2::schema::types::PrimitiveType as ParquetPrimitiveType; pub use parquet2::{ - compression::{BrotliLevel, CompressionLevel, CompressionOptions, GzipLevel, ZstdLevel}, + compression::{BrotliLevel, CompressionOptions, GzipLevel, ZstdLevel}, encoding::Encoding, fallible_streaming_iterator, metadata::{Descriptor, FileMetaData, KeyValue, SchemaDescriptor, ThriftFileMetaData},