From 52eacd965d4f9b29c77745f5cab8a4651fef2814 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Tue, 26 Jul 2022 15:43:34 +0000
Subject: [PATCH] Added basics of ORC

---
 src/io/orc/mod.rs        |  12 ++
 src/io/orc/read/mod.rs   | 232 +++++++++++++++++++++++++++++++++++++++
 tests/it/io/orc/mod.rs   |   1 +
 tests/it/io/orc/read.rs  |  28 +++++
 tests/it/io/orc/write.py |  50 +++++++++
 5 files changed, 323 insertions(+)
 create mode 100644 src/io/orc/mod.rs
 create mode 100644 src/io/orc/read/mod.rs
 create mode 100644 tests/it/io/orc/mod.rs
 create mode 100644 tests/it/io/orc/read.rs
 create mode 100644 tests/it/io/orc/write.py

diff --git a/src/io/orc/mod.rs b/src/io/orc/mod.rs
new file mode 100644
index 00000000000..f9d240113dd
--- /dev/null
+++ b/src/io/orc/mod.rs
@@ -0,0 +1,12 @@
+//! APIs to read from [ORC format](https://orc.apache.org).
+pub mod read;
+
+pub use orc_format as format;
+
+use crate::error::Error;
+
+impl From<format::Error> for Error {
+    fn from(error: format::Error) -> Self {
+        Error::ExternalFormat(format!("{:?}", error))
+    }
+}
diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs
new file mode 100644
index 00000000000..b29e3d3edb9
--- /dev/null
+++ b/src/io/orc/read/mod.rs
@@ -0,0 +1,232 @@
+//! APIs to read from [ORC format](https://orc.apache.org).
+use std::io::{Read, Seek, SeekFrom};
+
+use crate::array::{BooleanArray, Float32Array};
+use crate::bitmap::{Bitmap, MutableBitmap};
+use crate::datatypes::{DataType, Field, Schema};
+use crate::error::Error;
+
+use orc_format::fallible_streaming_iterator::FallibleStreamingIterator;
+use orc_format::proto::stream::Kind;
+use orc_format::proto::{CompressionKind, Footer, StripeInformation, Type};
+use orc_format::read::decode;
+use orc_format::read::Stripe;
+
+/// Infers a [`Schema`] from the files' [`Footer`].
+/// # Errors
+/// This function errors if the footer declares no types, if the root type is
+/// not a struct, or if a type is not yet supported.
+pub fn infer_schema(footer: &Footer) -> Result<Schema, Error> {
+    let types = &footer.types;
+
+    // The root type is the first entry; an empty list is a malformed footer.
+    let root = types
+        .first()
+        .ok_or_else(|| Error::ExternalFormat("ORC footer declares no types".to_string()))?;
+
+    let dt = infer_dt(root, types)?;
+    if let DataType::Struct(fields) = dt {
+        Ok(fields.into())
+    } else {
+        Err(Error::ExternalFormat(
+            "ORC root type must be a struct".to_string(),
+        ))
+    }
+}
+
+/// Maps the ORC type `type_` to a [`DataType`], resolving the children of
+/// structs by index into `types`.
+fn infer_dt(type_: &Type, types: &[Type]) -> Result<DataType, Error> {
+    use orc_format::proto::r#type::Kind::*;
+    let dt = match type_.kind() {
+        Boolean => DataType::Boolean,
+        Byte => DataType::Int8,
+        Short => DataType::Int16,
+        Int => DataType::Int32,
+        Long => DataType::Int64,
+        Float => DataType::Float32,
+        Double => DataType::Float64,
+        String => DataType::Utf8,
+        Binary => DataType::Binary,
+        Struct => {
+            let sub_types = type_
+                .subtypes
+                .iter()
+                .cloned()
+                .zip(type_.field_names.iter())
+                .map(|(i, name)| {
+                    infer_dt(
+                        types.get(i as usize).ok_or_else(|| {
+                            Error::ExternalFormat(format!("ORC field {i} not found"))
+                        })?,
+                        types,
+                    )
+                    .map(|dt| Field::new(name, dt, true))
+                })
+                .collect::<Result<Vec<_>, Error>>()?;
+            DataType::Struct(sub_types)
+        }
+        kind => return Err(Error::nyi(format!("Reading {kind:?} from ORC"))),
+    };
+    Ok(dt)
+}
+
+/// Reads the stripe [`StripeInformation`] into memory.
+/// # Errors
+/// This function errors if seeking or reading `reader` fails, if the stripe's
+/// declared length overflows, or if the stripe cannot be parsed.
+pub fn read_stripe<R: Read + Seek>(
+    reader: &mut R,
+    stripe_info: StripeInformation,
+    compression: CompressionKind,
+) -> Result<Stripe, Error> {
+    let offset = stripe_info.offset();
+    reader.seek(SeekFrom::Start(offset))?;
+
+    // The lengths come from the file: guard the sum and the cast against overflow.
+    let len = stripe_info
+        .index_length()
+        .checked_add(stripe_info.data_length())
+        .and_then(|len| len.checked_add(stripe_info.footer_length()))
+        .and_then(|len| usize::try_from(len).ok())
+        .ok_or_else(|| Error::ExternalFormat("ORC stripe length overflows".to_string()))?;
+    let mut stripe = vec![0; len];
+    reader.read_exact(&mut stripe)?;
+
+    Ok(Stripe::try_new(stripe, stripe_info, compression)?)
+}
+
+/// Decodes the `Present` stream of `column` into a validity bitmap, using
+/// `scratch` as the read buffer and handing it back to the caller afterwards.
+fn deserialize_validity(
+    stripe: &Stripe,
+    column: usize,
+    scratch: &mut Vec<u8>,
+) -> Result<Option<Bitmap>, Error> {
+    let mut chunks = stripe.get_bytes(column, Kind::Present, std::mem::take(scratch))?;
+
+    let mut validity = MutableBitmap::with_capacity(stripe.number_of_rows());
+    let mut remaining = stripe.number_of_rows();
+    while let Some(chunk) = chunks.next()? {
+        // todo: this can be faster by iterating in bytes instead of single bits via `BooleanRun`
+        let iter = decode::BooleanIter::new(chunk, remaining);
+        for item in iter {
+            remaining -= 1;
+            validity.push(item?)
+        }
+    }
+    *scratch = chunks.into_inner();
+
+    Ok(validity.into())
+}
+
+/// Deserializes column `column` from `stripe`, assumed to represent a f32
+/// # Errors
+/// This function errors if a stream cannot be read or if the data stream holds
+/// fewer values than there are non-null rows.
+pub fn deserialize_f32(
+    data_type: DataType,
+    stripe: &Stripe,
+    column: usize,
+) -> Result<Float32Array, Error> {
+    let mut scratch = vec![];
+    let num_rows = stripe.number_of_rows();
+
+    let validity = deserialize_validity(stripe, column, &mut scratch)?;
+
+    let mut chunks = stripe.get_bytes(column, Kind::Data, scratch)?;
+
+    let mut values = Vec::with_capacity(num_rows);
+    if let Some(validity) = &validity {
+        // Peek so that a slot is only consumed once its value is available: when a
+        // chunk runs dry, the pending non-null slot is served by the next chunk.
+        let mut validity_iter = validity.iter().peekable();
+        while let Some(chunk) = chunks.next()? {
+            let mut valid_iter = decode::deserialize_f32(chunk);
+            while let Some(&is_valid) = validity_iter.peek() {
+                if !is_valid {
+                    values.push(0.0f32);
+                } else if let Some(value) = valid_iter.next() {
+                    values.push(value);
+                } else {
+                    break;
+                }
+                validity_iter.next();
+            }
+        }
+        // Rows left once the data is exhausted can only be nulls.
+        while validity_iter.next_if(|is_valid| !*is_valid).is_some() {
+            values.push(0.0f32);
+        }
+        if values.len() != validity.len() {
+            return Err(Error::ExternalFormat(
+                "ORC data stream has fewer values than non-null rows".to_string(),
+            ));
+        }
+    } else {
+        while let Some(chunk) = chunks.next()? {
+            values.extend(decode::deserialize_f32(chunk));
+        }
+    }
+
+    Float32Array::try_new(data_type, values.into(), validity)
+}
+
+/// Deserializes column `column` from `stripe`, assumed to represent a boolean array
+/// # Errors
+/// This function errors if a stream cannot be read or decoded, or if the data
+/// stream holds fewer values than there are non-null rows.
+pub fn deserialize_bool(
+    data_type: DataType,
+    stripe: &Stripe,
+    column: usize,
+) -> Result<BooleanArray, Error> {
+    let num_rows = stripe.number_of_rows();
+    let mut scratch = vec![];
+
+    let validity = deserialize_validity(stripe, column, &mut scratch)?;
+
+    let mut chunks = stripe.get_bytes(column, Kind::Data, scratch)?;
+
+    let mut values = MutableBitmap::with_capacity(num_rows);
+    if let Some(validity) = &validity {
+        // Peek so that a slot is only consumed once its value is available.
+        let mut validity_iter = validity.iter().peekable();
+        while let Some(chunk) = chunks.next()? {
+            // Booleans are run-length encoded, so the byte length of the chunk says
+            // nothing about its item count: bound by the rows still missing instead.
+            let mut valid_iter =
+                decode::BooleanIter::new(chunk, num_rows.saturating_sub(values.len()));
+            while let Some(&is_valid) = validity_iter.peek() {
+                if !is_valid {
+                    values.push(false);
+                } else if let Some(value) = valid_iter.next() {
+                    values.push(value?);
+                } else {
+                    break;
+                }
+                validity_iter.next();
+            }
+        }
+        // Rows left once the data is exhausted can only be nulls.
+        while validity_iter.next_if(|is_valid| !*is_valid).is_some() {
+            values.push(false);
+        }
+        if values.len() != validity.len() {
+            return Err(Error::ExternalFormat(
+                "ORC data stream has fewer values than non-null rows".to_string(),
+            ));
+        }
+    } else {
+        // Bound the decoder by the rows still expected so that padding bits are ignored.
+        let mut remaining = num_rows;
+        while let Some(chunk) = chunks.next()? {
+            let valid_iter = decode::BooleanIter::new(chunk, remaining);
+            for v in valid_iter {
+                remaining -= 1;
+                values.push(v?)
+            }
+        }
+    }
+
+    BooleanArray::try_new(data_type, values.into(), validity)
+}
diff --git a/tests/it/io/orc/mod.rs b/tests/it/io/orc/mod.rs
new file mode 100644
index 00000000000..0bec210a6a5
--- /dev/null
+++ b/tests/it/io/orc/mod.rs
@@ -0,0 +1 @@
+mod read;
diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs
new file mode 100644
index 00000000000..e901348c16d
--- /dev/null
+++ b/tests/it/io/orc/read.rs
@@ -0,0 +1,28 @@
+use arrow2::array::*;
+use arrow2::datatypes::DataType;
+use arrow2::error::Error;
+use arrow2::io::orc::{format, read};
+
+#[test]
+fn infer() -> Result<(), Error> {
+    let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap();
+    let (ps, footer, _) = format::read::read_metadata(&mut reader)?;
+    let schema = read::infer_schema(&footer)?;
+
+    assert_eq!(schema.fields.len(), 12);
+
+    let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?;
+
+    let array = read::deserialize_f32(DataType::Float32, &stripe, 1)?;
+    assert_eq!(
+        array,
+        Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)])
+    );
+
+    let array = read::deserialize_bool(DataType::Boolean, &stripe, 2)?;
+    assert_eq!(
+        array,
+        BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)])
+    );
+    Ok(())
+}
diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py
new file mode 100644
index 00000000000..6aa94aa03f1
--- /dev/null
+++ b/tests/it/io/orc/write.py
@@ -0,0 +1,50 @@
+import os
+
+import pyorc
+
+
+data = {
+    "a": [1.0, 2.0, None, 4.0, 5.0],
+    "b": [True, False, None, True, False],
+    "str_direct": ["a", "cccccc", None, "ddd", "ee"],
+    "d": ["a", "bb", None, "ccc", "ddd"],
+    "e": ["ddd", "cc", None, "bb", "a"],
+    "f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"],
+    "int_short_repeated": [5, 5, None, 5, 5],
+    "int_neg_short_repeated": [-5, -5, None, -5, -5],
+    "int_delta": [1, 2, None, 4, 5],
+    "int_neg_delta": [5, 4, None, 2, 1],
+    "int_direct": [1, 6, None, 3, 2],
+    "int_neg_direct": [-1, -6, None, -3, -2],
+}
+
+
+def _write(
+    schema: str,
+    data,
+    file_name: str,
+    compression=pyorc.CompressionKind.NONE,
+    dict_key_size_threshold=0.0,
+):
+    """Writes the columns of `data`, row by row, into an ORC file at `file_name`."""
+    num_rows = len(list(data.values())[0])
+    # The context manager closes the file even if writing a row fails.
+    with open(file_name, "wb") as output:
+        writer = pyorc.Writer(
+            output,
+            schema,
+            dict_key_size_threshold=dict_key_size_threshold,
+            compression=compression,
+        )
+        for x in range(num_rows):
+            row = tuple(values[x] for values in data.values())
+            writer.write(row)
+        writer.close()
+
+
+os.makedirs("fixtures/pyorc", exist_ok=True)
+_write(
+    "struct<a:float,b:boolean,str_direct:string,d:string,e:string,f:string,int_short_repeated:int,int_neg_short_repeated:int,int_delta:int,int_neg_delta:int,int_direct:int,int_neg_direct:int>",
+    data,
+    "fixtures/pyorc/test.orc",
+)