Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added basics of ORC
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jul 26, 2022
1 parent f775c2a commit 52eacd9
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 0 deletions.
12 changes: 12 additions & 0 deletions src/io/orc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//! APIs to read from [ORC format](https://orc.apache.org).
pub mod read;

pub use orc_format as format;

use crate::error::Error;

/// Converts an `orc_format` error into this crate's [`Error`] by storing its
/// `Debug` rendering in [`Error::ExternalFormat`].
impl From<format::Error> for Error {
    fn from(error: format::Error) -> Self {
        let message = format!("{error:?}");
        Self::ExternalFormat(message)
    }
}
178 changes: 178 additions & 0 deletions src/io/orc/read/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
//! APIs to read from [ORC format](https://orc.apache.org).
use std::io::{Read, Seek, SeekFrom};

use crate::array::{BooleanArray, Float32Array};
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::datatypes::{DataType, Field, Schema};
use crate::error::Error;

use orc_format::fallible_streaming_iterator::FallibleStreamingIterator;
use orc_format::proto::stream::Kind;
use orc_format::proto::{CompressionKind, Footer, StripeInformation, Type};
use orc_format::read::decode;
use orc_format::read::Stripe;

/// Infers a [`Schema`] from the files' [`Footer`].
/// # Errors
/// This function errors if the type is not yet supported.
pub fn infer_schema(footer: &Footer) -> Result<Schema, Error> {
let types = &footer.types;

let dt = infer_dt(&footer.types[0], types)?;
if let DataType::Struct(fields) = dt {
Ok(fields.into())
} else {
Err(Error::ExternalFormat(
"ORC root type must be a struct".to_string(),
))
}
}

fn infer_dt(type_: &Type, types: &[Type]) -> Result<DataType, Error> {
use orc_format::proto::r#type::Kind::*;
let dt = match type_.kind() {
Boolean => DataType::Boolean,
Byte => DataType::Int8,
Short => DataType::Int16,
Int => DataType::Int32,
Long => DataType::Int64,
Float => DataType::Float32,
Double => DataType::Float64,
String => DataType::Utf8,
Binary => DataType::Binary,
Struct => {
let sub_types = type_
.subtypes
.iter()
.cloned()
.zip(type_.field_names.iter())
.map(|(i, name)| {
infer_dt(
types.get(i as usize).ok_or_else(|| {
Error::ExternalFormat(format!("ORC field {i} not found"))
})?,
types,
)
.map(|dt| Field::new(name, dt, true))
})
.collect::<Result<Vec<_>, Error>>()?;
DataType::Struct(sub_types)
}
kind => return Err(Error::nyi(format!("Reading {kind:?} from ORC"))),
};
Ok(dt)
}

/// Reads the stripe [`StripeInformation`] into memory.
pub fn read_stripe<R: Read + Seek>(
reader: &mut R,
stripe_info: StripeInformation,
compression: CompressionKind,
) -> Result<Stripe, Error> {
let offset = stripe_info.offset();
reader.seek(SeekFrom::Start(offset)).unwrap();

let len = stripe_info.index_length() + stripe_info.data_length() + stripe_info.footer_length();
let mut stripe = vec![0; len as usize];
reader.read_exact(&mut stripe).unwrap();

Ok(Stripe::try_new(stripe, stripe_info, compression)?)
}

/// Decodes the `Present` stream of column `column` in `stripe` into a validity bitmap.
///
/// `scratch` is lent to the stream reader for the duration of the call and
/// handed back afterwards so that its allocation can be reused by the caller.
/// The result is whatever `MutableBitmap` converts to as an `Option<Bitmap>`.
fn deserialize_validity(
    stripe: &Stripe,
    column: usize,
    scratch: &mut Vec<u8>,
) -> Result<Option<Bitmap>, Error> {
    let buffer = std::mem::take(scratch);
    let mut chunks = stripe.get_bytes(column, Kind::Present, buffer)?;

    let num_rows = stripe.number_of_rows();
    let mut bits = MutableBitmap::with_capacity(num_rows);
    // Number of rows whose presence bit has not been decoded yet.
    let mut pending = num_rows;
    while let Some(chunk) = chunks.next()? {
        // todo: this can be faster by iterating in bytes instead of single bits via `BooleanRun`
        for bit in decode::BooleanIter::new(chunk, pending) {
            pending -= 1;
            bits.push(bit?)
        }
    }
    // Return the buffer to the caller.
    *scratch = chunks.into_inner();

    Ok(bits.into())
}

/// Deserializes column `column` from `stripe`, assumed to represent a f32
pub fn deserialize_f32(
data_type: DataType,
stripe: &Stripe,
column: usize,
) -> Result<Float32Array, Error> {
let mut scratch = vec![];
let num_rows = stripe.number_of_rows();

let validity = deserialize_validity(stripe, column, &mut scratch)?;

let mut chunks = stripe.get_bytes(column, Kind::Data, scratch)?;

let mut values = Vec::with_capacity(num_rows);
if let Some(validity) = &validity {
let mut validity_iter = validity.iter();
while let Some(chunk) = chunks.next()? {
let mut valid_iter = decode::deserialize_f32(chunk);
let iter = validity_iter.by_ref().map(|is_valid| {
if is_valid {
valid_iter.next().unwrap()
} else {
0.0f32
}
});
values.extend(iter);
}
} else {
while let Some(chunk) = chunks.next()? {
values.extend(decode::deserialize_f32(chunk));
}
}

Float32Array::try_new(data_type, values.into(), validity)
}

/// Deserializes column `column` from `stripe`, assumed to represent a boolean array
pub fn deserialize_bool(
data_type: DataType,
stripe: &Stripe,
column: usize,
) -> Result<BooleanArray, Error> {
let num_rows = stripe.number_of_rows();
let mut scratch = vec![];

let validity = deserialize_validity(stripe, column, &mut scratch)?;

let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?;

let mut values = MutableBitmap::with_capacity(num_rows);
if let Some(validity) = &validity {
let mut validity_iter = validity.iter();

while let Some(chunk) = chunks.next()? {
let mut valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8);
validity_iter.by_ref().try_for_each(|is_valid| {
values.push(if is_valid {
valid_iter.next().unwrap()?
} else {
false
});
Result::<(), Error>::Ok(())
})?;
}
} else {
while let Some(chunk) = chunks.next()? {
let valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8);
for v in valid_iter {
values.push(v?)
}
}
}

BooleanArray::try_new(data_type, values.into(), validity)
}
1 change: 1 addition & 0 deletions tests/it/io/orc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mod read;
28 changes: 28 additions & 0 deletions tests/it/io/orc/read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use arrow2::array::*;
use arrow2::datatypes::DataType;
use arrow2::error::Error;
use arrow2::io::orc::{format, read};

/// Reads the pyorc-generated fixture, checks the inferred schema width and
/// deserializes the float column (1) and the boolean column (2) of the first stripe.
#[test]
fn infer() -> Result<(), Error> {
    let mut file = std::fs::File::open("fixtures/pyorc/test.orc").unwrap();
    let (postscript, footer, _) = format::read::read_metadata(&mut file)?;

    let schema = read::infer_schema(&footer)?;
    assert_eq!(schema.fields.len(), 12);

    let first_stripe = footer.stripes[0].clone();
    let stripe = read::read_stripe(&mut file, first_stripe, postscript.compression())?;

    let floats = read::deserialize_f32(DataType::Float32, &stripe, 1)?;
    let expected_floats = Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]);
    assert_eq!(floats, expected_floats);

    let bools = read::deserialize_bool(DataType::Boolean, &stripe, 2)?;
    let expected_bools =
        BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]);
    assert_eq!(bools, expected_bools);

    Ok(())
}
47 changes: 47 additions & 0 deletions tests/it/io/orc/write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os

import pyorc


# Fixture contents, keyed by column name. Every column holds five rows and
# has a null (None) in the third position.
data = {
# floating point column
"a": [1.0, 2.0, None, 4.0, 5.0],
# boolean column
"b": [True, False, None, True, False],
# string columns
"str_direct": ["a", "cccccc", None, "ddd", "ee"],
"d": ["a", "bb", None, "ccc", "ddd"],
"e": ["ddd", "cc", None, "bb", "a"],
"f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"],
# integer columns; the names suggest the integer encoding each one is meant
# to exercise (presumably ORC's short-repeat / delta / direct) - TODO confirm
"int_short_repeated": [5, 5, None, 5, 5],
"int_neg_short_repeated": [-5, -5, None, -5, -5],
"int_delta": [1, 2, None, 4, 5],
"int_neg_delta": [5, 4, None, 2, 1],
"int_direct": [1, 6, None, 3, 2],
"int_neg_direct": [-1, -6, None, -3, -2],
}


def _write(
    schema: str,
    data,
    file_name: str,
    compression=pyorc.CompressionKind.NONE,
    dict_key_size_threshold=0.0,
):
    """Write `data` to `file_name` as an ORC file using `pyorc`.

    Args:
        schema: ORC schema string passed to `pyorc.Writer`.
        data: mapping of column name to list of values; rows are built by
            taking the same index from every column, in the mapping's order.
            The number of rows is the length of the first column.
        file_name: path of the file to create (overwritten if it exists).
        compression: forwarded to `pyorc.Writer`.
        dict_key_size_threshold: forwarded to `pyorc.Writer`.
    """
    # The context manager guarantees the file handle is closed, also when
    # writing fails part-way.
    with open(file_name, "wb") as output:
        writer = pyorc.Writer(
            output,
            schema,
            dict_key_size_threshold=dict_key_size_threshold,
            compression=compression,
        )
        num_rows = len(list(data.values())[0])
        for x in range(num_rows):
            row = tuple(values[x] for values in data.values())
            writer.write(row)
        # Finish the ORC file before the underlying file object is closed.
        writer.close()

# Create the fixture directory if needed, then write the fixture file.
os.makedirs("fixtures/pyorc", exist_ok=True)
_write(
# One struct field per column, in the order in which the columns are written.
"struct<a:float,b:boolean,str_direct:string,d:string,e:string,f:string,int_short_repeated:int,int_neg_short_repeated:int,int_delta:int,int_neg_delta:int,int_direct:int,int_neg_direct:int>",
data,
"fixtures/pyorc/test.orc",
)

0 comments on commit 52eacd9

Please sign in to comment.