From c957ce11d7288672ad86f81b0de16614dd2956eb Mon Sep 17 00:00:00 2001 From: youngsofun Date: Wed, 23 Feb 2022 11:46:39 +0800 Subject: [PATCH] bump arrow2 to main --- Cargo.lock | 9 +- common/arrow/Cargo.toml | 5 +- common/arrow/src/lib.rs | 5 ++ common/arrow/src/parquet_read.rs | 81 ++++++++++++++++++ common/arrow/src/parquet_write.rs | 46 +++++++++++ common/streams/src/sources/source_parquet.rs | 49 ++++++----- common/streams/tests/it/source.rs | 13 +-- query/src/storages/fuse/io/block_reader.rs | 87 +++++++------------- query/src/storages/fuse/io/block_writer.rs | 22 ++--- query/tests/it/tests/parquet.rs | 13 +-- 10 files changed, 201 insertions(+), 129 deletions(-) create mode 100644 common/arrow/src/parquet_read.rs create mode 100644 common/arrow/src/parquet_write.rs diff --git a/Cargo.lock b/Cargo.lock index d57febf19f6e7..d88448c3f8079 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,8 +118,7 @@ dependencies = [ [[package]] name = "arrow2" version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aea88c49c98db9de8c72ccaa0470182857e70faa635f32fc4aa3c9e1a1dfefea" +source = "git+https://github.com/datafuse-extras/arrow2?rev=9e6924b#9e6924b2429a747fc4a0af6d963082d2b34f3287" dependencies = [ "ahash", "arrow-format", @@ -127,6 +126,7 @@ dependencies = [ "bytemuck", "chrono", "csv", + "either", "fallible-streaming-iterator", "futures", "hash_hasher", @@ -1381,6 +1381,7 @@ version = "0.1.0" dependencies = [ "arrow-format", "arrow2", + "futures", "parquet2", ] @@ -4983,9 +4984,9 @@ dependencies = [ [[package]] name = "parquet2" -version = "0.9.2" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45476d276db539ec4076f6abe62392619460fb70a1a8edebcc06e11cd93c0ec3" +checksum = "fe2f206abf5ff36f501e6b6acb646de1d236ca4ecb35dc6a58cbedbf7cab748a" dependencies = [ "async-stream", "bitpacking", diff --git a/common/arrow/Cargo.toml b/common/arrow/Cargo.toml index f38436d963948..36f9dabf535a6 100644 --- a/common/arrow/Cargo.toml +++ b/common/arrow/Cargo.toml @@ -20,9 +20,10 @@ simd = ["arrow/simd"] # Workspace dependencies # Github dependencies -arrow = { package = "arrow2", version="0.9.1", default-features = false} +arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, rev = "196e6fb"} arrow-format = { version = "0.4.0", features = ["flight-data", "flight-service", "ipc"] } -parquet2 = { version = "0.9.0", default_features = false } +parquet2 = { version = "0.10.2", default_features = false } +futures = { version = "0.3"} # Crates.io dependencies [dev-dependencies] diff --git a/common/arrow/src/lib.rs b/common/arrow/src/lib.rs index 4980fde5b25c5..365fba804f77e 100644 --- a/common/arrow/src/lib.rs +++ b/common/arrow/src/lib.rs @@ -12,6 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod parquet_read; +mod parquet_write; + pub use arrow; pub use arrow_format; pub use parquet2 as parquet; +pub use parquet_read::read_columns_many_async; +pub use parquet_write::write_parquet_file; diff --git a/common/arrow/src/parquet_read.rs b/common/arrow/src/parquet_read.rs new file mode 100644 index 0000000000000..25da854cb92b6 --- /dev/null +++ b/common/arrow/src/parquet_read.rs @@ -0,0 +1,81 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use arrow::datatypes::Field;
+use arrow::error::Result;
+use arrow::io::parquet::read::to_deserializer;
+use arrow::io::parquet::read::ArrayIter;
+use futures::AsyncRead;
+use futures::AsyncReadExt;
+use futures::AsyncSeek;
+use futures::AsyncSeekExt;
+use parquet2::metadata::ColumnChunkMetaData;
+use parquet2::metadata::RowGroupMetaData;
+
+fn get_field_columns<'a>(
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &str,
+) -> Vec<&'a ColumnChunkMetaData> {
+    columns
+        .iter()
+        .filter(|x| x.descriptor().path_in_schema()[0] == field_name)
+        .collect()
+}
+
+async fn _read_single_column_async<R>(
+    reader: &mut R,
+    meta: &ColumnChunkMetaData,
+) -> Result<Vec<u8>>
+where
+    R: AsyncRead + AsyncSeek + Send + Unpin,
+{
+    let (start, len) = meta.byte_range();
+    reader.seek(std::io::SeekFrom::Start(start)).await?;
+    let mut chunk = vec![0; len as usize];
+    reader.read_exact(&mut chunk).await?;
+    Result::Ok(chunk)
+}
+
+async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
+    reader: &mut R,
+    columns: &'a [ColumnChunkMetaData],
+    field_name: &str,
+) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
+    let col_metas = get_field_columns(columns, field_name);
+    let mut cols = Vec::with_capacity(col_metas.len());
+    for meta in col_metas {
+        cols.push((meta, _read_single_column_async(reader, meta).await?))
+    }
+    Ok(cols)
+}
+
+// Used when we cannot use arrow::io::parquet::read::read_columns_many_async, which needs a reader factory.
+pub async fn read_columns_many_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
+    reader: &mut R,
+    row_group: &RowGroupMetaData,
+    fields: Vec<&Field>,
+    chunk_size: Option<usize>,
+) -> Result<Vec<ArrayIter<'a>>> {
+    let mut arrays = Vec::with_capacity(fields.len());
+    for field in fields {
+        let columns = read_columns_async(reader, row_group.columns(), &field.name).await?;
+        arrays.push(to_deserializer(
+            columns,
+            field.to_owned(),
+            row_group.num_rows() as usize,
+            chunk_size,
+        )?);
+    }
+    Ok(arrays)
+}
diff --git a/common/arrow/src/parquet_write.rs b/common/arrow/src/parquet_write.rs
new file mode 100644
index 0000000000000..281fc2f0e609d
--- /dev/null
+++ b/common/arrow/src/parquet_write.rs
@@ -0,0 +1,46 @@
+// Copyright 2022 Datafuse Labs.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::io::Write;
+
+use arrow::array::Array;
+use arrow::chunk::Chunk;
+use arrow::datatypes::Schema;
+use arrow::error::Result;
+use arrow::io::parquet::write::FileWriter;
+use arrow::io::parquet::write::RowGroupIterator;
+use parquet2::write::WriteOptions;
+
+// A simple wrapper for code reuse.
+pub fn write_parquet_file<W, A, I>(
+    writer: &mut W,
+    row_groups: RowGroupIterator<A, I>,
+    schema: Schema,
+    options: WriteOptions,
+) -> Result<u64>
+where
+    W: Write,
+    A: AsRef<dyn Array> + 'static + Send + Sync,
+    I: Iterator<Item = Result<Chunk<A>>>,
+{
+    let mut file_writer = FileWriter::try_new(writer, schema, options)?;
+
+    file_writer.start()?;
+    for group in row_groups {
+        let (group, len) = group?;
+        file_writer.write(group, len)?;
+    }
+    let (size, _) = file_writer.end(None)?;
+    Ok(size)
+}
diff --git a/common/streams/src/sources/source_parquet.rs b/common/streams/src/sources/source_parquet.rs
index c516df9f857f7..2e2625617aa1b 100644
--- a/common/streams/src/sources/source_parquet.rs
+++ b/common/streams/src/sources/source_parquet.rs
@@ -15,12 +15,12 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_arrow::arrow::datatypes::Field;
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
-use common_arrow::arrow::io::parquet::read::decompress;
-use common_arrow::arrow::io::parquet::read::page_stream_to_array;
 use common_arrow::arrow::io::parquet::read::read_metadata_async;
 use common_arrow::arrow::io::parquet::read::schema::FileMetaData;
-use common_arrow::parquet::read::get_page_stream;
+use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
+use common_arrow::read_columns_many_async;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_exception::ErrorCode;
@@ -30,7 +30,6 @@ use common_tracing::tracing::debug_span;
 use common_tracing::tracing::Instrument;
 use futures::AsyncRead;
 use futures::AsyncSeek;
-use futures::StreamExt;
 
 use crate::Source;
 
@@ -95,33 +94,31 @@ where R: AsyncRead + AsyncSeek + Unpin + Send
         }
         let fields = &self.arrow_table_schema.fields;
-        let row_grp = &metadata.row_groups[self.current_row_group];
-        let cols = self
+
+        let row_group = &metadata.row_groups[self.current_row_group];
+        let fields_to_read: Vec<&Field> = self
             .projection
             .clone()
             .into_iter()
-            .map(|idx| (row_grp.column(idx).clone(), idx));
-        let mut data_cols = Vec::with_capacity(cols.len());
-        for (col_meta, idx) in cols {
-            let col_pages =
-                get_page_stream(&col_meta, &mut self.reader, vec![], Arc::new(|_, _| true))
-                    .instrument(debug_span!("parquet_source_get_column_page"))
-                    .await
-                    .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
-            let pages = col_pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));
-            let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone())
-                .instrument(debug_span!("parquet_source_page_stream_to_array"))
-                .await?;
-            let array: Arc<dyn Array> = array.into();
+            .map(|idx| &fields[idx])
+            .collect();
 
-            let column = match fields[idx].is_nullable {
-                false => array.into_column(),
-                true => array.into_nullable_column(),
-            };
-            data_cols.push(column);
-        }
+        let column_chunks =
+            read_columns_many_async(&mut self.reader, row_group, fields_to_read, None)
+                .await
+                .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
+
+        let mut chunks =
+            RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
+
+        // expect exactly one chunk
+        let chunk = match chunks.next() {
+            None => return Err(ErrorCode::ParquetError("failed to get a chunk")),
+            Some(chunk) => chunk.map_err(|e| ErrorCode::ParquetError(e.to_string()))?,
+        };
+
+        let block = DataBlock::from_chunk(&self.block_schema, &chunk)?;
         self.current_row_group += 1;
-        let block = DataBlock::create(self.block_schema.clone(), data_cols);
         Ok(Some(block))
     }
 }
diff --git a/common/streams/tests/it/source.rs b/common/streams/tests/it/source.rs
index 4f75dec05dc5c..b9d19c9f811dc 100644
--- a/common/streams/tests/it/source.rs
+++ b/common/streams/tests/it/source.rs
@@ -218,18 +218,11 @@ async fn test_source_parquet() -> Result<()> {
     let len = {
         let rg_iter = std::iter::repeat(batch).map(Ok).take(page_nums_expects);
         let row_groups = RowGroupIterator::try_new(rg_iter, &arrow_schema, options, encodings)?;
-        let parquet_schema = row_groups.parquet_schema().clone();
         let path = dir.path().join(name);
         let mut writer = File::create(path).unwrap();
-        common_arrow::parquet::write::write_file(
-            &mut writer,
-            row_groups,
-            parquet_schema,
-            options,
-            None,
-            None,
-        )
-        .map_err(|e| ErrorCode::ParquetError(e.to_string()))?
+
+        common_arrow::write_parquet_file(&mut writer, row_groups, arrow_schema, options)
+            .map_err(|e| ErrorCode::ParquetError(e.to_string()))?
     };
 
     let local = Operator::new(
diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs
index de8bc5baad782..5b625bf524ad2 100644
--- a/query/src/storages/fuse/io/block_reader.rs
+++ b/query/src/storages/fuse/io/block_reader.rs
@@ -14,12 +14,9 @@
 
 use std::sync::Arc;
 
-use common_arrow::arrow::datatypes::DataType as ArrowType;
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
-use common_arrow::arrow::io::parquet::read::decompress;
-use common_arrow::arrow::io::parquet::read::page_stream_to_array;
-use common_arrow::parquet::metadata::ColumnChunkMetaData;
-use common_arrow::parquet::read::get_page_stream;
+use common_arrow::arrow::io::parquet::read::read_columns_many_async;
+use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_exception::ErrorCode;
@@ -27,9 +24,8 @@ use common_exception::Result;
 use common_tracing::tracing;
 use common_tracing::tracing::debug_span;
 use common_tracing::tracing::Instrument;
+use futures::future::BoxFuture;
 use futures::io::BufReader;
-use futures::StreamExt;
-use futures::TryStreamExt;
 use opendal::readers::SeekableReader;
 use opendal::Operator;
 
@@ -39,7 +35,6 @@ pub struct BlockReader {
     data_accessor: Operator,
     path: String,
     block_schema: DataSchemaRef,
-    table_schema: DataSchemaRef,
     arrow_table_schema: ArrowSchema,
     projection: Vec<usize>,
     file_len: u64,
@@ -63,7 +58,6 @@ impl BlockReader {
             data_accessor,
             path,
             block_schema,
-            table_schema,
             arrow_table_schema,
             projection,
             file_len,
@@ -88,67 +82,42 @@ impl BlockReader {
             &metadata.row_groups[0]
         };
 
-        let col_num = self.projection.len();
-        let cols = self
-            .projection
-            .clone()
-            .into_iter()
-            .map(|idx| (row_group.column(idx).clone(), idx));
-
-        let fields = self.table_schema.fields();
         let arrow_fields = &self.arrow_table_schema.fields;
         let stream_len = self.file_len;
         let read_buffer_size = self.read_buffer_size;
 
-        let stream = futures::stream::iter(cols).map(|(col_meta, idx)| {
+        let fields_to_read = self
+            .projection
+            .clone()
+            .into_iter()
+            .map(|idx| arrow_fields[idx].clone())
+            .collect();
+
+        let factory = || {
             let data_accessor = self.data_accessor.clone();
             let path = self.path.clone();
-            async move {
+            Box::pin(async move {
                 let reader = SeekableReader::new(data_accessor, path.as_str(), stream_len);
                 let reader = BufReader::with_capacity(read_buffer_size as usize, reader);
-                let data_type = fields[idx].data_type();
-                let arrow_type = arrow_fields[idx].data_type();
-                Self::read_column(reader, &col_meta, data_type.clone(), arrow_type.clone()).await
-            }
-            .instrument(debug_span!("block_reader_read_column").or_current())
-        });
+                Ok(reader)
+            }) as BoxFuture<_>
+        };
 
-        // TODO configuration of the buffer size
-        let buffer_size = 10;
-        let n = std::cmp::min(buffer_size, col_num);
-        let data_cols = stream.buffered(n).try_collect().await?;
+        let column_chunks = read_columns_many_async(factory, row_group, fields_to_read, None)
+            .instrument(debug_span!("block_reader_read_columns").or_current())
+            .await
+            .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
 
-        let block = DataBlock::create(self.block_schema.clone(), data_cols);
-        Ok(block)
-    }
+        let mut chunks =
+            RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
 
-    async fn read_column(
-        mut reader: BufReader<SeekableReader>,
-        column_chunk_meta: &ColumnChunkMetaData,
-        data_type: DataTypePtr,
-        arrow_type: ArrowType,
-    ) -> Result<ColumnRef> {
-        let col_pages = get_page_stream(
-            column_chunk_meta,
-            &mut reader,
-            vec![],
-            Arc::new(|_, _| true),
-        )
-        .instrument(debug_span!("block_reader_get_column_page"))
-        .await
-        .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
-        let pages = col_pages.map(|compressed_page| {
-            debug_span!("block_reader_decompress_page")
-                .in_scope(|| decompress(compressed_page?, &mut vec![]))
-        });
-        let array = page_stream_to_array(pages, column_chunk_meta, arrow_type)
-            .instrument(debug_span!("block_reader_page_stream_to_array"))
-            .await?;
-        let array: Arc<dyn Array> = array.into();
+        // expect exactly one chunk
+        let chunk = match chunks.next() {
+            None => return Err(ErrorCode::ParquetError("failed to get a chunk")),
+            Some(chunk) => chunk.map_err(|e| ErrorCode::ParquetError(e.to_string()))?,
+        };
 
-        match data_type.is_nullable() {
-            true => Ok(array.into_nullable_column()),
-            false => Ok(array.into_column()),
-        }
+        let block = DataBlock::from_chunk(&self.block_schema, &chunk)?;
+        Ok(block)
     }
 }
diff --git a/query/src/storages/fuse/io/block_writer.rs b/query/src/storages/fuse/io/block_writer.rs
index 48aefcdff67f6..c909242325bd3 100644
--- a/query/src/storages/fuse/io/block_writer.rs
+++ b/query/src/storages/fuse/io/block_writer.rs
@@ -45,32 +45,20 @@ pub async fn write_block(
     let iter = vec![Ok(batch)];
     let row_groups =
         RowGroupIterator::try_new(iter.into_iter(), arrow_schema, options, encodings)?;
-    let parquet_schema = row_groups.parquet_schema().clone();
 
     // PutObject in S3 needs to know the content-length in advance
     // multipart upload may mitigate this, but let's fit things together first
     // see issue #xxx
-    use bytes::BufMut;
     // we need a configuration of block size threshold here
-    let mut writer = Vec::with_capacity(100 * 1024 * 1024).writer();
+    let mut buf = Vec::with_capacity(100 * 1024 * 1024);
 
-    let len = common_arrow::parquet::write::write_file(
-        &mut writer,
-        row_groups,
-        parquet_schema,
-        options,
-        None,
-        None,
-    )
-    .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
-
-    let parquet = writer.into_inner();
-    let stream_len = parquet.len();
+    let len = common_arrow::write_parquet_file(&mut buf, row_groups, arrow_schema.clone(), options)
+        .map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
 
     data_accessor
-        .write(location, stream_len as u64)
-        .run(Box::new(Cursor::new(parquet)))
+        .write(location, len)
+        .run(Box::new(Cursor::new(buf)))
         .await
         .map_err(|e| ErrorCode::DalTransportError(e.to_string()))?;
 
diff --git a/query/tests/it/tests/parquet.rs b/query/tests/it/tests/parquet.rs
index b72eff8dd91d0..c24d66522b534 100644
--- a/query/tests/it/tests/parquet.rs
+++ b/query/tests/it/tests/parquet.rs
@@ -15,12 +15,12 @@
 use std::fs::File;
 
 use common_arrow::arrow::chunk::Chunk;
-use common_arrow::arrow::io::parquet::write::write_file;
 use common_arrow::arrow::io::parquet::write::Compression;
 use common_arrow::arrow::io::parquet::write::Encoding;
 use common_arrow::arrow::io::parquet::write::RowGroupIterator;
 use common_arrow::arrow::io::parquet::write::Version;
 use common_arrow::arrow::io::parquet::write::WriteOptions;
+use common_arrow::write_parquet_file;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 
@@ -70,15 +70,6 @@ impl ParquetTestData {
             RowGroupIterator::try_new(batches.into_iter(), &schema, options, encodings).unwrap();
 
         let mut file = File::create(path).unwrap();
-        let parquet_schema = row_groups.parquet_schema().clone();
-        write_file(
-            &mut file,
-            row_groups,
-            &schema,
-            parquet_schema,
-            options,
-            None,
-        )
-        .unwrap();
+        write_parquet_file(&mut file, row_groups, schema, options).unwrap();
     }
 }
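
For reference, a minimal sketch of how the two new helpers, write_parquet_file and read_columns_many_async, fit together, mirroring the call patterns in source_parquet.rs and the tests above. The roundtrip function, the futures::io::Cursor reader, and the Uncompressed/Plain write settings are illustrative assumptions rather than code from this patch; the WriteOptions fields follow the arrow2 0.9 definition used by the existing tests.

use std::sync::Arc;

use common_arrow::arrow::array::Array;
use common_arrow::arrow::chunk::Chunk;
use common_arrow::arrow::datatypes::{Field, Schema};
use common_arrow::arrow::error::Result;
use common_arrow::arrow::io::parquet::read::{read_metadata_async, RowGroupDeserializer};
use common_arrow::arrow::io::parquet::write::{
    Compression, Encoding, RowGroupIterator, Version, WriteOptions,
};
use common_arrow::{read_columns_many_async, write_parquet_file};
use futures::io::Cursor; // assumption: any AsyncRead + AsyncSeek + Send + Unpin reader works

async fn roundtrip(schema: Schema, chunk: Chunk<Arc<dyn Array>>) -> Result<()> {
    // Write a single row group into an in-memory buffer with the new write wrapper.
    let options = WriteOptions {
        write_statistics: false,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };
    let encodings = vec![Encoding::Plain; schema.fields.len()];
    let row_groups =
        RowGroupIterator::try_new(vec![Ok(chunk)].into_iter(), &schema, options, encodings)?;
    let mut buf = Vec::new();
    let _size = write_parquet_file(&mut buf, row_groups, schema.clone(), options)?;

    // Read it back through one shared async reader, as ParquetSource does.
    let mut reader = Cursor::new(buf);
    let metadata = read_metadata_async(&mut reader).await?;
    let row_group = &metadata.row_groups[0];
    let fields: Vec<&Field> = schema.fields.iter().collect();
    let column_chunks = read_columns_many_async(&mut reader, row_group, fields, None).await?;
    let mut chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
    let _chunk = chunks.next().transpose()?;
    Ok(())
}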