This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Migrated to parquet2 v0.3 #265

Merged · 1 commit · Aug 10, 2021
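This PR bumps parquet2 from 0.2 to 0.3 and adapts arrow2 to its renamed APIs: `CompressionCodec` becomes `Compression`, `Encoding` moves from `parquet2::schema` to `parquet2::encoding`, V1 data-page header fields become accessor methods behind the `DataPageHeaderExt` trait, and the re-exported page iterator gains an extra argument. As a reference point before the per-file diffs, a minimal sketch of the post-migration write configuration, mirroring the examples changed below:

use arrow2::io::parquet::write::{Compression, Encoding, Version, WriteOptions};

// Post-migration write configuration: `Compression` replaces the 0.2-era
// `CompressionCodec`, and `Encoding` is re-exported from the same module.
fn write_options() -> (WriteOptions, Encoding) {
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };
    (options, Encoding::Plain)
}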
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -58,7 +58,7 @@ futures = { version = "0.3", optional = true }
# for faster hashing
ahash = { version = "0.7", optional = true }

-parquet2 = { version = "0.2", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { version = "0.3", optional = true, default_features = false, features = ["stream"] }

[dev-dependencies]
rand = "0.8"
19 changes: 11 additions & 8 deletions arrow-parquet-integration-testing/src/main.rs
@@ -2,14 +2,17 @@ use std::fs::File;
use std::sync::Arc;
use std::{collections::HashMap, convert::TryFrom, io::Read};

-use arrow2::datatypes::DataType;
-use arrow2::error::Result;
-use arrow2::io::parquet::write::{Encoding, RowGroupIterator};
-use arrow2::io::{
-    json_integration::ArrowJson,
-    parquet::write::{write_file, CompressionCodec, Version, WriteOptions},
+use arrow2::{
+    datatypes::{DataType, Schema},
+    error::Result,
+    io::{
+        json_integration::{to_record_batch, ArrowJson},
+        parquet::write::{
+            write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
+        },
+    },
+    record_batch::RecordBatch,
};
-use arrow2::{datatypes::Schema, io::json_integration::to_record_batch, record_batch::RecordBatch};

use clap::{App, Arg};

@@ -154,7 +157,7 @@ fn main() -> Result<()> {

    let options = WriteOptions {
        write_statistics: true,
-        compression: CompressionCodec::Uncompressed,
+        compression: Compression::Uncompressed,
        version,
    };

2 changes: 1 addition & 1 deletion benches/write_parquet.rs
@@ -14,7 +14,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {

    let options = WriteOptions {
        write_statistics: false,
-        compression: CompressionCodec::Uncompressed,
+        compression: Compression::Uncompressed,
        version: Version::V1,
    };

4 changes: 2 additions & 2 deletions examples/parquet_write.rs
@@ -7,7 +7,7 @@ use arrow2::{
    datatypes::{Field, Schema},
    error::Result,
    io::parquet::write::{
-        array_to_page, write_file, CompressionCodec, DynIter, Encoding, Version, WriteOptions,
+        array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions,
    },
};

@@ -16,7 +16,7 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()>

    let options = WriteOptions {
        write_statistics: true,
-        compression: CompressionCodec::Uncompressed,
+        compression: Compression::Uncompressed,
        version: Version::V2,
    };
    let encoding = Encoding::Plain;
7 changes: 4 additions & 3 deletions examples/parquet_write_record.rs
@@ -5,17 +5,18 @@ use arrow2::{
    array::{Array, Int32Array},
    datatypes::{Field, Schema},
    error::Result,
-    io::parquet::write::{write_file, CompressionCodec, RowGroupIterator, Version, WriteOptions},
+    io::parquet::write::{
+        write_file, Compression, Encoding, RowGroupIterator, Version, WriteOptions,
+    },
    record_batch::RecordBatch,
};
-use parquet2::schema::Encoding;

fn write_batch(path: &str, batch: RecordBatch) -> Result<()> {
    let schema = batch.schema().clone();

    let options = WriteOptions {
        write_statistics: true,
-        compression: CompressionCodec::Uncompressed,
+        compression: Compression::Uncompressed,
        version: Version::V2,
    };

4 changes: 2 additions & 2 deletions src/io/parquet/mod.rs
@@ -405,7 +405,7 @@ mod tests {
mod tests_integration {
    use std::sync::Arc;

-    use super::write::CompressionCodec;
+    use super::write::Compression;
    use crate::array::{Array, PrimitiveArray, Utf8Array};
    use crate::datatypes::DataType;
    use crate::datatypes::TimeUnit;
@@ -421,7 +421,7 @@ mod tests_integration {
    fn integration_write(schema: &Schema, batches: &[RecordBatch]) -> Result<Vec<u8>> {
        let options = WriteOptions {
            write_statistics: true,
-            compression: CompressionCodec::Uncompressed,
+            compression: Compression::Uncompressed,
            version: Version::V1,
        };

4 changes: 2 additions & 2 deletions src/io/parquet/read/binary/basic.rs
@@ -1,7 +1,7 @@
use parquet2::{
    encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
    metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{BinaryPageDict, DataPage, DataPageHeader},
+    page::{BinaryPageDict, DataPage, DataPageHeader, DataPageHeaderExt},
    read::{levels, StreamingIterator},
};

@@ -214,7 +214,7 @@ fn extend_from_page<O: Offset>(
    let is_optional = descriptor.max_def_level() == 1;
    match page.header() {
        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

            let (_, validity_buffer, values_buffer) =
                levels::split_buffer_v1(page.buffer(), false, is_optional);
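The same mechanical change repeats across the read path: parquet2 0.3 exposes a V1 page's level encodings through `DataPageHeaderExt` trait methods instead of 0.2's public fields, so each call site gains parentheses and each import list gains the trait. A minimal sketch of the new access pattern, built only from items visible in these diffs:

use parquet2::{
    encoding::Encoding,
    page::{DataPage, DataPageHeader, DataPageHeaderExt},
};

// The `DataPageHeaderExt` import brings the accessor methods into scope;
// without it, `definition_level_encoding()` does not resolve.
fn check_rle_levels(page: &DataPage) {
    if let DataPageHeader::V1(header) = page.header() {
        assert_eq!(header.definition_level_encoding(), Encoding::Rle);
        assert_eq!(header.repetition_level_encoding(), Encoding::Rle);
    }
}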
10 changes: 5 additions & 5 deletions src/io/parquet/read/binary/nested.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use parquet2::{
    encoding::Encoding,
    metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{DataPage, DataPageHeader},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt},
    read::{
        levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
        StreamingIterator,
@@ -121,8 +121,8 @@ fn extend_from_page<O: Offset>(

    match page.header() {
        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
-            assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+            assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

            match (&page.encoding(), page.dictionary_page()) {
                (Encoding::Plain, None) => {
@@ -137,11 +137,11 @@ fn extend_from_page<O: Offset>(
                values_buffer,
                additional,
                (
-                    &header.repetition_level_encoding,
+                    &header.repetition_level_encoding(),
                    descriptor.max_rep_level(),
                ),
                (
-                    &header.definition_level_encoding,
+                    &header.definition_level_encoding(),
                    descriptor.max_def_level(),
                ),
                is_nullable,
4 changes: 2 additions & 2 deletions src/io/parquet/read/boolean/basic.rs
@@ -8,7 +8,7 @@ use super::super::utils;
use parquet2::{
    encoding::{hybrid_rle, Encoding},
    metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{DataPage, DataPageHeader},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt},
    read::{levels, StreamingIterator},
};

@@ -97,7 +97,7 @@ fn extend_from_page(
    let is_optional = descriptor.max_def_level() == 1;
    match page.header() {
        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

            match (&page.encoding(), page.dictionary_page(), is_optional) {
                (Encoding::Plain, None, true) => {
10 changes: 5 additions & 5 deletions src/io/parquet/read/boolean/nested.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
use parquet2::{
    encoding::Encoding,
    metadata::{ColumnChunkMetaData, ColumnDescriptor},
-    page::{DataPage, DataPageHeader},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt},
    read::{
        levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
        StreamingIterator,
@@ -107,8 +107,8 @@ fn extend_from_page(

    match page.header() {
        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
-            assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+            assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

            match (&page.encoding(), page.dictionary_page()) {
                (Encoding::Plain, None) => {
@@ -123,11 +123,11 @@ fn extend_from_page(
                values_buffer,
                additional,
                (
-                    &header.repetition_level_encoding,
+                    &header.repetition_level_encoding(),
                    descriptor.max_rep_level(),
                ),
                (
-                    &header.definition_level_encoding,
+                    &header.definition_level_encoding(),
                    descriptor.max_def_level(),
                ),
                is_nullable,
4 changes: 2 additions & 2 deletions src/io/parquet/read/fixed_size_binary.rs
@@ -1,6 +1,6 @@
use parquet2::{
    encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
-    page::{DataPage, DataPageHeader, FixedLenByteArrayPageDict},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt, FixedLenByteArrayPageDict},
    read::{levels, StreamingIterator},
};

@@ -171,7 +171,7 @@ pub(crate) fn extend_from_page(
    let is_optional = descriptor.max_def_level() == 1;
    match page.header() {
        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

            let (_, validity_buffer, values_buffer) =
                levels::split_buffer_v1(page.buffer(), false, is_optional);
6 changes: 3 additions & 3 deletions src/io/parquet/read/mod.rs
@@ -27,8 +27,8 @@ pub use parquet2::{
        decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata,
        streaming_iterator, Decompressor, PageIterator, StreamingIterator,
    },
-    schema::{
-        types::{LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType},
+    schema::types::{
+        LogicalType, ParquetType, PhysicalType, PrimitiveConvertedType,
        TimeUnit as ParquetTimeUnit, TimestampType,
    },
    types::int96_to_i64_ns,
@@ -43,7 +43,7 @@ pub fn get_page_iterator<'b, RR: Read + Seek>(
    buffer: Vec<u8>,
) -> Result<PageIterator<'b, RR>> {
    Ok(_get_page_iterator(
-        metadata, row_group, column, reader, buffer,
+        metadata, row_group, column, reader, None, buffer,
    )?)
}

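parquet2 0.3 also added a parameter to the underlying `get_page_iterator`; the wrapper above passes `None` so arrow2's public signature is unchanged. A sketch of the whole wrapper under that reading — only the call shown in the diff is certain; the parameter list outside the visible lines is an assumption reconstructed from context:

use std::io::{Read, Seek};

use parquet2::{
    metadata::FileMetaData,
    read::{get_page_iterator as _get_page_iterator, PageIterator},
};

// Assumed reconstruction of arrow2's wrapper; the hidden signature may differ.
pub fn get_page_iterator<'b, RR: Read + Seek>(
    metadata: &'b FileMetaData,
    row_group: usize,
    column: usize,
    reader: &'b mut RR,
    buffer: Vec<u8>,
) -> parquet2::error::Result<PageIterator<'b, RR>> {
    // The new argument slots in before `buffer`; the diff passes `None`,
    // preserving the 0.2 call's behavior.
    _get_page_iterator(metadata, row_group, column, reader, None, buffer)
}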
4 changes: 2 additions & 2 deletions src/io/parquet/read/primitive/basic.rs
@@ -1,6 +1,6 @@
use parquet2::{
    encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
-    page::{DataPage, DataPageHeader, PrimitivePageDict},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
    read::levels,
    types::NativeType,
};
@@ -160,7 +160,7 @@ where
    let is_optional = descriptor.max_def_level() == 1;
    match page.header() {
        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

            let (_, validity_buffer, values_buffer) =
                levels::split_buffer_v1(page.buffer(), false, is_optional);
7 changes: 3 additions & 4 deletions src/io/parquet/read/primitive/dictionary.rs
@@ -1,10 +1,9 @@
use std::sync::Arc;

use parquet2::{
-    encoding::{bitpacking, hybrid_rle, uleb128},
-    page::{DataPage, DataPageHeader, PrimitivePageDict},
+    encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
    read::{levels, StreamingIterator},
-    schema::Encoding,
    types::NativeType,
};

@@ -102,7 +101,7 @@ where
    let is_optional = descriptor.max_def_level() == 1;
    match page.header() {
        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);

            let (_, validity_buffer, values_buffer) =
                levels::split_buffer_v1(page.buffer(), false, is_optional);
10 changes: 5 additions & 5 deletions src/io/parquet/read/primitive/nested.rs
@@ -1,6 +1,6 @@
use parquet2::{
    encoding::Encoding,
-    page::{DataPage, DataPageHeader},
+    page::{DataPage, DataPageHeader, DataPageHeaderExt},
    read::levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
    types::NativeType,
};
@@ -127,8 +127,8 @@ where

    match page.header() {
        DataPageHeader::V1(header) => {
-            assert_eq!(header.definition_level_encoding, Encoding::Rle);
-            assert_eq!(header.repetition_level_encoding, Encoding::Rle);
+            assert_eq!(header.definition_level_encoding(), Encoding::Rle);
+            assert_eq!(header.repetition_level_encoding(), Encoding::Rle);

            match (&page.encoding(), page.dictionary_page()) {
                (Encoding::Plain, None) => {
@@ -143,11 +143,11 @@ where
                values_buffer,
                additional,
                (
-                    &header.repetition_level_encoding,
+                    &header.repetition_level_encoding(),
                    descriptor.max_rep_level(),
                ),
                (
-                    &header.definition_level_encoding,
+                    &header.definition_level_encoding(),
                    descriptor.max_def_level(),
                ),
                is_nullable,
4 changes: 2 additions & 2 deletions src/io/parquet/read/schema/convert.rs
@@ -6,9 +6,9 @@ use parquet2::{
    schema::{
        types::{
            BasicTypeInfo, GroupConvertedType, LogicalType, ParquetType, PhysicalType,
-            PrimitiveConvertedType, TimeUnit as ParquetTimeUnit,
+            PrimitiveConvertedType, TimeUnit as ParquetTimeUnit, TimestampType,
        },
-        Repetition, TimestampType,
+        Repetition,
    },
};

2 changes: 1 addition & 1 deletion src/io/parquet/read/utils.rs
@@ -1,4 +1,4 @@
-use parquet2::{encoding::get_length, schema::Encoding};
+use parquet2::encoding::{get_length, Encoding};

use crate::error::ArrowError;

5 changes: 3 additions & 2 deletions src/io/parquet/write/binary/nested.rs
@@ -1,5 +1,6 @@
-use parquet2::schema::Encoding;
-use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions};
+use parquet2::{
+    encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions,
+};

use super::super::{levels, utils};
use super::basic::{build_statistics, encode_plain};
3 changes: 1 addition & 2 deletions src/io/parquet/write/boolean/basic.rs
@@ -1,8 +1,7 @@
use parquet2::{
-    encoding::hybrid_rle::bitpacked_encode,
+    encoding::{hybrid_rle::bitpacked_encode, Encoding},
    metadata::ColumnDescriptor,
    page::CompressedDataPage,
-    schema::Encoding,
    statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics},
    write::WriteOptions,
};
5 changes: 3 additions & 2 deletions src/io/parquet/write/boolean/nested.rs
@@ -1,5 +1,6 @@
-use parquet2::schema::Encoding;
-use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions};
+use parquet2::{
+    encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions,
+};

use super::super::{levels, utils};
use super::basic::{build_statistics, encode_plain};
11 changes: 6 additions & 5 deletions src/io/parquet/write/dictionary.rs
@@ -1,8 +1,9 @@
-use parquet2::encoding::hybrid_rle::encode_u32;
-use parquet2::page::{CompressedDictPage, CompressedPage};
-use parquet2::schema::Encoding;
-use parquet2::write::DynIter;
-use parquet2::{metadata::ColumnDescriptor, write::WriteOptions};
+use parquet2::{
+    encoding::{hybrid_rle::encode_u32, Encoding},
+    metadata::ColumnDescriptor,
+    page::{CompressedDictPage, CompressedPage},
+    write::{DynIter, WriteOptions},
+};

use super::binary::encode_plain as binary_encode_plain;
use super::primitive::encode_plain as primitive_encode_plain;
4 changes: 2 additions & 2 deletions src/io/parquet/write/fixed_len_bytes.rs
@@ -1,6 +1,6 @@
use parquet2::{
-    compression::create_codec, metadata::ColumnDescriptor, page::CompressedDataPage,
-    schema::Encoding, write::WriteOptions,
+    compression::create_codec, encoding::Encoding, metadata::ColumnDescriptor,
+    page::CompressedDataPage, write::WriteOptions,
};

use super::utils;