Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jul 31, 2022
1 parent c02b7b7 commit 8207a80
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 19 deletions.
121 changes: 108 additions & 13 deletions src/io/orc/read/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
//! APIs to read from [ORC format](https://orc.apache.org).
use crate::array::{Array, BooleanArray, Int64Array, PrimitiveArray};
use std::io::Read;

use crate::array::{
Array, BinaryArray, BooleanArray, Int64Array, Offset, PrimitiveArray, Utf8Array,
};
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::datatypes::{DataType, Field, Schema};
use crate::error::Error;
Expand Down Expand Up @@ -62,16 +66,16 @@ fn infer_dt(type_: &Type, types: &[Type]) -> Result<DataType, Error> {
}

fn deserialize_validity(column: &Column, scratch: &mut Vec<u8>) -> Result<Option<Bitmap>, Error> {
let mut stream = column.get_stream(Kind::Present, std::mem::take(scratch))?;
let stream = column.get_stream(Kind::Present, std::mem::take(scratch))?;

let stream = decode::BooleanIter::new(&mut stream, column.number_of_rows());
let mut stream = decode::BooleanIter::new(stream, column.number_of_rows());

let mut validity = MutableBitmap::with_capacity(column.number_of_rows());
for item in stream {
for item in stream.by_ref() {
validity.push(item?)
}

//*scratch = std::mem::take(&mut stream.into_inner());
*scratch = std::mem::take(&mut stream.into_inner().into_inner());

Ok(validity.into())
}
Expand All @@ -90,11 +94,10 @@ fn deserialize_float<T: NativeType + decode::Float>(

let mut values = Vec::with_capacity(num_rows);
if let Some(validity) = &validity {
let validity_iter = validity.iter();
let mut items =
decode::FloatIter::<T, _>::new(&mut chunks, validity.len() - validity.unset_bits());

for is_valid in validity_iter {
for is_valid in validity {
if is_valid {
values.push(items.next().transpose()?.unwrap_or_default())
} else {
Expand Down Expand Up @@ -122,12 +125,10 @@ fn deserialize_bool(data_type: DataType, column: &Column) -> Result<BooleanArray

let mut values = MutableBitmap::with_capacity(num_rows);
if let Some(validity) = &validity {
let validity_iter = validity.iter();

let mut items =
decode::BooleanIter::new(&mut chunks, validity.len() - validity.unset_bits());

for is_valid in validity_iter {
for is_valid in validity {
values.push(if is_valid {
items.next().transpose()?.unwrap_or_default()
} else {
Expand Down Expand Up @@ -155,11 +156,10 @@ fn deserialize_i64(data_type: DataType, column: &Column) -> Result<Int64Array, E

let mut values = Vec::with_capacity(num_rows);
if let Some(validity) = &validity {
let validity_iter = validity.iter();
let mut iter =
decode::SignedRleV2Iter::new(chunks, validity.len() - validity.unset_bits(), vec![]);

for is_valid in validity_iter {
for is_valid in validity {
if is_valid {
let item = iter.next().transpose()?.unwrap_or_default();
values.push(item);
Expand Down Expand Up @@ -249,6 +249,97 @@ where
PrimitiveArray::try_new(data_type, values.into(), validity)
}

/// Appends cumulative offsets for a run of value lengths.
///
/// Every `u64` length yielded by `iter` is converted to the offset type `O`,
/// added to the running total `length`, and the updated total is pushed onto
/// `offsets`.
///
/// # Errors
///
/// Returns [`orc_format::error::Error::OutOfSpec`] when a length cannot be
/// converted to `O`. Offsets pushed before the failing item are left in
/// `offsets`.
#[inline]
fn try_extend<O: Offset + TryFrom<u64>, I: Iterator<Item = u64>>(
    offsets: &mut Vec<O>,
    length: &mut O,
    iter: I,
) -> Result<(), orc_format::error::Error> {
    for item in iter {
        let item: O = item
            .try_into()
            .map_err(|_| orc_format::error::Error::OutOfSpec)?;
        *length += item;
        offsets.push(*length)
    }
    Ok(())
}

/// Decodes the offsets, value bytes and optional validity of a
/// variable-length (binary or UTF-8) column.
///
/// The returned tuple is `(offsets, values, validity)`:
/// * `offsets` starts with `O::default()` and then holds one running total
///   per entry pushed below;
/// * `values` is a buffer whose size is the final running total, filled from
///   the column's `Kind::Data` stream;
/// * `validity` is whatever `deserialize_validity` returned for the column.
///
/// # Errors
///
/// Fails when a stream cannot be obtained or decoded, when the length stream
/// ends before every valid row has a length, when a length cannot be
/// converted to `O`, or when the data stream cannot fill `values`.
fn deserialize_binary_generic<O: Offset + TryFrom<u64>>(
column: &Column,
) -> Result<(Vec<O>, Vec<u8>, Option<Bitmap>), Error> {
let num_rows = column.number_of_rows();
// A single scratch buffer is handed from one stream to the next.
let mut scratch = vec![];

let validity = deserialize_validity(column, &mut scratch)?;

let lengths = column.get_stream(Kind::Length, scratch)?;

// One offset per row plus the leading start offset.
let mut offsets = Vec::with_capacity(num_rows + 1);
let mut length = O::default();
offsets.push(length);
if let Some(validity) = &validity {
// Lengths are only requested for valid rows, so the decoder is sized by
// the number of set bits in the validity bitmap.
let mut iter =
decode::UnsignedRleV2Iter::new(lengths, validity.len() - validity.unset_bits(), vec![]);
for is_valid in validity {
if is_valid {
// A valid row without a corresponding length is reported as out of spec.
let item = iter
.next()
.transpose()?
.ok_or(orc_format::error::Error::OutOfSpec)?;
let item: O = item
.try_into()
.map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?;
length += item;
}
// Null rows repeat the previous offset, i.e. they get an empty slot.
offsets.push(length);
}
// Take the buffer back out of the decoder so the data stream can reuse it.
let (lengths, _) = iter.into_inner();
scratch = std::mem::take(&mut lengths.into_inner());
} else {
// No validity: decode `num_rows` lengths run by run.
let mut iter = decode::UnsignedRleV2RunIter::new(lengths, num_rows, vec![]);
iter.try_for_each(|run| {
// Every run kind is handled the same way; the arms are kept separate,
// presumably because each variant wraps a different iterator type
// (TODO confirm against `orc_format::read::decode`).
run.and_then(|run| match run {
decode::UnsignedRleV2Run::Direct(values_iter) => {
try_extend(&mut offsets, &mut length, values_iter)
}
decode::UnsignedRleV2Run::Delta(values_iter) => {
try_extend(&mut offsets, &mut length, values_iter)
}
decode::UnsignedRleV2Run::ShortRepeat(values_iter) => {
try_extend(&mut offsets, &mut length, values_iter)
}
})
})?;
// Take the buffer back out of the decoder so the data stream can reuse it.
let (lengths, _) = iter.into_inner();
scratch = std::mem::take(&mut lengths.into_inner());
}
// The final running total is the number of value bytes to read.
let length = length.to_usize();
let mut values = vec![0; length];

let mut data = column.get_stream(Kind::Data, scratch)?;
data.read_exact(&mut values)?;

Ok((offsets, values, validity))
}

/// Builds a [`Utf8Array`] of logical type `data_type` from `column`.
///
/// The offsets, value bytes and validity come from
/// `deserialize_binary_generic`; they are then passed to
/// `Utf8Array::try_new`, whose error (if any) is returned unchanged.
fn deserialize_utf8<O: Offset + TryFrom<u64>>(
    data_type: DataType,
    column: &Column,
) -> Result<Utf8Array<O>, Error> {
    deserialize_binary_generic::<O>(column).and_then(|(offsets, bytes, validity)| {
        Utf8Array::try_new(data_type, offsets.into(), bytes.into(), validity)
    })
}

/// Builds a [`BinaryArray`] of logical type `data_type` from `column`.
///
/// The offsets, value bytes and validity come from
/// `deserialize_binary_generic`; they are then passed to
/// `BinaryArray::try_new`, whose error (if any) is returned unchanged.
fn deserialize_binary<O: Offset + TryFrom<u64>>(
    data_type: DataType,
    column: &Column,
) -> Result<BinaryArray<O>, Error> {
    deserialize_binary_generic::<O>(column).and_then(|(offsets, bytes, validity)| {
        BinaryArray::try_new(data_type, offsets.into(), bytes.into(), validity)
    })
}

/// Deserializes column `column` from `stripe`, assumed
/// to represent an array of `data_type`.
pub fn deserialize(data_type: DataType, column: &Column) -> Result<Box<dyn Array>, Error> {
Expand All @@ -260,6 +351,10 @@ pub fn deserialize(data_type: DataType, column: &Column) -> Result<Box<dyn Array
DataType::Int64 => deserialize_i64(data_type, column).map(|x| x.boxed()),
DataType::Float32 => deserialize_float::<f32>(data_type, column).map(|x| x.boxed()),
DataType::Float64 => deserialize_float::<f64>(data_type, column).map(|x| x.boxed()),
dt => return Err(Error::nyi(format!("Reading {dt:?} from ORC"))),
DataType::Utf8 => deserialize_utf8::<i32>(data_type, column).map(|x| x.boxed()),
DataType::LargeUtf8 => deserialize_utf8::<i64>(data_type, column).map(|x| x.boxed()),
DataType::Binary => deserialize_binary::<i32>(data_type, column).map(|x| x.boxed()),
DataType::LargeBinary => deserialize_binary::<i64>(data_type, column).map(|x| x.boxed()),
dt => return Err(Error::nyi(format!("Deserializing {dt:?} from ORC"))),
}
}
35 changes: 29 additions & 6 deletions tests/it/io/orc/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn infer() -> Result<(), Error> {
let metadata = format::read::read_metadata(&mut reader)?;
let schema = read::infer_schema(&metadata.footer)?;

assert_eq!(schema.fields.len(), 10);
assert_eq!(schema.fields.len(), 12);
Ok(())
}

Expand All @@ -19,8 +19,7 @@ fn float32() -> Result<(), Error> {
let metadata = format::read::read_metadata(&mut reader)?;
let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?;

let column =
format::read::read_stripe_column(&mut reader, &metadata, 0, footer.clone(), 1, vec![])?;
let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 1, vec![])?;

assert_eq!(
read::deserialize(DataType::Float32, &column)?,
Expand Down Expand Up @@ -53,7 +52,7 @@ fn float64() -> Result<(), Error> {

let (footer, scratch) = column.into_inner();

let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 8, vec![])?;
let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 8, scratch)?;

assert_eq!(
read::deserialize(DataType::Float64, &column)?,
Expand All @@ -77,7 +76,7 @@ fn boolean() -> Result<(), Error> {

let (footer, scratch) = column.into_inner();

let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 4, vec![])?;
let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 4, scratch)?;

assert_eq!(
read::deserialize(DataType::Boolean, &column)?,
Expand Down Expand Up @@ -125,11 +124,35 @@ fn bigint() -> Result<(), Error> {

let (footer, scratch) = column.into_inner();

let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 9, vec![])?;
let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 9, scratch)?;

assert_eq!(
read::deserialize(DataType::Int64, &column)?,
Int64Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed()
);
Ok(())
}

// Reads stripe 0 of the pyorc fixture and checks that columns 11 and 12
// deserialize to the expected UTF-8 arrays (without and with a null entry).
#[test]
fn utf8() -> Result<(), Error> {
    let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap();
    let metadata = format::read::read_metadata(&mut reader)?;
    let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?;

    let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 11, vec![])?;

    assert_eq!(
        read::deserialize(DataType::Utf8, &column)?,
        Utf8Array::<i32>::from_slice(["a", "bb", "ccc", "dddd", "eeeee"]).boxed()
    );

    // Recover the footer and the scratch buffer from the first column and
    // reuse both for the next read instead of allocating a fresh buffer.
    let (footer, scratch) = column.into_inner();

    let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 12, scratch)?;

    assert_eq!(
        read::deserialize(DataType::Utf8, &column)?,
        Utf8Array::<i32>::from([Some("a"), Some("bb"), None, Some("dddd"), Some("eeeee")]).boxed()
    );
    Ok(())
}
2 changes: 2 additions & 0 deletions tests/it/io/orc/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"double_required": [1.0, 2.0, 3.0, 4.0, 5.0],
"bigint_nulable": [5, -5, None, 5, 5],
"bigint_required": [5, -5, 1, 5, 5],
"utf8_nulable": ["a", "bb", "ccc", "dddd", "eeeee"],
"utf8_required": ["a", "bb", None, "dddd", "eeeee"],
}

def infer_schema(data):
Expand Down

0 comments on commit 8207a80

Please sign in to comment.