Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to write and read Intervals to and from parquet #1122

Merged
merged 1 commit into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ jobs:
- name: Run
# --skip io: miri can't handle opening of files, so we skip those
run: cargo miri test --features full -- --skip io::parquet --skip io::ipc --skip io::flight
run: cargo miri test --tests --features compute,chrono-tz

miri-checks-io:
name: MIRI on IO IPC
Expand Down
40 changes: 21 additions & 19 deletions examples/parquet_write_parallel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,6 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {

let encoding_map = |data_type: &DataType| {
match data_type.to_physical_type() {
// let's be fancy and use delta-encoding for binary fields
PhysicalType::Binary
| PhysicalType::LargeBinary
| PhysicalType::Utf8
| PhysicalType::LargeUtf8 => Encoding::DeltaLengthByteArray,
// remaining is plain
_ => Encoding::Plain,
}
Expand All @@ -70,7 +65,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
// derive the parquet schema (physical types) from arrow's schema.
let parquet_schema = to_parquet_schema(&schema)?;

let row_groups = chunks.iter().map(|batch| {
let row_groups = chunks.iter().map(|chunk| {
// write batch to pages; parallelized by rayon
let columns = chunk
.columns()
Expand Down Expand Up @@ -106,7 +101,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {
});

// Create a new empty file
let file = std::fs::File::create(path)?;
let file = std::io::BufWriter::new(std::fs::File::create(path)?);

let mut writer = FileWriter::try_new(file, schema, options)?;

Expand All @@ -123,7 +118,7 @@ fn create_batch(size: usize) -> Result<Chunk> {
let c1: Int32Array = (0..size)
.map(|x| if x % 9 == 0 { None } else { Some(x as i32) })
.collect();
let c2: Utf8Array<i32> = (0..size)
let c2: Utf8Array<i64> = (0..size)
.map(|x| {
if x % 8 == 0 {
None
Expand All @@ -133,18 +128,25 @@ fn create_batch(size: usize) -> Result<Chunk> {
})
.collect();

Chunk::try_new(vec![c1.boxed(), c2.boxed()])
Chunk::try_new(vec![
c1.clone().boxed(),
c1.clone().boxed(),
c1.boxed(),
c2.boxed(),
])
}

fn main() -> Result<()> {
let schema = Schema {
fields: vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Utf8, true),
],
metadata: Default::default(),
};
let batch = create_batch(5_000_000)?;

parallel_write("example.parquet", schema, &[batch.clone(), batch])
let fields = vec![
Field::new("c1", DataType::Int32, true),
Field::new("c2", DataType::Int32, true),
Field::new("c3", DataType::Int32, true),
Field::new("c4", DataType::LargeUtf8, true),
];
let batch = create_batch(100_000_000)?;

let start = std::time::SystemTime::now();
parallel_write("example.parquet", fields.into(), &[batch])?;
println!("took: {} ms", start.elapsed().unwrap().as_millis());
Ok(())
}
71 changes: 55 additions & 16 deletions src/io/parquet/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
array::{Array, BinaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray, Utf8Array},
datatypes::{DataType, IntervalUnit, TimeUnit},
error::{Error, Result},
types::NativeType,
types::{days_ms, NativeType},
};

use super::super::{ArrayIter, DataPages};
Expand Down Expand Up @@ -115,9 +115,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
chunk_size,
|x: i32| x as i16,
))),
Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter(iden(
primitive::Iter::new(pages, data_type, chunk_size, |x: i32| x as i32),
)),
Int32 | Date32 | Time32(_) => dyn_iter(iden(primitive::Iter::new(
pages,
data_type,
chunk_size,
|x: i32| x as i32,
))),

Timestamp(time_unit, _) => {
let time_unit = *time_unit;
Expand All @@ -133,6 +136,50 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(

FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)),

Interval(IntervalUnit::YearMonth) => {
let n = 12;
let pages =
fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);

let pages = pages.map(move |maybe_array| {
let array = maybe_array?;
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| i32::from_le_bytes(value[..4].try_into().unwrap()))
.collect::<Vec<_>>();
let validity = array.validity().cloned();

PrimitiveArray::<i32>::try_new(data_type.clone(), values.into(), validity)
});

let arrays = pages.map(|x| x.map(|x| x.boxed()));

Box::new(arrays) as _
}

Interval(IntervalUnit::DayTime) => {
let n = 12;
let pages =
fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);

let pages = pages.map(move |maybe_array| {
let array = maybe_array?;
let values = array
.values()
.chunks_exact(n)
.map(super::super::convert_days_ms)
.collect::<Vec<_>>();
let validity = array.validity().cloned();

PrimitiveArray::<days_ms>::try_new(data_type.clone(), values.into(), validity)
});

let arrays = pages.map(|x| x.map(|x| x.boxed()));

Box::new(arrays) as _
}

Decimal(_, _) => match physical_type {
PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new(
pages,
Expand All @@ -146,14 +193,14 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
chunk_size,
|x: i64| x as i128,
))),
&PhysicalType::FixedLenByteArray(n) if n > 16 => {
PhysicalType::FixedLenByteArray(n) if *n > 16 => {
return Err(Error::NotYetImplemented(format!(
"Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}",
n
)))
}
&PhysicalType::FixedLenByteArray(n) => {
let n = n as usize;
PhysicalType::FixedLenByteArray(n) => {
let n = *n;

let pages =
fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size);
Expand All @@ -163,15 +210,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>(
let values = array
.values()
.chunks_exact(n)
.map(|value: &[u8]| {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
})
.map(|value: &[u8]| super::super::convert_i128(value, n))
.collect::<Vec<_>>();
let validity = array.validity().cloned();

Expand Down
17 changes: 17 additions & 0 deletions src/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,20 @@ pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
) -> Result<FileMetaData> {
Ok(_read_metadata_async(reader).await?)
}

/// Decodes the trailing eight bytes of a 12-byte parquet INTERVAL value
/// (little-endian) into a `days_ms` pair — byte offsets 4..8 hold the days,
/// 8..12 the milliseconds. Panics if `value` is shorter than 12 bytes.
fn convert_days_ms(value: &[u8]) -> crate::types::days_ms {
    let days = i32::from_le_bytes(value[4..8].try_into().unwrap());
    let millis = i32::from_le_bytes(value[8..12].try_into().unwrap());
    crate::types::days_ms(days, millis)
}

/// Sign-extends an `n`-byte (1 <= n <= 16) big-endian two's-complement
/// integer into an `i128`.
///
/// The fixed-size bytes are placed at the low end of a 16-byte buffer whose
/// remaining high bytes are pre-filled with the sign byte (0xFF for negative
/// values, 0x00 otherwise), so the reconstructed value keeps its sign.
fn convert_i128(value: &[u8], n: usize) -> i128 {
    let fill = if value[0] & 0x80 != 0 { 0xFF } else { 0x00 };
    let mut bytes = [fill; 16];
    bytes[16 - n..].copy_from_slice(value);
    i128::from_be_bytes(bytes)
}
61 changes: 50 additions & 11 deletions src/io/parquet/read/statistics/fixlen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@ use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics};

use crate::array::*;
use crate::error::Result;
use crate::types::days_ms;

fn convert(value: &[u8], n: usize) -> i128 {
// Copy the fixed-size byte value to the start of a 16 byte stack
// allocated buffer, then use an arithmetic right shift to fill in
// MSBs, which accounts for leading 1's in negative (two's complement)
// values.
let mut bytes = [0u8; 16];
bytes[..n].copy_from_slice(value);
i128::from_be_bytes(bytes) >> (8 * (16 - n))
}
use super::super::{convert_days_ms, convert_i128};

pub(super) fn push_i128(
from: Option<&dyn ParquetStatistics>,
Expand All @@ -29,8 +22,8 @@ pub(super) fn push_i128(
.unwrap();
let from = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert(x, n))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert(x, n))));
min.push(from.and_then(|s| s.min_value.as_deref().map(|x| convert_i128(x, n))));
max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert_i128(x, n))));

Ok(())
}
Expand All @@ -53,3 +46,49 @@ pub(super) fn push(
max.push(from.and_then(|s| s.max_value.as_ref()));
Ok(())
}

/// Reads the months component — the first four little-endian bytes — of a
/// parquet INTERVAL value. Panics if `value` has fewer than four bytes.
fn convert_year_month(value: &[u8]) -> i32 {
    let months: [u8; 4] = value[..4].try_into().unwrap();
    i32::from_le_bytes(months)
}

/// Pushes the min/max year-month interval statistics of one parquet column
/// chunk onto the `min`/`max` arrays (a null when the statistic is absent).
///
/// Panics if `min`/`max` are not `MutablePrimitiveArray<i32>` or if `from`
/// is not `FixedLenStatistics` — callers guarantee both.
pub(super) fn push_year_month(
    from: Option<&dyn ParquetStatistics>,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> Result<()> {
    let stats = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

    let min = min
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<i32>>()
        .unwrap();
    min.push(stats.and_then(|s| s.min_value.as_deref().map(convert_year_month)));

    let max = max
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<i32>>()
        .unwrap();
    max.push(stats.and_then(|s| s.max_value.as_deref().map(convert_year_month)));

    Ok(())
}

/// Pushes the min/max day-time interval statistics of one parquet column
/// chunk onto the `min`/`max` arrays (a null when the statistic is absent).
///
/// Panics if `min`/`max` are not `MutablePrimitiveArray<days_ms>` or if
/// `from` is not `FixedLenStatistics` — callers guarantee both.
pub(super) fn push_days_ms(
    from: Option<&dyn ParquetStatistics>,
    min: &mut dyn MutableArray,
    max: &mut dyn MutableArray,
) -> Result<()> {
    let stats = from.map(|s| s.as_any().downcast_ref::<FixedLenStatistics>().unwrap());

    let min = min
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<days_ms>>()
        .unwrap();
    min.push(stats.and_then(|s| s.min_value.as_deref().map(convert_days_ms)));

    let max = max
        .as_mut_any()
        .downcast_mut::<MutablePrimitiveArray<days_ms>>()
        .unwrap();
    max.push(stats.and_then(|s| s.max_value.as_deref().map(convert_days_ms)));

    Ok(())
}
6 changes: 3 additions & 3 deletions src/io/parquet/read/statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,9 @@ fn push(
Boolean => boolean::push(from, min, max),
Int8 => primitive::push(from, min, max, |x: i32| Ok(x as i8)),
Int16 => primitive::push(from, min, max, |x: i32| Ok(x as i16)),
Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
primitive::push(from, min, max, |x: i32| Ok(x as i32))
}
Date32 | Time32(_) => primitive::push(from, min, max, |x: i32| Ok(x as i32)),
Interval(IntervalUnit::YearMonth) => fixlen::push_year_month(from, min, max),
Interval(IntervalUnit::DayTime) => fixlen::push_days_ms(from, min, max),
UInt8 => primitive::push(from, min, max, |x: i32| Ok(x as u8)),
UInt16 => primitive::push(from, min, max, |x: i32| Ok(x as u16)),
UInt32 => match physical_type {
Expand Down
4 changes: 2 additions & 2 deletions src/io/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ pub fn array_to_page_simple(
array.values().iter().for_each(|x| {
let bytes = &x.to_le_bytes();
values.extend_from_slice(bytes);
values.resize(values.len() + 8, 0);
values.extend_from_slice(&[0; 8]);
});
let array = FixedSizeBinaryArray::new(
DataType::FixedSizeBinary(12),
Expand All @@ -283,7 +283,7 @@ pub fn array_to_page_simple(
let mut values = Vec::<u8>::with_capacity(12 * array.len());
array.values().iter().for_each(|x| {
let bytes = &x.to_le_bytes();
values.resize(values.len() + 4, 0); // months
values.extend_from_slice(&[0; 4]); // months
values.extend_from_slice(bytes); // days and seconds
});
let array = FixedSizeBinaryArray::from_data(
Expand Down
Loading