-
Notifications
You must be signed in to change notification settings - Fork 762
/
Copy pathparquet_read.rs
81 lines (76 loc) · 2.63 KB
/
parquet_read.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use arrow::datatypes::Field;
use arrow::error::Result;
use arrow::io::parquet::read::to_deserializer;
use arrow::io::parquet::read::ArrayIter;
use futures::AsyncRead;
use futures::AsyncReadExt;
use futures::AsyncSeek;
use futures::AsyncSeekExt;
use parquet2::metadata::ColumnChunkMetaData;
use parquet2::metadata::RowGroupMetaData;
/// Returns the column chunks in `columns` whose first schema-path component
/// equals `field_name`.
///
/// Several chunks may match (the result is a `Vec`); presumably these are the
/// leaf columns of a nested field — TODO confirm against the writer side.
/// A chunk whose schema path is empty never matches, rather than panicking on
/// an out-of-bounds index.
fn get_field_columns<'a>(
    columns: &'a [ColumnChunkMetaData],
    field_name: &str,
) -> Vec<&'a ColumnChunkMetaData> {
    columns
        .iter()
        .filter(|column| {
            column
                .descriptor()
                .path_in_schema()
                .first()
                .map_or(false, |root| root == field_name)
        })
        .collect()
}
async fn _read_single_column_async<R>(
reader: &mut R,
meta: &ColumnChunkMetaData,
) -> Result<Vec<u8>>
where
R: AsyncRead + AsyncSeek + Send + Unpin,
{
let (start, len) = meta.byte_range();
reader.seek(std::io::SeekFrom::Start(start)).await?;
let mut chunk = vec![0; len as usize];
reader.read_exact(&mut chunk).await?;
Result::Ok(chunk)
}
/// Reads every column chunk in `columns` that belongs to the field named
/// `field_name`.
///
/// Returns each matching chunk's metadata paired with its raw bytes, in the
/// order `get_field_columns` yields them. Chunks are read one after another
/// through the shared `reader`.
///
/// # Errors
///
/// The first error from reading any chunk aborts the call and is returned.
async fn read_columns_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
    reader: &mut R,
    columns: &'a [ColumnChunkMetaData],
    field_name: &str,
) -> Result<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>> {
    let matching = get_field_columns(columns, field_name);
    let mut chunks = Vec::with_capacity(matching.len());
    for &chunk_meta in matching.iter() {
        let bytes = _read_single_column_async(reader, chunk_meta).await?;
        chunks.push((chunk_meta, bytes));
    }
    Ok(chunks)
}
/// Reads the columns of each field in `fields` from one row group and builds
/// a deserializing [`ArrayIter`] per field, in the same order as `fields`.
///
/// This is used instead of `arrow::io::parquet::read::read_columns_many_async`
/// when that function cannot be used, because it needs a factory of readers.
/// Here only a single `reader` is available, so the fields are read
/// sequentially through it.
///
/// `chunk_size` is passed through unchanged to `to_deserializer`.
///
/// # Errors
///
/// Returns the first error produced while reading a field's column chunks or
/// while constructing its deserializer.
pub async fn read_columns_many_async<'a, R: AsyncRead + AsyncSeek + Send + Unpin>(
    reader: &mut R,
    row_group: &RowGroupMetaData,
    fields: Vec<&Field>,
    chunk_size: Option<usize>,
) -> Result<Vec<ArrayIter<'a>>> {
    let mut iters = Vec::with_capacity(fields.len());
    for field in fields {
        let field_columns =
            read_columns_async(reader, row_group.columns(), field.name.as_str()).await?;
        let num_rows = row_group.num_rows() as usize;
        let iter = to_deserializer(field_columns, field.clone(), num_rows, chunk_size)?;
        iters.push(iter);
    }
    Ok(iters)
}