Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Migrate to latest parquet2
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Mar 22, 2022
1 parent 3fc5882 commit 68ee80b
Show file tree
Hide file tree
Showing 49 changed files with 792 additions and 347 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ hex = { version = "^0.4", optional = true }

# for IPC compression
lz4 = { version = "1.23.1", optional = true }
zstd = { version = "0.10", optional = true }
zstd = { version = "0.11", optional = true }

rand = { version = "0.8", optional = true }

Expand All @@ -68,7 +68,8 @@ futures = { version = "0.3", optional = true }
ahash = { version = "0.7", optional = true }

# parquet support
parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
#parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "write_indexes", optional = true, default_features = false, features = ["stream"] }

# avro support
avro-schema = { version = "0.2", optional = true }
Expand Down
3 changes: 1 addition & 2 deletions arrow-parquet-integration-testing/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ fn main() -> Result<()> {

writer.start()?;
for group in row_groups {
let (group, len) = group?;
writer.write(group, len)?;
writer.write(group?)?;
}
let _ = writer.end(None)?;

Expand Down
3 changes: 1 addition & 2 deletions benches/write_parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {

writer.start()?;
for group in row_groups {
let (group, len) = group?;
writer.write(group, len)?;
writer.write(group?)?;
}
let _ = writer.end(None)?;
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions examples/parquet_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ fn write_batch(path: &str, schema: Schema, columns: Chunk<Arc<dyn Array>>) -> Re

writer.start()?;
for group in row_groups {
let (group, len) = group?;
writer.write(group, len)?;
writer.write(group?)?;
}
let _size = writer.end(None)?;
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions examples/parquet_write_parallel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
// Write the file.
writer.start()?;
for group in row_groups {
let (group, len) = group?;
writer.write(group, len)?;
writer.write(group?)?;
}
let _size = writer.end(None)?;

Expand Down
3 changes: 1 addition & 2 deletions src/doc/lib.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ fn main() -> Result<()> {
// Write the file.
writer.start()?;
for group in row_groups {
let (group, len) = group?;
writer.write(group, len)?;
writer.write(group?)?;
}
let _ = writer.end(None)?;
Ok(())
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ impl From<std::str::Utf8Error> for ArrowError {
}
}

/// Converts a [`std::string::FromUtf8Error`] (raised when `String::from_utf8`
/// receives invalid UTF-8 bytes) into an [`ArrowError`], enabling `?`
/// propagation in code that builds `String`s from raw parquet bytes.
impl From<std::string::FromUtf8Error> for ArrowError {
    fn from(error: std::string::FromUtf8Error) -> Self {
        // Empty context string mirrors the neighboring `From` impls in this
        // file (e.g. the `simdutf8` one below); the boxed source error carries
        // the detail.
        ArrowError::External("".to_string(), Box::new(error))
    }
}

impl From<simdutf8::basic::Utf8Error> for ArrowError {
fn from(error: simdutf8::basic::Utf8Error) -> Self {
ArrowError::External("".to_string(), Box::new(error))
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => {
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<'a> Decoder<'a> for BooleanDecoder {

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), is_optional) {
(Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))),
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<'a> Decoder<'a> for BooleanDecoder {

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), is_optional) {
(Encoding::Plain, true) => {
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ where

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, false) => {
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl<'a> Decoder<'a> for BinaryDecoder {

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::Plain, None, true) => Ok(State::Optional(Optional::new(page, self.size))),
Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
};

use self::nested_utils::{InitNested, NestedArrayIter, NestedState};
use parquet2::schema::types::PrimitiveType;
use simple::page_iter_to_arrays;

use super::*;
Expand All @@ -27,7 +28,7 @@ pub fn get_page_iterator<R: Read + Seek>(
reader: R,
pages_filter: Option<PageFilter>,
buffer: Vec<u8>,
) -> Result<PageIterator<R>> {
) -> Result<PageReader<R>> {
Ok(_get_page_iterator(
column_metadata,
reader,
Expand Down Expand Up @@ -76,7 +77,7 @@ fn create_list(

fn columns_to_iter_recursive<'a, I: 'a>(
mut columns: Vec<I>,
mut types: Vec<&ParquetType>,
mut types: Vec<&PrimitiveType>,
field: Field,
mut init: Vec<InitNested>,
chunk_size: usize,
Expand Down Expand Up @@ -238,7 +239,7 @@ fn field_to_init(field: &Field) -> Vec<InitNested> {
/// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`.
pub fn column_iter_to_arrays<'a, I: 'a>(
columns: Vec<I>,
types: Vec<&ParquetType>,
types: Vec<&PrimitiveType>,
field: Field,
chunk_size: usize,
) -> Result<ArrayIter<'a>>
Expand Down
6 changes: 3 additions & 3 deletions src/io/parquet/read/deserialize/nested_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ impl<'a> NestedPage<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (rep_levels, def_levels, _) = split_buffer(page);

let max_rep_level = page.descriptor().max_rep_level();
let max_def_level = page.descriptor().max_def_level();
let max_rep_level = page.descriptor.max_rep_level;
let max_def_level = page.descriptor.max_def_level;

let reps =
HybridRleDecoder::new(rep_levels, get_bit_width(max_rep_level), page.num_values());
Expand Down Expand Up @@ -451,7 +451,7 @@ impl<'a> Optional<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, def_levels, _) = split_buffer(page);

let max_def = page.descriptor().max_def_level();
let max_def = page.descriptor.max_def_level;

Self {
definition_levels: HybridRleDecoder::new(
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ where

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where

fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
Expand Down
22 changes: 6 additions & 16 deletions src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use parquet2::{
schema::types::{
LogicalType, ParquetType, PhysicalType, TimeUnit as ParquetTimeUnit, TimestampType,
LogicalType, PhysicalType, PrimitiveType, TimeUnit as ParquetTimeUnit, TimestampType,
},
types::int96_to_i64_ns,
};
Expand Down Expand Up @@ -60,24 +60,14 @@ where
/// of [`DataType`] `data_type` and `chunk_size`.
pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
pages: I,
type_: &ParquetType,
type_: &PrimitiveType,
data_type: DataType,
chunk_size: usize,
) -> Result<ArrayIter<'a>> {
use DataType::*;

let (physical_type, logical_type) = if let ParquetType::PrimitiveType {
physical_type,
logical_type,
..
} = type_
{
(physical_type, logical_type)
} else {
return Err(ArrowError::InvalidArgumentError(
"page_iter_to_arrays can only be called with a parquet primitive type".into(),
));
};
let physical_type = &type_.physical_type;
let logical_type = &type_.logical_type;

Ok(match data_type.to_logical_type() {
Null => null::iter_to_arrays(pages, data_type, chunk_size),
Expand Down Expand Up @@ -276,7 +266,7 @@ fn timestamp<'a, I: 'a + DataPages>(
Ok(match (unit, time_unit) {
(ParquetTimeUnit::MILLIS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000)),
(ParquetTimeUnit::MICROS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000)),
(ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x * 1_000_000_000)),
(ParquetTimeUnit::NANOS(_), TimeUnit::Second) => dyn_iter(op(iter, |x| x / 1_000_000_000)),

(ParquetTimeUnit::MILLIS(_), TimeUnit::Millisecond) => dyn_iter(iden(iter)),
(ParquetTimeUnit::MICROS(_), TimeUnit::Millisecond) => dyn_iter(op(iter, |x| x / 1_000)),
Expand Down Expand Up @@ -348,7 +338,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>(
pages,
data_type,
chunk_size,
|x: i64| x * 1_000_000_000,
|x: i64| x / 1_000_000_000,
))
}

Expand Down
2 changes: 1 addition & 1 deletion src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub fn not_implemented(

#[inline]
pub fn split_buffer(page: &DataPage) -> (&[u8], &[u8], &[u8]) {
_split_buffer(page, page.descriptor())
_split_buffer(page)
}

/// A private trait representing structs that can receive elements.
Expand Down
43 changes: 43 additions & 0 deletions src/io/parquet/read/indexes/binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use parquet2::indexes::PageIndex;

use crate::{
array::{Array, BinaryArray, PrimitiveArray, Utf8Array},
datatypes::{DataType, PhysicalType},
error::ArrowError,
trusted_len::TrustedLen,
};

use super::ColumnIndex;

/// Deserializes the per-page statistics of a (large) binary / utf8 column
/// index into Arrow arrays.
///
/// Returns a [`ColumnIndex`] whose `min`/`max` arrays have the physical type
/// implied by `data_type` and whose `null_count` is a `u64` array.
///
/// # Errors
/// Errors if `data_type` maps to a utf8 array and any page statistic is not
/// valid UTF-8.
pub fn deserialize(
    indexes: &[PageIndex<Vec<u8>>],
    data_type: &DataType,
) -> Result<ColumnIndex, ArrowError> {
    // Each page contributes one (possibly null) entry per statistic.
    let min = deserialize_binary_iter(indexes.iter().map(|index| index.min.as_ref()), data_type)?;
    let max = deserialize_binary_iter(indexes.iter().map(|index| index.max.as_ref()), data_type)?;
    let null_count = PrimitiveArray::from_trusted_len_iter(
        indexes
            .iter()
            .map(|index| index.null_count.map(|count| count as u64)),
    );
    Ok(ColumnIndex {
        min,
        max,
        null_count,
    })
}

fn deserialize_binary_iter<'a, I: TrustedLen<Item = Option<&'a Vec<u8>>>>(
iter: I,
data_type: &DataType,
) -> Result<Box<dyn Array>, ArrowError> {
match data_type.to_physical_type() {
PhysicalType::LargeBinary => Ok(Box::new(BinaryArray::<i64>::from_iter(iter))),
PhysicalType::Utf8 => {
let iter = iter.map(|x| x.map(|x| std::str::from_utf8(x)).transpose());
Ok(Box::new(Utf8Array::<i32>::try_from_trusted_len_iter(iter)?))
}
PhysicalType::LargeUtf8 => {
let iter = iter.map(|x| x.map(|x| std::str::from_utf8(x)).transpose());
Ok(Box::new(Utf8Array::<i64>::try_from_trusted_len_iter(iter)?))
}
_ => Ok(Box::new(BinaryArray::<i32>::from_iter(iter))),
}
}
21 changes: 21 additions & 0 deletions src/io/parquet/read/indexes/boolean.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use parquet2::indexes::PageIndex;

use crate::array::{BooleanArray, PrimitiveArray};

use super::ColumnIndex;

/// Deserializes the per-page statistics of a boolean column index into Arrow
/// arrays: boolean `min`/`max` and a `u64` `null_count`, one entry per page.
pub fn deserialize(indexes: &[PageIndex<bool>]) -> ColumnIndex {
    let min = BooleanArray::from_trusted_len_iter(indexes.iter().map(|index| index.min));
    let max = BooleanArray::from_trusted_len_iter(indexes.iter().map(|index| index.max));
    // Null counts are widened from i64 (parquet2) to u64 (arrow) per page.
    let null_count = PrimitiveArray::from_trusted_len_iter(
        indexes
            .iter()
            .map(|index| index.null_count.map(|count| count as u64)),
    );
    ColumnIndex {
        min: Box::new(min),
        max: Box::new(max),
        null_count,
    }
}
58 changes: 58 additions & 0 deletions src/io/parquet/read/indexes/fixed_len_binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use parquet2::indexes::PageIndex;

use crate::{
array::{Array, FixedSizeBinaryArray, MutableFixedSizeBinaryArray, PrimitiveArray},
datatypes::{DataType, PhysicalType, PrimitiveType},
trusted_len::TrustedLen,
};

use super::ColumnIndex;

/// Deserializes the per-page statistics of a fixed-size-binary column index
/// into Arrow arrays of the given `data_type` (`min`/`max`) plus a `u64`
/// `null_count`, one entry per page.
pub fn deserialize(indexes: &[PageIndex<Vec<u8>>], data_type: DataType) -> ColumnIndex {
    let null_count = PrimitiveArray::from_trusted_len_iter(
        indexes
            .iter()
            .map(|index| index.null_count.map(|count| count as u64)),
    );
    // `data_type` is consumed by the second call, so the first needs a clone.
    let min = deserialize_binary_iter(
        indexes.iter().map(|index| index.min.as_ref()),
        data_type.clone(),
    );
    let max = deserialize_binary_iter(indexes.iter().map(|index| index.max.as_ref()), data_type);
    ColumnIndex {
        min,
        max,
        null_count,
    }
}

/// Materializes an iterator of optional fixed-size byte statistics into the
/// Arrow array selected by `data_type`.
///
/// Decimal128 statistics (physical type `Primitive(Int128)`) are decoded as
/// big-endian, sign-extended two's-complement integers; every other physical
/// type is stored verbatim as a fixed-size binary array.
fn deserialize_binary_iter<'a, I: TrustedLen<Item = Option<&'a Vec<u8>>>>(
    iter: I,
    data_type: DataType,
) -> Box<dyn Array> {
    match data_type.to_physical_type() {
        PhysicalType::Primitive(PrimitiveType::Int128) => {
            Box::new(PrimitiveArray::from_trusted_len_iter(iter.map(|v| {
                v.map(|x| {
                    // Copy the fixed-size byte value to the start of a 16 byte stack
                    // allocated buffer, then use an arithmetic right shift to fill in
                    // MSBs, which accounts for leading 1's in negative (two's complement)
                    // values.
                    // NOTE(review): assumes 0 < x.len() <= 16 — an empty value would
                    // shift by 128 (overflow) and a longer one would panic in
                    // `copy_from_slice`; presumably the parquet fixed-size width for a
                    // decimal guarantees this — confirm at the call site.
                    let n = x.len();
                    let mut bytes = [0u8; 16];
                    bytes[..n].copy_from_slice(x);
                    i128::from_be_bytes(bytes) >> (8 * (16 - n))
                })
            })))
        }
        _ => {
            // Generic path: push each optional value into a mutable fixed-size
            // binary array pre-sized from the iterator's lower size hint.
            let mut a = MutableFixedSizeBinaryArray::from_data(
                data_type,
                Vec::with_capacity(iter.size_hint().0),
                None,
            );
            for item in iter {
                a.push(item);
            }
            // Freeze into the immutable array before boxing.
            let a: FixedSizeBinaryArray = a.into();
            Box::new(a)
        }
    }
}
Loading

0 comments on commit 68ee80b

Please sign in to comment.