Commit: bump arrow2 to main

youngsofun committed Feb 23, 2022
1 parent 0e1c390 commit c957ce1
Showing 10 changed files with 201 additions and 129 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions common/arrow/Cargo.toml
@@ -20,9 +20,10 @@ simd = ["arrow/simd"]
# Workspace dependencies

# Github dependencies
arrow = { package = "arrow2", version="0.9.1", default-features = false}
arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, rev = "196e6fb"}
arrow-format = { version = "0.4.0", features = ["flight-data", "flight-service", "ipc"] }
parquet2 = { version = "0.9.0", default_features = false }
parquet2 = { version = "0.10.2", default_features = false }
futures = { version = "0.3"}
# Crates.io dependencies

[dev-dependencies]
5 changes: 5 additions & 0 deletions common/arrow/src/lib.rs
@@ -12,6 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod parquet_read;
mod parquet_write;

pub use arrow;
pub use arrow_format;
pub use parquet2 as parquet;
pub use parquet_read::read_columns_many_async;
pub use parquet_write::write_parquet_file;
81 changes: 81 additions & 0 deletions common/arrow/src/parquet_read.rs
@@ -0,0 +1,81 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use arrow::datatypes::Field;
use arrow::error::Result;
use arrow::io::parquet::read::to_deserializer;
use arrow::io::parquet::read::ArrayIter;
use futures::AsyncRead;
use futures::AsyncReadExt;
use futures::AsyncSeek;
use futures::AsyncSeekExt;
use parquet2::metadata::ColumnChunkMetaData;
use parquet2::metadata::RowGroupMetaData;

fn get_field_columns<'a>(
    columns: &'a [ColumnChunkMetaData],
    field_name: &str,
) -> Vec<&'a ColumnChunkMetaData> {
    columns
        .iter()
        .filter(|x| x.descriptor().path_in_schema()[0] == field_name)
        .collect()
}

async fn _read_single_column_async<R>(
    reader: &mut R,
    meta: &ColumnChunkMetaData,
) -> Result<Vec<u8>>
where
    R: AsyncRead + AsyncSeek + Send + Unpin,
{
    let (start, len) = meta.byte_range();
    reader.seek(std::io::SeekFrom::Start(start)).await?;
    let mut chunk = vec![0; len as usize];
    reader.read_exact(&mut chunk).await?;
    Result::Ok(chunk)
}

async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
    reader: &mut R,
    columns: &'a [ColumnChunkMetaData],
    field_name: &str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
    let col_metas = get_field_columns(columns, field_name);
    let mut cols = Vec::with_capacity(col_metas.len());
    for meta in col_metas {
        cols.push((meta, _read_single_column_async(reader, meta).await?))
    }
    Ok(cols)
}

// Used when we cannot use arrow::io::parquet::read::read_columns_many_async, which needs a factory of readers.
pub async fn read_columns_many_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
    reader: &mut R,
    row_group: &RowGroupMetaData,
    fields: Vec<&Field>,
    chunk_size: Option<usize>,
) -> Result<Vec<ArrayIter<'a>>> {
    let mut arrays = Vec::with_capacity(fields.len());
    for field in fields {
        let columns = read_columns_async(reader, row_group.columns(), &field.name).await?;
        arrays.push(to_deserializer(
            columns,
            field.to_owned(),
            row_group.num_rows() as usize,
            chunk_size,
        )?);
    }
    Ok(arrays)
}
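
A rough downstream sketch (not part of this commit) of how the new helper can be paired with arrow2's RowGroupDeserializer, written from the perspective of a crate that uses the re-exports in common/arrow/src/lib.rs; the function name and its parameters are illustrative assumptions only:

// Hypothetical sketch: read the requested fields of one row group with the new
// helper, then deserialize them and count the rows of the resulting chunk.
use common_arrow::arrow::datatypes::Field;
use common_arrow::arrow::error::Result;
use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
use common_arrow::parquet::metadata::RowGroupMetaData;
use common_arrow::read_columns_many_async;
use futures::AsyncRead;
use futures::AsyncSeek;

async fn rows_in_group<R>(
    reader: &mut R,
    row_group: &RowGroupMetaData,
    fields: Vec<&Field>,
) -> Result<usize>
where
    R: AsyncRead + AsyncSeek + Send + Unpin,
{
    // Fetch the column chunks for the requested fields in one pass over the reader.
    let column_chunks = read_columns_many_async(reader, row_group, fields, None).await?;

    // chunk_size = None asks for a single chunk covering the whole row group.
    let mut chunks =
        RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
    match chunks.next() {
        Some(chunk) => Ok(chunk?.len()),
        None => Ok(0),
    }
}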
46 changes: 46 additions & 0 deletions common/arrow/src/parquet_write.rs
@@ -0,0 +1,46 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::io::Write;

use arrow::array::Array;
use arrow::chunk::Chunk;
use arrow::datatypes::Schema;
use arrow::error::Result;
use arrow::io::parquet::write::FileWriter;
use arrow::io::parquet::write::RowGroupIterator;
use parquet2::write::WriteOptions;

// a simple wrapper for code reuse
pub fn write_parquet_file<W: Write, A, I>(
    writer: &mut W,
    row_groups: RowGroupIterator<A, I>,
    schema: Schema,
    options: WriteOptions,
) -> Result<u64>
where
    W: Write,
    A: AsRef<dyn Array> + 'static + Send + Sync,
    I: Iterator<Item = Result<Chunk<A>>>,
{
    let mut file_writer = FileWriter::try_new(writer, schema, options)?;

    file_writer.start()?;
    for group in row_groups {
        let (group, len) = group?;
        file_writer.write(group, len)?;
    }
    let (size, _) = file_writer.end(None)?;
    Ok(size)
}
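
A rough downstream sketch (not part of this commit) of calling write_parquet_file with a RowGroupIterator built from an in-memory chunk; the column name, data values, and the caller-supplied WriteOptions are assumptions, mirroring the test change in common/streams/tests/it/source.rs below:

// Hypothetical helper: write one Int32 column as a single row group.
use std::sync::Arc;

use common_arrow::arrow::array::Array;
use common_arrow::arrow::array::Int32Array;
use common_arrow::arrow::chunk::Chunk;
use common_arrow::arrow::datatypes::DataType;
use common_arrow::arrow::datatypes::Field;
use common_arrow::arrow::datatypes::Schema;
use common_arrow::arrow::error::Result;
use common_arrow::arrow::io::parquet::write::Encoding;
use common_arrow::arrow::io::parquet::write::RowGroupIterator;
use common_arrow::parquet::write::WriteOptions;
use common_arrow::write_parquet_file;

fn write_one_column(path: &str, options: WriteOptions) -> Result<u64> {
    let schema = Schema::from(vec![Field::new("c1", DataType::Int32, false)]);

    let array: Arc<dyn Array> = Arc::new(Int32Array::from_slice(&[1, 2, 3]));
    let chunks = vec![Ok(Chunk::new(vec![array]))];

    // One encoding per column, in schema order.
    let row_groups =
        RowGroupIterator::try_new(chunks.into_iter(), &schema, options, vec![Encoding::Plain])?;

    let mut file = std::fs::File::create(path)?;
    write_parquet_file(&mut file, row_groups, schema, options)
}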
49 changes: 23 additions & 26 deletions common/streams/src/sources/source_parquet.rs
@@ -15,12 +15,12 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_arrow::arrow::datatypes::Field;
use common_arrow::arrow::datatypes::Schema as ArrowSchema;
use common_arrow::arrow::io::parquet::read::decompress;
use common_arrow::arrow::io::parquet::read::page_stream_to_array;
use common_arrow::arrow::io::parquet::read::read_metadata_async;
use common_arrow::arrow::io::parquet::read::schema::FileMetaData;
use common_arrow::parquet::read::get_page_stream;
use common_arrow::arrow::io::parquet::read::RowGroupDeserializer;
use common_arrow::read_columns_many_async;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
use common_exception::ErrorCode;
@@ -30,7 +30,6 @@ use common_tracing::tracing::debug_span;
use common_tracing::tracing::Instrument;
use futures::AsyncRead;
use futures::AsyncSeek;
use futures::StreamExt;

use crate::Source;

@@ -95,33 +94,31 @@ where R: AsyncRead + AsyncSeek + Unpin + Send
}

let fields = &self.arrow_table_schema.fields;
let row_grp = &metadata.row_groups[self.current_row_group];
let cols = self

let row_group = &metadata.row_groups[self.current_row_group];
let fields_to_read: Vec<&Field> = self
.projection
.clone()
.into_iter()
.map(|idx| (row_grp.column(idx).clone(), idx));
let mut data_cols = Vec::with_capacity(cols.len());
for (col_meta, idx) in cols {
let col_pages =
get_page_stream(&col_meta, &mut self.reader, vec![], Arc::new(|_, _| true))
.instrument(debug_span!("parquet_source_get_column_page"))
.await
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?;
let pages = col_pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));
let array = page_stream_to_array(pages, &col_meta, fields[idx].data_type.clone())
.instrument(debug_span!("parquet_source_page_stream_to_array"))
.await?;
let array: Arc<dyn common_arrow::arrow::array::Array> = array.into();
.map(|idx| &fields[idx])
.collect();

let column = match fields[idx].is_nullable {
false => array.into_column(),
true => array.into_nullable_column(),
};
data_cols.push(column);
}
let column_chunks =
read_columns_many_async(&mut self.reader, row_group, fields_to_read, None)
.await
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?;

let mut chunks =
RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);

// expect exactly one chunk
let chunk = match chunks.next() {
None => return Err(ErrorCode::ParquetError("failed to get a chunk")),
Some(chunk) => chunk.map_err(|e| ErrorCode::ParquetError(e.to_string()))?,
};

let block = DataBlock::from_chunk(&self.block_schema, &chunk)?;
self.current_row_group += 1;
let block = DataBlock::create(self.block_schema.clone(), data_cols);
Ok(Some(block))
}
}
13 changes: 3 additions & 10 deletions common/streams/tests/it/source.rs
@@ -218,18 +218,11 @@ async fn test_source_parquet() -> Result<()> {
let len = {
let rg_iter = std::iter::repeat(batch).map(Ok).take(page_nums_expects);
let row_groups = RowGroupIterator::try_new(rg_iter, &arrow_schema, options, encodings)?;
let parquet_schema = row_groups.parquet_schema().clone();
let path = dir.path().join(name);
let mut writer = File::create(path).unwrap();
common_arrow::parquet::write::write_file(
&mut writer,
row_groups,
parquet_schema,
options,
None,
None,
)
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?

common_arrow::write_parquet_file(&mut writer, row_groups, arrow_schema, options)
.map_err(|e| ErrorCode::ParquetError(e.to_string()))?
};

let local = Operator::new(