From 6bb9fe2e7ba6d097946cb855b76f85343ebf6368 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao" <jorgecarleitao@gmail.com>
Date: Tue, 26 Jul 2022 15:43:34 +0000
Subject: [PATCH] Added basics of ORC

---
 .github/workflows/coverage.yml |   3 +-
 .github/workflows/test.yml     |   5 +-
 Cargo.toml                     |  17 +++-
 DEVELOPMENT.md                 |   4 +-
 src/io/mod.rs                  |   3 +
 src/io/orc/mod.rs              |  12 +++
 src/io/orc/read/mod.rs         | 178 +++++++++++++++++++++++++++++++++
 tests/it/io/mod.rs             |   3 +
 tests/it/io/orc/mod.rs         |   1 +
 tests/it/io/orc/read.rs        |  28 ++++++
 tests/it/io/orc/write.py       |  47 +++++++++
 11 files changed, 294 insertions(+), 7 deletions(-)
 create mode 100644 src/io/orc/mod.rs
 create mode 100644 src/io/orc/read/mod.rs
 create mode 100644 tests/it/io/orc/mod.rs
 create mode 100644 tests/it/io/orc/read.rs
 create mode 100644 tests/it/io/orc/write.py

diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
index 3007684b17e..69d448cd7ca 100644
--- a/.github/workflows/coverage.yml
+++ b/.github/workflows/coverage.yml
@@ -19,8 +19,9 @@ jobs:
           python3 -m venv venv
           source venv/bin/activate
           pip install pip --upgrade
-          pip install pyarrow==6
+          pip install pyarrow==6 pyorc
           python parquet_integration/write_parquet.py
+          python tests/it/io/orc/write.py
           deactivate
       - uses: Swatinem/rust-cache@v1
       - name: Generate code coverage
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index e0cb434ea60..2656838ec4f 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -13,16 +13,17 @@ jobs:
           submodules: true # needed to test IPC, which are located in a submodule
       - name: Install Rust
         run: rustup update stable
-      - uses: Swatinem/rust-cache@v1
       - name: Setup parquet files
         run: |
           apt update && apt install python3-pip python3-venv -y -q
           python3 -m venv venv
           source venv/bin/activate
           pip install pip --upgrade
-          pip install pyarrow==6
+          pip install pyarrow==6 pyorc
           python parquet_integration/write_parquet.py
+          python tests/it/io/orc/write.py
           deactivate
+      - uses: Swatinem/rust-cache@v1
       - name: Run
         run: cargo test --features full
diff --git a/Cargo.toml b/Cargo.toml
index 1aada784f46..da5fef8481a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -72,9 +72,6 @@ parquet2 = { version = "0.14.0", optional = true, default_features = false }
 # avro support
 avro-schema = { version = "0.2", optional = true }
-serde = { version = "^1.0", features = ["rc"], optional = true }
-serde_derive = { version = "^1.0", optional = true }
-serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }
 # compression of avro
 libflate = { version = "1.1.1", optional = true }
 snap = { version = "1", optional = true }
@@ -82,6 +79,14 @@ crc = { version = "2", optional = true }
 # async avro
 async-stream = { version = "0.3.2", optional = true }
 
+# ORC support
+orc-format = { git = "https://github.com/DataEngineeringLabs/orc-format.git", optional = true }
+
+# Arrow integration tests support
+serde = { version = "^1.0", features = ["rc"], optional = true }
+serde_derive = { version = "^1.0", optional = true }
+serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }
+
 # for division/remainder optimization at runtime
 strength_reduce = { version = "0.2", optional = true }
@@ -126,6 +131,7 @@ full = [
     "io_parquet",
     "io_parquet_compression",
     "io_avro",
+    "io_orc",
     "io_avro_compression",
     "io_avro_async",
     "regex",
@@ -145,6 +151,7 @@ io_ipc_write_async = ["io_ipc", "futures"]
 io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
 io_ipc_compression = ["lz4", "zstd"]
 io_flight = ["io_ipc", "arrow-format/flight-data"]
+
 # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
 io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"]
 io_parquet_compression = [
@@ -154,6 +161,7 @@ io_parquet_compression = [
     "parquet2/lz4",
     "parquet2/brotli",
 ]
+
 io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
 io_avro_compression = [
     "libflate",
@@ -161,6 +169,9 @@ io_avro_compression = [
     "crc",
 ]
 io_avro_async = ["io_avro", "futures", "async-stream"]
+
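+# Reading ORC files; downstream crates would enable it with,
+# e.g., `arrow2 = { version = "*", features = ["io_orc"] }` (version illustrative).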
+io_orc = [ "orc-format" ]
+
 # serde+serde_json: its dependencies + error handling
 # serde_derive: there is some derive around
 io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"]
diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index d81e6455838..aced5ffcf3b 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -42,10 +42,12 @@ source venv/bin/activate
 pip install pip --upgrade
 
 # Install pyarrow, version 6
-pip install pyarrow==6
+pip install pyarrow==6 pyorc
 
 # Generate the parquet files (this might take some time, depending on your computer setup)
 python parquet_integration/write_parquet.py
+# Generate the ORC files
+python tests/it/io/orc/write.py
 
 # Get out of venv, back to normal terminal
 deactivate
diff --git a/src/io/mod.rs b/src/io/mod.rs
index 9343d4281ce..bc7d218ad36 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -5,6 +5,9 @@
 #[cfg(feature = "io_odbc")]
 pub mod odbc;
 
+#[cfg(feature = "io_orc")]
+pub mod orc;
+
 #[cfg(any(
     feature = "io_csv_read",
     feature = "io_csv_read_async",
diff --git a/src/io/orc/mod.rs b/src/io/orc/mod.rs
new file mode 100644
index 00000000000..f9d240113dd
--- /dev/null
+++ b/src/io/orc/mod.rs
@@ -0,0 +1,12 @@
+//! APIs to read from [ORC format](https://orc.apache.org).
+pub mod read;
+
+pub use orc_format as format;
+
+use crate::error::Error;
+
+impl From<format::Error> for Error {
+    fn from(error: format::Error) -> Self {
+        Error::ExternalFormat(format!("{:?}", error))
+    }
+}
diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs
new file mode 100644
index 00000000000..b29e3d3edb9
--- /dev/null
+++ b/src/io/orc/read/mod.rs
@@ -0,0 +1,178 @@
+//! APIs to read from [ORC format](https://orc.apache.org).
+use std::io::{Read, Seek, SeekFrom};
+
+use crate::array::{BooleanArray, Float32Array};
+use crate::bitmap::{Bitmap, MutableBitmap};
+use crate::datatypes::{DataType, Field, Schema};
+use crate::error::Error;
+
+use orc_format::fallible_streaming_iterator::FallibleStreamingIterator;
+use orc_format::proto::stream::Kind;
+use orc_format::proto::{CompressionKind, Footer, StripeInformation, Type};
+use orc_format::read::decode;
+use orc_format::read::Stripe;
+
+/// Infers a [`Schema`] from the file's [`Footer`].
+/// # Errors
+/// This function errors if the type is not yet supported.
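+///
+/// # Example
+/// A sketch of the intended usage, mirroring the integration test
+/// (the file path is illustrative):
+/// ```ignore
+/// use arrow2::io::orc::{format, read};
+///
+/// let mut file = std::fs::File::open("fixtures/pyorc/test.orc")?;
+/// let (_postscript, footer, _) = format::read::read_metadata(&mut file)?;
+/// let schema = read::infer_schema(&footer)?;
+/// ```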
+pub fn infer_schema(footer: &Footer) -> Result<Schema, Error> {
+    let types = &footer.types;
+
+    let dt = infer_dt(&footer.types[0], types)?;
+    if let DataType::Struct(fields) = dt {
+        Ok(fields.into())
+    } else {
+        Err(Error::ExternalFormat(
+            "ORC root type must be a struct".to_string(),
+        ))
+    }
+}
+
+fn infer_dt(type_: &Type, types: &[Type]) -> Result<DataType, Error> {
+    use orc_format::proto::r#type::Kind::*;
+    let dt = match type_.kind() {
+        Boolean => DataType::Boolean,
+        Byte => DataType::Int8,
+        Short => DataType::Int16,
+        Int => DataType::Int32,
+        Long => DataType::Int64,
+        Float => DataType::Float32,
+        Double => DataType::Float64,
+        String => DataType::Utf8,
+        Binary => DataType::Binary,
+        Struct => {
+            let sub_types = type_
+                .subtypes
+                .iter()
+                .cloned()
+                .zip(type_.field_names.iter())
+                .map(|(i, name)| {
+                    infer_dt(
+                        types.get(i as usize).ok_or_else(|| {
+                            Error::ExternalFormat(format!("ORC field {i} not found"))
+                        })?,
+                        types,
+                    )
+                    .map(|dt| Field::new(name, dt, true))
+                })
+                .collect::<Result<Vec<Field>, Error>>()?;
+            DataType::Struct(sub_types)
+        }
+        kind => return Err(Error::nyi(format!("Reading {kind:?} from ORC"))),
+    };
+    Ok(dt)
+}
+
+/// Reads the stripe [`StripeInformation`] into memory.
+pub fn read_stripe<R: Read + Seek>(
+    reader: &mut R,
+    stripe_info: StripeInformation,
+    compression: CompressionKind,
+) -> Result<Stripe, Error> {
+    let offset = stripe_info.offset();
+    reader.seek(SeekFrom::Start(offset))?;
+
+    let len = stripe_info.index_length() + stripe_info.data_length() + stripe_info.footer_length();
+    let mut stripe = vec![0; len as usize];
+    reader.read_exact(&mut stripe)?;
+
+    Ok(Stripe::try_new(stripe, stripe_info, compression)?)
+}
+
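+// ORC stores validity in a separate `PRESENT` stream of booleans, one bit per
+// row, so deserializing a column is a two-step process: decode the `PRESENT`
+// stream into a validity `Bitmap`, then decode the `DATA` stream, consulting
+// the validity to know which slots carry a value. `scratch` is taken and
+// handed back so the decompression buffer can be reused across streams.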
+fn deserialize_validity(
+    stripe: &Stripe,
+    column: usize,
+    scratch: &mut Vec<u8>,
+) -> Result<Option<Bitmap>, Error> {
+    let mut chunks = stripe.get_bytes(column, Kind::Present, std::mem::take(scratch))?;
+
+    let mut validity = MutableBitmap::with_capacity(stripe.number_of_rows());
+    let mut remaining = stripe.number_of_rows();
+    while let Some(chunk) = chunks.next()? {
+        // todo: this can be faster by iterating in bytes instead of single bits via `BooleanRun`
+        let iter = decode::BooleanIter::new(chunk, remaining);
+        for item in iter {
+            remaining -= 1;
+            validity.push(item?)
+        }
+    }
+    *scratch = chunks.into_inner();
+
+    Ok(validity.into())
+}
+
+/// Deserializes column `column` from `stripe`, assumed to represent a f32
+pub fn deserialize_f32(
+    data_type: DataType,
+    stripe: &Stripe,
+    column: usize,
+) -> Result<Float32Array, Error> {
+    let mut scratch = vec![];
+    let num_rows = stripe.number_of_rows();
+
+    let validity = deserialize_validity(stripe, column, &mut scratch)?;
+
+    let mut chunks = stripe.get_bytes(column, Kind::Data, scratch)?;
+
+    let mut values = Vec::with_capacity(num_rows);
+    if let Some(validity) = &validity {
+        let mut validity_iter = validity.iter();
+        while let Some(chunk) = chunks.next()? {
+            let mut valid_iter = decode::deserialize_f32(chunk);
+            let iter = validity_iter.by_ref().map(|is_valid| {
+                if is_valid {
+                    valid_iter.next().unwrap()
+                } else {
+                    0.0f32
+                }
+            });
+            values.extend(iter);
+        }
+    } else {
+        while let Some(chunk) = chunks.next()? {
+            values.extend(decode::deserialize_f32(chunk));
+        }
+    }
+
+    Float32Array::try_new(data_type, values.into(), validity)
+}
+
+/// Deserializes column `column` from `stripe`, assumed to represent a boolean array
+pub fn deserialize_bool(
+    data_type: DataType,
+    stripe: &Stripe,
+    column: usize,
+) -> Result<BooleanArray, Error> {
+    let num_rows = stripe.number_of_rows();
+    let mut scratch = vec![];
+
+    let validity = deserialize_validity(stripe, column, &mut scratch)?;
+
+    let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?;
+
+    let mut values = MutableBitmap::with_capacity(num_rows);
+    if let Some(validity) = &validity {
+        let mut validity_iter = validity.iter();
+
+        while let Some(chunk) = chunks.next()? {
+            let mut valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8);
+            validity_iter.by_ref().try_for_each(|is_valid| {
+                values.push(if is_valid {
+                    valid_iter.next().unwrap()?
+                } else {
+                    false
+                });
+                Result::<(), Error>::Ok(())
+            })?;
+        }
+    } else {
+        while let Some(chunk) = chunks.next()? {
+            let valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8);
+            for v in valid_iter {
+                values.push(v?)
+            }
+        }
+    }
+
+    BooleanArray::try_new(data_type, values.into(), validity)
+}
diff --git a/tests/it/io/mod.rs b/tests/it/io/mod.rs
index 3fc0c2123e3..bf228251df3 100644
--- a/tests/it/io/mod.rs
+++ b/tests/it/io/mod.rs
@@ -16,6 +16,9 @@ mod parquet;
 #[cfg(feature = "io_avro")]
 mod avro;
 
+#[cfg(feature = "io_orc")]
+mod orc;
+
 #[cfg(any(
     feature = "io_csv_read",
     feature = "io_csv_write",
diff --git a/tests/it/io/orc/mod.rs b/tests/it/io/orc/mod.rs
new file mode 100644
index 00000000000..0bec210a6a5
--- /dev/null
+++ b/tests/it/io/orc/mod.rs
@@ -0,0 +1 @@
+mod read;
diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs
new file mode 100644
index 00000000000..e901348c16d
--- /dev/null
+++ b/tests/it/io/orc/read.rs
@@ -0,0 +1,28 @@
+use arrow2::array::*;
+use arrow2::datatypes::DataType;
+use arrow2::error::Error;
+use arrow2::io::orc::{format, read};
+
+#[test]
+fn infer() -> Result<(), Error> {
+    let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap();
+    let (ps, footer, _) = format::read::read_metadata(&mut reader)?;
+    let schema = read::infer_schema(&footer)?;
+
+    assert_eq!(schema.fields.len(), 12);
+
+    let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?;
+
+    let array = read::deserialize_f32(DataType::Float32, &stripe, 1)?;
+    assert_eq!(
+        array,
+        Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)])
+    );
+
+    let array = read::deserialize_bool(DataType::Boolean, &stripe, 2)?;
+    assert_eq!(
+        array,
+        BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)])
+    );
+    Ok(())
+}
diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py
new file mode 100644
index 00000000000..6aa94aa03f1
--- /dev/null
+++ b/tests/it/io/orc/write.py
@@ -0,0 +1,47 @@
+import os
+
+import pyorc
+
+
+data = {
+    "a": [1.0, 2.0, None, 4.0, 5.0],
+    "b": [True, False, None, True, False],
+    "str_direct": ["a", "cccccc", None, "ddd", "ee"],
+    "d": ["a", "bb", None, "ccc", "ddd"],
+    "e": ["ddd", "cc", None, "bb", "a"],
+    "f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"],
+    "int_short_repeated": [5, 5, None, 5, 5],
+    "int_neg_short_repeated": [-5, -5, None, -5, -5],
+    "int_delta": [1, 2, None, 4, 5],
+    "int_neg_delta": [5, 4, None, 2, 1],
+    "int_direct": [1, 6, None, 3, 2],
+    "int_neg_direct": [-1, -6, None, -3, -2],
+}
+
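+# Each key above becomes one field of the ORC struct written below. The names
+# appear to encode the integer RLEv2 sub-encoding the values should trigger
+# (SHORT_REPEAT, DELTA, DIRECT), and `dict_key_size_threshold=0.0` presumably
+# disables dictionary encoding so strings are written with DIRECT encoding.
+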
+def _write(
+    schema: str,
+    data,
+    file_name: str,
+    compression=pyorc.CompressionKind.NONE,
+    dict_key_size_threshold=0.0,
+):
+    output = open(file_name, "wb")
+    writer = pyorc.Writer(
+        output,
+        schema,
+        dict_key_size_threshold=dict_key_size_threshold,
+        compression=compression,
+    )
+    num_rows = len(list(data.values())[0])
+    for x in range(num_rows):
+        row = tuple(values[x] for values in data.values())
+        writer.write(row)
+    writer.close()
+
+
+os.makedirs("fixtures/pyorc", exist_ok=True)
+_write(
+    "struct<a:float,b:boolean,str_direct:string,d:string,e:string,f:string,int_short_repeated:int,int_neg_short_repeated:int,int_delta:int,int_neg_delta:int,int_direct:int,int_neg_direct:int>",
+    data,
+    "fixtures/pyorc/test.orc",
+)
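+
+# CI regenerates this fixture before running the tests:
+#
+#     python tests/it/io/orc/write.py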