From 2d5ba53610d19f2dfc582159b87be26dcac79b66 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 26 Jul 2022 15:43:34 +0000 Subject: [PATCH 1/8] Added basics of ORC --- .github/workflows/coverage.yml | 3 +- .github/workflows/test.yml | 5 +- Cargo.toml | 17 +++- DEVELOPMENT.md | 4 +- src/io/mod.rs | 3 + src/io/orc/mod.rs | 12 +++ src/io/orc/read/mod.rs | 178 +++++++++++++++++++++++++++++++++ tests/it/io/mod.rs | 3 + tests/it/io/orc/mod.rs | 1 + tests/it/io/orc/read.rs | 28 ++++++ tests/it/io/orc/write.py | 47 +++++++++ 11 files changed, 294 insertions(+), 7 deletions(-) create mode 100644 src/io/orc/mod.rs create mode 100644 src/io/orc/read/mod.rs create mode 100644 tests/it/io/orc/mod.rs create mode 100644 tests/it/io/orc/read.rs create mode 100644 tests/it/io/orc/write.py diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 3007684b17e..69d448cd7ca 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -19,8 +19,9 @@ jobs: python3 -m venv venv source venv/bin/activate pip install pip --upgrade - pip install pyarrow==6 + pip install pyarrow==6 pyorc python parquet_integration/write_parquet.py + python tests/it/io/orc/write.py deactivate - uses: Swatinem/rust-cache@v1 - name: Generate code coverage diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e0cb434ea60..2656838ec4f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,16 +13,17 @@ jobs: submodules: true # needed to test IPC, which are located in a submodule - name: Install Rust run: rustup update stable - - uses: Swatinem/rust-cache@v1 - name: Setup parquet files run: | apt update && apt install python3-pip python3-venv -y -q python3 -m venv venv source venv/bin/activate pip install pip --upgrade - pip install pyarrow==6 + pip install pyarrow==6 pyorc python parquet_integration/write_parquet.py + python tests/it/io/orc/write.py deactivate + - uses: Swatinem/rust-cache@v1 - name: Run run: cargo test --features full diff --git a/Cargo.toml b/Cargo.toml index 1aada784f46..da5fef8481a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,9 +72,6 @@ parquet2 = { version = "0.14.0", optional = true, default_features = false } # avro support avro-schema = { version = "0.2", optional = true } -serde = { version = "^1.0", features = ["rc"], optional = true } -serde_derive = { version = "^1.0", optional = true } -serde_json = { version = "^1.0", features = ["preserve_order"], optional = true } # compression of avro libflate = { version = "1.1.1", optional = true } snap = { version = "1", optional = true } @@ -82,6 +79,14 @@ crc = { version = "2", optional = true } # async avro async-stream = { version = "0.3.2", optional = true } +# ORC support +orc-format = { git = "https://github.com/DataEngineeringLabs/orc-format.git", optional = true } + +# Arrow integration tests support +serde = { version = "^1.0", features = ["rc"], optional = true } +serde_derive = { version = "^1.0", optional = true } +serde_json = { version = "^1.0", features = ["preserve_order"], optional = true } + # for division/remainder optimization at runtime strength_reduce = { version = "0.2", optional = true } @@ -126,6 +131,7 @@ full = [ "io_parquet", "io_parquet_compression", "io_avro", + "io_orc", "io_avro_compression", "io_avro_async", "regex", @@ -145,6 +151,7 @@ io_ipc_write_async = ["io_ipc", "futures"] io_ipc_read_async = ["io_ipc", "futures", "async-stream"] io_ipc_compression = ["lz4", "zstd"] io_flight = ["io_ipc", "arrow-format/flight-data"] + # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format. io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"] io_parquet_compression = [ @@ -154,6 +161,7 @@ io_parquet_compression = [ "parquet2/lz4", "parquet2/brotli", ] + io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"] io_avro_compression = [ "libflate", @@ -161,6 +169,9 @@ io_avro_compression = [ "crc", ] io_avro_async = ["io_avro", "futures", "async-stream"] + +io_orc = [ "orc-format" ] + # serde+serde_json: its dependencies + error handling # serde_derive: there is some derive around io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"] diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index d81e6455838..aced5ffcf3b 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -42,10 +42,12 @@ source venv/bin/activate pip install pip --upgrade # Install pyarrow, version 6 -pip install pyarrow==6 +pip install pyarrow==6 pyorc # Generate the parquet files (this might take some time, depending on your computer setup) python parquet_integration/write_parquet.py +# generate ORC files +python parquet_integration/write_parquet.py # Get out of venv, back to normal terminal deactivate diff --git a/src/io/mod.rs b/src/io/mod.rs index 9343d4281ce..bc7d218ad36 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -5,6 +5,9 @@ #[cfg(feature = "io_odbc")] pub mod odbc; +#[cfg(feature = "io_orc")] +pub mod orc; + #[cfg(any( feature = "io_csv_read", feature = "io_csv_read_async", diff --git a/src/io/orc/mod.rs b/src/io/orc/mod.rs new file mode 100644 index 00000000000..f9d240113dd --- /dev/null +++ b/src/io/orc/mod.rs @@ -0,0 +1,12 @@ +//! APIs to read from [ORC format](https://orc.apache.org). +pub mod read; + +pub use orc_format as format; + +use crate::error::Error; + +impl From for Error { + fn from(error: format::Error) -> Self { + Error::ExternalFormat(format!("{:?}", error)) + } +} diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs new file mode 100644 index 00000000000..b29e3d3edb9 --- /dev/null +++ b/src/io/orc/read/mod.rs @@ -0,0 +1,178 @@ +//! APIs to read from [ORC format](https://orc.apache.org). +use std::io::{Read, Seek, SeekFrom}; + +use crate::array::{BooleanArray, Float32Array}; +use crate::bitmap::{Bitmap, MutableBitmap}; +use crate::datatypes::{DataType, Field, Schema}; +use crate::error::Error; + +use orc_format::fallible_streaming_iterator::FallibleStreamingIterator; +use orc_format::proto::stream::Kind; +use orc_format::proto::{CompressionKind, Footer, StripeInformation, Type}; +use orc_format::read::decode; +use orc_format::read::Stripe; + +/// Infers a [`Schema`] from the files' [`Footer`]. +/// # Errors +/// This function errors if the type is not yet supported. +pub fn infer_schema(footer: &Footer) -> Result { + let types = &footer.types; + + let dt = infer_dt(&footer.types[0], types)?; + if let DataType::Struct(fields) = dt { + Ok(fields.into()) + } else { + Err(Error::ExternalFormat( + "ORC root type must be a struct".to_string(), + )) + } +} + +fn infer_dt(type_: &Type, types: &[Type]) -> Result { + use orc_format::proto::r#type::Kind::*; + let dt = match type_.kind() { + Boolean => DataType::Boolean, + Byte => DataType::Int8, + Short => DataType::Int16, + Int => DataType::Int32, + Long => DataType::Int64, + Float => DataType::Float32, + Double => DataType::Float64, + String => DataType::Utf8, + Binary => DataType::Binary, + Struct => { + let sub_types = type_ + .subtypes + .iter() + .cloned() + .zip(type_.field_names.iter()) + .map(|(i, name)| { + infer_dt( + types.get(i as usize).ok_or_else(|| { + Error::ExternalFormat(format!("ORC field {i} not found")) + })?, + types, + ) + .map(|dt| Field::new(name, dt, true)) + }) + .collect::, Error>>()?; + DataType::Struct(sub_types) + } + kind => return Err(Error::nyi(format!("Reading {kind:?} from ORC"))), + }; + Ok(dt) +} + +/// Reads the stripe [`StripeInformation`] into memory. +pub fn read_stripe( + reader: &mut R, + stripe_info: StripeInformation, + compression: CompressionKind, +) -> Result { + let offset = stripe_info.offset(); + reader.seek(SeekFrom::Start(offset)).unwrap(); + + let len = stripe_info.index_length() + stripe_info.data_length() + stripe_info.footer_length(); + let mut stripe = vec![0; len as usize]; + reader.read_exact(&mut stripe).unwrap(); + + Ok(Stripe::try_new(stripe, stripe_info, compression)?) +} + +fn deserialize_validity( + stripe: &Stripe, + column: usize, + scratch: &mut Vec, +) -> Result, Error> { + let mut chunks = stripe.get_bytes(column, Kind::Present, std::mem::take(scratch))?; + + let mut validity = MutableBitmap::with_capacity(stripe.number_of_rows()); + let mut remaining = stripe.number_of_rows(); + while let Some(chunk) = chunks.next()? { + // todo: this can be faster by iterating in bytes instead of single bits via `BooleanRun` + let iter = decode::BooleanIter::new(chunk, remaining); + for item in iter { + remaining -= 1; + validity.push(item?) + } + } + *scratch = std::mem::take(&mut chunks.into_inner()); + + Ok(validity.into()) +} + +/// Deserializes column `column` from `stripe`, assumed to represent a f32 +pub fn deserialize_f32( + data_type: DataType, + stripe: &Stripe, + column: usize, +) -> Result { + let mut scratch = vec![]; + let num_rows = stripe.number_of_rows(); + + let validity = deserialize_validity(stripe, column, &mut scratch)?; + + let mut chunks = stripe.get_bytes(column, Kind::Data, scratch)?; + + let mut values = Vec::with_capacity(num_rows); + if let Some(validity) = &validity { + let mut validity_iter = validity.iter(); + while let Some(chunk) = chunks.next()? { + let mut valid_iter = decode::deserialize_f32(chunk); + let iter = validity_iter.by_ref().map(|is_valid| { + if is_valid { + valid_iter.next().unwrap() + } else { + 0.0f32 + } + }); + values.extend(iter); + } + } else { + while let Some(chunk) = chunks.next()? { + values.extend(decode::deserialize_f32(chunk)); + } + } + + Float32Array::try_new(data_type, values.into(), validity) +} + +/// Deserializes column `column` from `stripe`, assumed to represent a boolean array +pub fn deserialize_bool( + data_type: DataType, + stripe: &Stripe, + column: usize, +) -> Result { + let num_rows = stripe.number_of_rows(); + let mut scratch = vec![]; + + let validity = deserialize_validity(stripe, column, &mut scratch)?; + + let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + + let mut values = MutableBitmap::with_capacity(num_rows); + if let Some(validity) = &validity { + let mut validity_iter = validity.iter(); + + while let Some(chunk) = chunks.next()? { + let mut valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8); + validity_iter.by_ref().try_for_each(|is_valid| { + values.push(if is_valid { + valid_iter.next().unwrap()? + } else { + false + }); + Result::<(), Error>::Ok(()) + })?; + } + } else { + while let Some(chunk) = chunks.next()? { + let valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8); + for v in valid_iter { + values.push(v?) + } + } + } + + BooleanArray::try_new(data_type, values.into(), validity) +} diff --git a/tests/it/io/mod.rs b/tests/it/io/mod.rs index 3fc0c2123e3..bf228251df3 100644 --- a/tests/it/io/mod.rs +++ b/tests/it/io/mod.rs @@ -16,6 +16,9 @@ mod parquet; #[cfg(feature = "io_avro")] mod avro; +#[cfg(feature = "io_orc")] +mod orc; + #[cfg(any( feature = "io_csv_read", feature = "io_csv_write", diff --git a/tests/it/io/orc/mod.rs b/tests/it/io/orc/mod.rs new file mode 100644 index 00000000000..0bec210a6a5 --- /dev/null +++ b/tests/it/io/orc/mod.rs @@ -0,0 +1 @@ +mod read; diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs new file mode 100644 index 00000000000..e901348c16d --- /dev/null +++ b/tests/it/io/orc/read.rs @@ -0,0 +1,28 @@ +use arrow2::array::*; +use arrow2::datatypes::DataType; +use arrow2::error::Error; +use arrow2::io::orc::{format, read}; + +#[test] +fn infer() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let (ps, footer, _) = format::read::read_metadata(&mut reader)?; + let schema = read::infer_schema(&footer)?; + + assert_eq!(schema.fields.len(), 12); + + let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + + let array = read::deserialize_f32(DataType::Float32, &stripe, 1)?; + assert_eq!( + array, + Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]) + ); + + let array = read::deserialize_bool(DataType::Boolean, &stripe, 2)?; + assert_eq!( + array, + BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]) + ); + Ok(()) +} diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py new file mode 100644 index 00000000000..6aa94aa03f1 --- /dev/null +++ b/tests/it/io/orc/write.py @@ -0,0 +1,47 @@ +import os + +import pyorc + + +data = { + "a": [1.0, 2.0, None, 4.0, 5.0], + "b": [True, False, None, True, False], + "str_direct": ["a", "cccccc", None, "ddd", "ee"], + "d": ["a", "bb", None, "ccc", "ddd"], + "e": ["ddd", "cc", None, "bb", "a"], + "f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"], + "int_short_repeated": [5, 5, None, 5, 5], + "int_neg_short_repeated": [-5, -5, None, -5, -5], + "int_delta": [1, 2, None, 4, 5], + "int_neg_delta": [5, 4, None, 2, 1], + "int_direct": [1, 6, None, 3, 2], + "int_neg_direct": [-1, -6, None, -3, -2], +} + + +def _write( + schema: str, + data, + file_name: str, + compression=pyorc.CompressionKind.NONE, + dict_key_size_threshold=0.0, +): + output = open(file_name, "wb") + writer = pyorc.Writer( + output, + schema, + dict_key_size_threshold=dict_key_size_threshold, + compression=compression, + ) + num_rows = len(list(data.values())[0]) + for x in range(num_rows): + row = tuple(values[x] for values in data.values()) + writer.write(row) + writer.close() + +os.makedirs("fixtures/pyorc", exist_ok=True) +_write( + "struct", + data, + "fixtures/pyorc/test.orc", +) From 4f3d5f802c5cf7cc10d94cfa9158d8734fd854ff Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 26 Jul 2022 20:55:17 +0000 Subject: [PATCH 2/8] Added support for integers --- src/io/orc/read/mod.rs | 145 ++++++++++++++++++++++++++++++++++++++- tests/it/io/orc/read.rs | 52 +++++++++++++- tests/it/io/orc/write.py | 46 ++++++++----- 3 files changed, 220 insertions(+), 23 deletions(-) diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs index b29e3d3edb9..30d22c73dbe 100644 --- a/src/io/orc/read/mod.rs +++ b/src/io/orc/read/mod.rs @@ -1,7 +1,7 @@ //! APIs to read from [ORC format](https://orc.apache.org). use std::io::{Read, Seek, SeekFrom}; -use crate::array::{BooleanArray, Float32Array}; +use crate::array::{BooleanArray, Float32Array, Int32Array, Int64Array}; use crate::bitmap::{Bitmap, MutableBitmap}; use crate::datatypes::{DataType, Field, Schema}; use crate::error::Error; @@ -167,7 +167,7 @@ pub fn deserialize_bool( } } else { while let Some(chunk) = chunks.next()? { - let valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8); + let valid_iter = decode::BooleanIter::new(chunk, num_rows); for v in valid_iter { values.push(v?) } @@ -176,3 +176,144 @@ pub fn deserialize_bool( BooleanArray::try_new(data_type, values.into(), validity) } + +struct IntIter<'a> { + current: Option>, + runs: decode::SignedRleV2Iter<'a>, +} + +impl<'a> IntIter<'a> { + fn new(data: &'a [u8]) -> Self { + Self { + runs: decode::SignedRleV2Iter::new(data), + current: None, + } + } +} + +impl<'a> Iterator for IntIter<'a> { + type Item = Result; + + #[inline] + fn next(&mut self) -> Option { + let next = if let Some(run) = &mut self.current { + match run { + decode::SignedRleV2Run::Direct(values_iter) => values_iter.next(), + decode::SignedRleV2Run::Delta(values_iter) => values_iter.next(), + decode::SignedRleV2Run::ShortRepeat(values_iter) => values_iter.next(), + } + } else { + None + }; + + if next.is_none() { + match self.runs.next()? { + Ok(run) => self.current = Some(run), + Err(e) => return Some(Err(Error::ExternalFormat(format!("{:?}", e)))), + } + self.next() + } else { + next.map(Ok) + } + } +} + +/// Deserializes column `column` from `stripe`, assumed to represent a boolean array +pub fn deserialize_i64( + data_type: DataType, + stripe: &Stripe, + column: usize, +) -> Result { + let num_rows = stripe.number_of_rows(); + let mut scratch = vec![]; + + let validity = deserialize_validity(stripe, column, &mut scratch)?; + + let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + + let mut values = Vec::with_capacity(num_rows); + if let Some(validity) = &validity { + let validity_iter = validity.iter(); + + let mut iter = IntIter::new(chunks.next()?.unwrap()); + for is_valid in validity_iter { + if is_valid { + let item = iter.next().transpose()?; + let item = if let Some(item) = item { + item + } else { + iter = IntIter::new(chunks.next()?.unwrap()); + iter.next().transpose()?.unwrap() + }; + values.push(item); + } else { + values.push(0); + } + } + } else { + while let Some(chunk) = chunks.next()? { + decode::SignedRleV2Iter::new(chunk).try_for_each(|run| { + run.map(|run| match run { + decode::SignedRleV2Run::Direct(values_iter) => values.extend(values_iter), + decode::SignedRleV2Run::Delta(values_iter) => values.extend(values_iter), + decode::SignedRleV2Run::ShortRepeat(values_iter) => values.extend(values_iter), + }) + })?; + } + } + + Int64Array::try_new(data_type, values.into(), validity) +} + +/// Deserializes column `column` from `stripe`, assumed to represent a boolean array +pub fn deserialize_i32( + data_type: DataType, + stripe: &Stripe, + column: usize, +) -> Result { + let num_rows = stripe.number_of_rows(); + let mut scratch = vec![]; + + let validity = deserialize_validity(stripe, column, &mut scratch)?; + + let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + + let mut values = Vec::with_capacity(num_rows); + if let Some(validity) = &validity { + let validity_iter = validity.iter(); + + let mut iter = IntIter::new(chunks.next()?.unwrap()); + for is_valid in validity_iter { + if is_valid { + let item = iter.next().transpose()?; + let item = if let Some(item) = item { + item + } else { + iter = IntIter::new(chunks.next()?.unwrap()); + iter.next().transpose()?.unwrap() + }; + values.push(item as i32); + } else { + values.push(0); + } + } + } else { + while let Some(chunk) = chunks.next()? { + decode::SignedRleV2Iter::new(chunk).try_for_each(|run| { + run.map(|run| match run { + decode::SignedRleV2Run::Direct(values_iter) => { + values.extend(values_iter.map(|x| x as i32)) + } + decode::SignedRleV2Run::Delta(values_iter) => { + values.extend(values_iter.map(|x| x as i32)) + } + decode::SignedRleV2Run::ShortRepeat(values_iter) => { + values.extend(values_iter.map(|x| x as i32)) + } + }) + })?; + } + } + + Int32Array::try_new(data_type, values.into(), validity) +} diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs index e901348c16d..baef9e21f26 100644 --- a/tests/it/io/orc/read.rs +++ b/tests/it/io/orc/read.rs @@ -6,11 +6,17 @@ use arrow2::io::orc::{format, read}; #[test] fn infer() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; + let (_, footer, _) = format::read::read_metadata(&mut reader)?; let schema = read::infer_schema(&footer)?; - assert_eq!(schema.fields.len(), 12); + assert_eq!(schema.fields.len(), 6); + Ok(()) +} +#[test] +fn float32() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let (ps, footer, _) = format::read::read_metadata(&mut reader)?; let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; let array = read::deserialize_f32(DataType::Float32, &stripe, 1)?; @@ -19,10 +25,50 @@ fn infer() -> Result<(), Error> { Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]) ); - let array = read::deserialize_bool(DataType::Boolean, &stripe, 2)?; + let array = read::deserialize_f32(DataType::Float32, &stripe, 2)?; + assert_eq!( + array, + Float32Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]) + ); + Ok(()) +} + +#[test] +fn boolean() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let (ps, footer, _) = format::read::read_metadata(&mut reader)?; + let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + + let array = read::deserialize_bool(DataType::Boolean, &stripe, 3)?; assert_eq!( array, BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]) ); + + let array = read::deserialize_bool(DataType::Boolean, &stripe, 4)?; + assert_eq!( + array, + BooleanArray::from([Some(true), Some(false), Some(true), Some(true), Some(false)]) + ); + Ok(()) +} + +#[test] +fn int() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let (ps, footer, _) = format::read::read_metadata(&mut reader)?; + let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + + let array = read::deserialize_i32(DataType::Int32, &stripe, 5)?; + assert_eq!( + array, + Int32Array::from([Some(5), Some(-5), None, Some(5), Some(5)]) + ); + + let array = read::deserialize_i32(DataType::Int32, &stripe, 6)?; + assert_eq!( + array, + Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]) + ); Ok(()) } diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py index 6aa94aa03f1..633d28b8dd9 100644 --- a/tests/it/io/orc/write.py +++ b/tests/it/io/orc/write.py @@ -4,28 +4,42 @@ data = { - "a": [1.0, 2.0, None, 4.0, 5.0], - "b": [True, False, None, True, False], - "str_direct": ["a", "cccccc", None, "ddd", "ee"], - "d": ["a", "bb", None, "ccc", "ddd"], - "e": ["ddd", "cc", None, "bb", "a"], - "f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"], - "int_short_repeated": [5, 5, None, 5, 5], - "int_neg_short_repeated": [-5, -5, None, -5, -5], - "int_delta": [1, 2, None, 4, 5], - "int_neg_delta": [5, 4, None, 2, 1], - "int_direct": [1, 6, None, 3, 2], - "int_neg_direct": [-1, -6, None, -3, -2], + "float_nullable": [1.0, 2.0, None, 4.0, 5.0], + "float_required": [1.0, 2.0, 3.0, 4.0, 5.0], + "bool_nullable": [True, False, None, True, False], + "bool_required": [True, False, True, True, False], + "int_nulable": [5, -5, None, 5, 5], + "int_required": [5, -5, 1, 5, 5], } +def infer_schema(data): + schema = "struct<" + for key, value in data.items(): + dt = type(value[0]) + if dt == float: + dt = "float" + elif dt == int: + dt = "int" + elif dt == bool: + dt = "boolean" + elif dt == str: + dt = "string" + else: + raise NotImplementedError + schema += key + ":" + dt + "," + + schema = schema[:-1] + ">" + return schema + def _write( - schema: str, data, file_name: str, compression=pyorc.CompressionKind.NONE, dict_key_size_threshold=0.0, ): + schema = infer_schema(data) + output = open(file_name, "wb") writer = pyorc.Writer( output, @@ -40,8 +54,4 @@ def _write( writer.close() os.makedirs("fixtures/pyorc", exist_ok=True) -_write( - "struct", - data, - "fixtures/pyorc/test.orc", -) +_write(data, "fixtures/pyorc/test.orc") From 90abb254de9db1b811d4036a3487bdbc37e69c46 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 26 Jul 2022 21:04:05 +0000 Subject: [PATCH 3/8] Added API for Box --- src/io/orc/read/mod.rs | 26 +++++++++++++++++++++----- tests/it/io/orc/read.rs | 30 ++++++++++++------------------ 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs index 30d22c73dbe..629300dbed2 100644 --- a/src/io/orc/read/mod.rs +++ b/src/io/orc/read/mod.rs @@ -1,7 +1,7 @@ //! APIs to read from [ORC format](https://orc.apache.org). use std::io::{Read, Seek, SeekFrom}; -use crate::array::{BooleanArray, Float32Array, Int32Array, Int64Array}; +use crate::array::{Array, BooleanArray, Float32Array, Int32Array, Int64Array}; use crate::bitmap::{Bitmap, MutableBitmap}; use crate::datatypes::{DataType, Field, Schema}; use crate::error::Error; @@ -102,7 +102,7 @@ fn deserialize_validity( } /// Deserializes column `column` from `stripe`, assumed to represent a f32 -pub fn deserialize_f32( +fn deserialize_f32( data_type: DataType, stripe: &Stripe, column: usize, @@ -138,7 +138,7 @@ pub fn deserialize_f32( } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -pub fn deserialize_bool( +fn deserialize_bool( data_type: DataType, stripe: &Stripe, column: usize, @@ -219,7 +219,7 @@ impl<'a> Iterator for IntIter<'a> { } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -pub fn deserialize_i64( +fn deserialize_i64( data_type: DataType, stripe: &Stripe, column: usize, @@ -266,7 +266,7 @@ pub fn deserialize_i64( } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -pub fn deserialize_i32( +fn deserialize_i32( data_type: DataType, stripe: &Stripe, column: usize, @@ -317,3 +317,19 @@ pub fn deserialize_i32( Int32Array::try_new(data_type, values.into(), validity) } + +/// Deserializes column `column` from `stripe`, assumed +/// to represent an array of `data_type`. +pub fn deserialize( + data_type: DataType, + stripe: &Stripe, + column: usize, +) -> Result, Error> { + match data_type { + DataType::Boolean => deserialize_bool(data_type, stripe, column).map(|x| x.boxed()), + DataType::Int32 => deserialize_i32(data_type, stripe, column).map(|x| x.boxed()), + DataType::Int64 => deserialize_i64(data_type, stripe, column).map(|x| x.boxed()), + DataType::Float32 => deserialize_f32(data_type, stripe, column).map(|x| x.boxed()), + dt => return Err(Error::nyi(format!("Reading {dt:?} from ORC"))), + } +} diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs index baef9e21f26..7347d01e65d 100644 --- a/tests/it/io/orc/read.rs +++ b/tests/it/io/orc/read.rs @@ -19,16 +19,14 @@ fn float32() -> Result<(), Error> { let (ps, footer, _) = format::read::read_metadata(&mut reader)?; let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; - let array = read::deserialize_f32(DataType::Float32, &stripe, 1)?; assert_eq!( - array, - Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]) + read::deserialize(DataType::Float32, &stripe, 1)?, + Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() ); - let array = read::deserialize_f32(DataType::Float32, &stripe, 2)?; assert_eq!( - array, - Float32Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]) + read::deserialize(DataType::Float32, &stripe, 2)?, + Float32Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() ); Ok(()) } @@ -39,16 +37,14 @@ fn boolean() -> Result<(), Error> { let (ps, footer, _) = format::read::read_metadata(&mut reader)?; let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; - let array = read::deserialize_bool(DataType::Boolean, &stripe, 3)?; assert_eq!( - array, - BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]) + read::deserialize(DataType::Boolean, &stripe, 3)?, + BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]).boxed() ); - let array = read::deserialize_bool(DataType::Boolean, &stripe, 4)?; assert_eq!( - array, - BooleanArray::from([Some(true), Some(false), Some(true), Some(true), Some(false)]) + read::deserialize(DataType::Boolean, &stripe, 4)?, + BooleanArray::from([Some(true), Some(false), Some(true), Some(true), Some(false)]).boxed() ); Ok(()) } @@ -59,16 +55,14 @@ fn int() -> Result<(), Error> { let (ps, footer, _) = format::read::read_metadata(&mut reader)?; let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; - let array = read::deserialize_i32(DataType::Int32, &stripe, 5)?; assert_eq!( - array, - Int32Array::from([Some(5), Some(-5), None, Some(5), Some(5)]) + read::deserialize(DataType::Int32, &stripe, 5)?, + Int32Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() ); - let array = read::deserialize_i32(DataType::Int32, &stripe, 6)?; assert_eq!( - array, - Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]) + read::deserialize(DataType::Int32, &stripe, 6)?, + Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() ); Ok(()) } From 2805de01f32342e717e842d0dc8b78e7fb9c13a9 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 26 Jul 2022 21:39:59 +0000 Subject: [PATCH 4/8] Remaining int and f64 --- src/io/orc/read/mod.rs | 72 +++++++++++++++++++++++++++++----------- src/types/native.rs | 54 +++++++++++++++++++++++++++++- tests/it/io/orc/read.rs | 20 ++++++++++- tests/it/io/orc/write.py | 4 +++ 4 files changed, 129 insertions(+), 21 deletions(-) diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs index 629300dbed2..f4723f807ee 100644 --- a/src/io/orc/read/mod.rs +++ b/src/io/orc/read/mod.rs @@ -1,10 +1,11 @@ //! APIs to read from [ORC format](https://orc.apache.org). use std::io::{Read, Seek, SeekFrom}; -use crate::array::{Array, BooleanArray, Float32Array, Int32Array, Int64Array}; +use crate::array::{Array, BooleanArray, Int64Array, PrimitiveArray}; use crate::bitmap::{Bitmap, MutableBitmap}; use crate::datatypes::{DataType, Field, Schema}; use crate::error::Error; +use crate::types::NativeType; use orc_format::fallible_streaming_iterator::FallibleStreamingIterator; use orc_format::proto::stream::Kind; @@ -102,11 +103,11 @@ fn deserialize_validity( } /// Deserializes column `column` from `stripe`, assumed to represent a f32 -fn deserialize_f32( +fn deserialize_float( data_type: DataType, stripe: &Stripe, column: usize, -) -> Result { +) -> Result, Error> { let mut scratch = vec![]; let num_rows = stripe.number_of_rows(); @@ -118,23 +119,29 @@ fn deserialize_f32( if let Some(validity) = &validity { let mut validity_iter = validity.iter(); while let Some(chunk) = chunks.next()? { - let mut valid_iter = decode::deserialize_f32(chunk); + let mut valid_iter = chunk + .chunks_exact(std::mem::size_of::()) + .map(|chunk| T::from_le_bytes(chunk.try_into().unwrap_or_default())); let iter = validity_iter.by_ref().map(|is_valid| { if is_valid { valid_iter.next().unwrap() } else { - 0.0f32 + T::default() } }); values.extend(iter); } } else { while let Some(chunk) = chunks.next()? { - values.extend(decode::deserialize_f32(chunk)); + values.extend( + chunk + .chunks_exact(std::mem::size_of::()) + .map(|chunk| T::from_le_bytes(chunk.try_into().unwrap_or_default())), + ); } } - Float32Array::try_new(data_type, values.into(), validity) + PrimitiveArray::try_new(data_type, values.into(), validity) } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array @@ -266,11 +273,14 @@ fn deserialize_i64( } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -fn deserialize_i32( +fn deserialize_int( data_type: DataType, stripe: &Stripe, column: usize, -) -> Result { +) -> Result, Error> +where + T: NativeType + TryFrom, +{ let num_rows = stripe.number_of_rows(); let mut scratch = vec![]; @@ -278,7 +288,7 @@ fn deserialize_i32( let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; - let mut values = Vec::with_capacity(num_rows); + let mut values = Vec::::with_capacity(num_rows); if let Some(validity) = &validity { let validity_iter = validity.iter(); @@ -292,30 +302,51 @@ fn deserialize_i32( iter = IntIter::new(chunks.next()?.unwrap()); iter.next().transpose()?.unwrap() }; - values.push(item as i32); + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); } else { - values.push(0); + values.push(T::default()); } } } else { while let Some(chunk) = chunks.next()? { decode::SignedRleV2Iter::new(chunk).try_for_each(|run| { - run.map(|run| match run { + run.map_err(Error::from).and_then(|run| match run { decode::SignedRleV2Run::Direct(values_iter) => { - values.extend(values_iter.map(|x| x as i32)) + for item in values_iter { + let item = item.try_into().map_err(|_| { + Error::ExternalFormat("value uncastable".to_string()) + })?; + values.push(item); + } + Ok(()) } decode::SignedRleV2Run::Delta(values_iter) => { - values.extend(values_iter.map(|x| x as i32)) + for item in values_iter { + let item = item.try_into().map_err(|_| { + Error::ExternalFormat("value uncastable".to_string()) + })?; + values.push(item); + } + Ok(()) } decode::SignedRleV2Run::ShortRepeat(values_iter) => { - values.extend(values_iter.map(|x| x as i32)) + for item in values_iter { + let item = item.try_into().map_err(|_| { + Error::ExternalFormat("value uncastable".to_string()) + })?; + values.push(item); + } + Ok(()) } }) })?; } } - Int32Array::try_new(data_type, values.into(), validity) + PrimitiveArray::try_new(data_type, values.into(), validity) } /// Deserializes column `column` from `stripe`, assumed @@ -327,9 +358,12 @@ pub fn deserialize( ) -> Result, Error> { match data_type { DataType::Boolean => deserialize_bool(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int32 => deserialize_i32(data_type, stripe, column).map(|x| x.boxed()), + DataType::Int8 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), + DataType::Int16 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), + DataType::Int32 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), DataType::Int64 => deserialize_i64(data_type, stripe, column).map(|x| x.boxed()), - DataType::Float32 => deserialize_f32(data_type, stripe, column).map(|x| x.boxed()), + DataType::Float32 => deserialize_float::(data_type, stripe, column).map(|x| x.boxed()), + DataType::Float64 => deserialize_float::(data_type, stripe, column).map(|x| x.boxed()), dt => return Err(Error::nyi(format!("Reading {dt:?} from ORC"))), } } diff --git a/src/types/native.rs b/src/types/native.rs index 7e7711589b6..fc100017a76 100644 --- a/src/types/native.rs +++ b/src/types/native.rs @@ -28,7 +28,8 @@ pub trait NativeType: + std::ops::Index + std::ops::IndexMut + for<'a> TryFrom<&'a [u8]> - + std::fmt::Debug; + + std::fmt::Debug + + Default; /// To bytes in little endian fn to_le_bytes(&self) -> Self::Bytes; @@ -36,6 +37,9 @@ pub trait NativeType: /// To bytes in big endian fn to_be_bytes(&self) -> Self::Bytes; + /// From bytes in little endian + fn from_le_bytes(bytes: Self::Bytes) -> Self; + /// From bytes in big endian fn from_be_bytes(bytes: Self::Bytes) -> Self; } @@ -56,6 +60,11 @@ macro_rules! native_type { Self::to_be_bytes(*self) } + #[inline] + fn from_le_bytes(bytes: Self::Bytes) -> Self { + Self::from_le_bytes(bytes) + } + #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { Self::from_be_bytes(bytes) @@ -137,6 +146,21 @@ impl NativeType for days_ms { result } + #[inline] + fn from_le_bytes(bytes: Self::Bytes) -> Self { + let mut days = [0; 4]; + days[0] = bytes[0]; + days[1] = bytes[1]; + days[2] = bytes[2]; + days[3] = bytes[3]; + let mut ms = [0; 4]; + ms[0] = bytes[4]; + ms[1] = bytes[5]; + ms[2] = bytes[6]; + ms[3] = bytes[7]; + Self(i32::from_le_bytes(days), i32::from_le_bytes(ms)) + } + #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { let mut days = [0; 4]; @@ -228,6 +252,29 @@ impl NativeType for months_days_ns { result } + #[inline] + fn from_le_bytes(bytes: Self::Bytes) -> Self { + let mut months = [0; 4]; + months[0] = bytes[0]; + months[1] = bytes[1]; + months[2] = bytes[2]; + months[3] = bytes[3]; + let mut days = [0; 4]; + days[0] = bytes[4]; + days[1] = bytes[5]; + days[2] = bytes[6]; + days[3] = bytes[7]; + let mut ns = [0; 8]; + (0..8).for_each(|i| { + ns[i] = bytes[8 + i]; + }); + Self( + i32::from_le_bytes(months), + i32::from_le_bytes(days), + i64::from_le_bytes(ns), + ) + } + #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { let mut months = [0; 4]; @@ -446,6 +493,11 @@ impl NativeType for f16 { fn from_be_bytes(bytes: Self::Bytes) -> Self { Self(u16::from_be_bytes(bytes)) } + + #[inline] + fn from_le_bytes(bytes: Self::Bytes) -> Self { + Self(u16::from_le_bytes(bytes)) + } } #[cfg(test)] diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs index 7347d01e65d..f6dd8fdeba1 100644 --- a/tests/it/io/orc/read.rs +++ b/tests/it/io/orc/read.rs @@ -9,7 +9,7 @@ fn infer() -> Result<(), Error> { let (_, footer, _) = format::read::read_metadata(&mut reader)?; let schema = read::infer_schema(&footer)?; - assert_eq!(schema.fields.len(), 6); + assert_eq!(schema.fields.len(), 8); Ok(()) } @@ -31,6 +31,24 @@ fn float32() -> Result<(), Error> { Ok(()) } +#[test] +fn float64() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let (ps, footer, _) = format::read::read_metadata(&mut reader)?; + let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + + assert_eq!( + read::deserialize(DataType::Float64, &stripe, 7)?, + Float64Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() + ); + + assert_eq!( + read::deserialize(DataType::Float64, &stripe, 8)?, + Float64Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() + ); + Ok(()) +} + #[test] fn boolean() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py index 633d28b8dd9..3416f1d21f4 100644 --- a/tests/it/io/orc/write.py +++ b/tests/it/io/orc/write.py @@ -10,6 +10,8 @@ "bool_required": [True, False, True, True, False], "int_nulable": [5, -5, None, 5, 5], "int_required": [5, -5, 1, 5, 5], + "double_nulable": [1.0, 2.0, None, 4.0, 5.0], + "double_required": [1.0, 2.0, 3.0, 4.0, 5.0], } def infer_schema(data): @@ -26,6 +28,8 @@ def infer_schema(data): dt = "string" else: raise NotImplementedError + if key.startswith("double"): + dt = "double" schema += key + ":" + dt + "," schema = schema[:-1] + ">" From c02b7b78865475187b971979601fbe501d5081f1 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 30 Jul 2022 08:50:07 +0000 Subject: [PATCH 5/8] Bumped and updated --- Cargo.toml | 2 +- src/io/orc/mod.rs | 4 +- src/io/orc/read/mod.rs | 316 +++++++++++++-------------------------- tests/it/io/orc/read.rs | 89 ++++++++--- tests/it/io/orc/write.py | 4 + 5 files changed, 182 insertions(+), 233 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index da5fef8481a..c823cb058ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ crc = { version = "2", optional = true } async-stream = { version = "0.3.2", optional = true } # ORC support -orc-format = { git = "https://github.com/DataEngineeringLabs/orc-format.git", optional = true } +orc-format = { git = "https://github.com/DataEngineeringLabs/orc-format.git", branch = "iters", optional = true } # Arrow integration tests support serde = { version = "^1.0", features = ["rc"], optional = true } diff --git a/src/io/orc/mod.rs b/src/io/orc/mod.rs index f9d240113dd..6d1ac5953e3 100644 --- a/src/io/orc/mod.rs +++ b/src/io/orc/mod.rs @@ -5,8 +5,8 @@ pub use orc_format as format; use crate::error::Error; -impl From for Error { - fn from(error: format::Error) -> Self { +impl From for Error { + fn from(error: format::error::Error) -> Self { Error::ExternalFormat(format!("{:?}", error)) } } diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs index f4723f807ee..c9907971ef0 100644 --- a/src/io/orc/read/mod.rs +++ b/src/io/orc/read/mod.rs @@ -1,17 +1,14 @@ //! APIs to read from [ORC format](https://orc.apache.org). -use std::io::{Read, Seek, SeekFrom}; - use crate::array::{Array, BooleanArray, Int64Array, PrimitiveArray}; use crate::bitmap::{Bitmap, MutableBitmap}; use crate::datatypes::{DataType, Field, Schema}; use crate::error::Error; use crate::types::NativeType; -use orc_format::fallible_streaming_iterator::FallibleStreamingIterator; use orc_format::proto::stream::Kind; -use orc_format::proto::{CompressionKind, Footer, StripeInformation, Type}; +use orc_format::proto::{Footer, Type}; use orc_format::read::decode; -use orc_format::read::Stripe; +use orc_format::read::Column; /// Infers a [`Schema`] from the files' [`Footer`]. /// # Errors @@ -64,80 +61,50 @@ fn infer_dt(type_: &Type, types: &[Type]) -> Result { Ok(dt) } -/// Reads the stripe [`StripeInformation`] into memory. -pub fn read_stripe( - reader: &mut R, - stripe_info: StripeInformation, - compression: CompressionKind, -) -> Result { - let offset = stripe_info.offset(); - reader.seek(SeekFrom::Start(offset)).unwrap(); - - let len = stripe_info.index_length() + stripe_info.data_length() + stripe_info.footer_length(); - let mut stripe = vec![0; len as usize]; - reader.read_exact(&mut stripe).unwrap(); +fn deserialize_validity(column: &Column, scratch: &mut Vec) -> Result, Error> { + let mut stream = column.get_stream(Kind::Present, std::mem::take(scratch))?; - Ok(Stripe::try_new(stripe, stripe_info, compression)?) -} + let stream = decode::BooleanIter::new(&mut stream, column.number_of_rows()); -fn deserialize_validity( - stripe: &Stripe, - column: usize, - scratch: &mut Vec, -) -> Result, Error> { - let mut chunks = stripe.get_bytes(column, Kind::Present, std::mem::take(scratch))?; - - let mut validity = MutableBitmap::with_capacity(stripe.number_of_rows()); - let mut remaining = stripe.number_of_rows(); - while let Some(chunk) = chunks.next()? { - // todo: this can be faster by iterating in bytes instead of single bits via `BooleanRun` - let iter = decode::BooleanIter::new(chunk, remaining); - for item in iter { - remaining -= 1; - validity.push(item?) - } + let mut validity = MutableBitmap::with_capacity(column.number_of_rows()); + for item in stream { + validity.push(item?) } - *scratch = std::mem::take(&mut chunks.into_inner()); + + //*scratch = std::mem::take(&mut stream.into_inner()); Ok(validity.into()) } /// Deserializes column `column` from `stripe`, assumed to represent a f32 -fn deserialize_float( +fn deserialize_float( data_type: DataType, - stripe: &Stripe, - column: usize, + column: &Column, ) -> Result, Error> { let mut scratch = vec![]; - let num_rows = stripe.number_of_rows(); + let num_rows = column.number_of_rows(); - let validity = deserialize_validity(stripe, column, &mut scratch)?; + let validity = deserialize_validity(column, &mut scratch)?; - let mut chunks = stripe.get_bytes(column, Kind::Data, scratch)?; + let mut chunks = column.get_stream(Kind::Data, scratch)?; let mut values = Vec::with_capacity(num_rows); if let Some(validity) = &validity { - let mut validity_iter = validity.iter(); - while let Some(chunk) = chunks.next()? { - let mut valid_iter = chunk - .chunks_exact(std::mem::size_of::()) - .map(|chunk| T::from_le_bytes(chunk.try_into().unwrap_or_default())); - let iter = validity_iter.by_ref().map(|is_valid| { - if is_valid { - valid_iter.next().unwrap() - } else { - T::default() - } - }); - values.extend(iter); + let validity_iter = validity.iter(); + let mut items = + decode::FloatIter::::new(&mut chunks, validity.len() - validity.unset_bits()); + + for is_valid in validity_iter { + if is_valid { + values.push(items.next().transpose()?.unwrap_or_default()) + } else { + values.push(T::default()) + } } } else { - while let Some(chunk) = chunks.next()? { - values.extend( - chunk - .chunks_exact(std::mem::size_of::()) - .map(|chunk| T::from_le_bytes(chunk.try_into().unwrap_or_default())), - ); + let items = decode::FloatIter::::new(&mut chunks, num_rows); + for item in items { + values.push(item?); } } @@ -145,163 +112,96 @@ fn deserialize_float( } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -fn deserialize_bool( - data_type: DataType, - stripe: &Stripe, - column: usize, -) -> Result { - let num_rows = stripe.number_of_rows(); +fn deserialize_bool(data_type: DataType, column: &Column) -> Result { + let num_rows = column.number_of_rows(); let mut scratch = vec![]; - let validity = deserialize_validity(stripe, column, &mut scratch)?; + let validity = deserialize_validity(column, &mut scratch)?; - let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + let mut chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; let mut values = MutableBitmap::with_capacity(num_rows); if let Some(validity) = &validity { - let mut validity_iter = validity.iter(); - - while let Some(chunk) = chunks.next()? { - let mut valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8); - validity_iter.by_ref().try_for_each(|is_valid| { - values.push(if is_valid { - valid_iter.next().unwrap()? - } else { - false - }); - Result::<(), Error>::Ok(()) - })?; + let validity_iter = validity.iter(); + + let mut items = + decode::BooleanIter::new(&mut chunks, validity.len() - validity.unset_bits()); + + for is_valid in validity_iter { + values.push(if is_valid { + items.next().transpose()?.unwrap_or_default() + } else { + false + }); } } else { - while let Some(chunk) = chunks.next()? { - let valid_iter = decode::BooleanIter::new(chunk, num_rows); - for v in valid_iter { - values.push(v?) - } + let valid_iter = decode::BooleanIter::new(&mut chunks, num_rows); + for v in valid_iter { + values.push(v?) } } BooleanArray::try_new(data_type, values.into(), validity) } -struct IntIter<'a> { - current: Option>, - runs: decode::SignedRleV2Iter<'a>, -} - -impl<'a> IntIter<'a> { - fn new(data: &'a [u8]) -> Self { - Self { - runs: decode::SignedRleV2Iter::new(data), - current: None, - } - } -} - -impl<'a> Iterator for IntIter<'a> { - type Item = Result; - - #[inline] - fn next(&mut self) -> Option { - let next = if let Some(run) = &mut self.current { - match run { - decode::SignedRleV2Run::Direct(values_iter) => values_iter.next(), - decode::SignedRleV2Run::Delta(values_iter) => values_iter.next(), - decode::SignedRleV2Run::ShortRepeat(values_iter) => values_iter.next(), - } - } else { - None - }; - - if next.is_none() { - match self.runs.next()? { - Ok(run) => self.current = Some(run), - Err(e) => return Some(Err(Error::ExternalFormat(format!("{:?}", e)))), - } - self.next() - } else { - next.map(Ok) - } - } -} - /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -fn deserialize_i64( - data_type: DataType, - stripe: &Stripe, - column: usize, -) -> Result { - let num_rows = stripe.number_of_rows(); +fn deserialize_i64(data_type: DataType, column: &Column) -> Result { + let num_rows = column.number_of_rows(); let mut scratch = vec![]; - let validity = deserialize_validity(stripe, column, &mut scratch)?; + let validity = deserialize_validity(column, &mut scratch)?; - let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + let chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; let mut values = Vec::with_capacity(num_rows); if let Some(validity) = &validity { let validity_iter = validity.iter(); + let mut iter = + decode::SignedRleV2Iter::new(chunks, validity.len() - validity.unset_bits(), vec![]); - let mut iter = IntIter::new(chunks.next()?.unwrap()); for is_valid in validity_iter { if is_valid { - let item = iter.next().transpose()?; - let item = if let Some(item) = item { - item - } else { - iter = IntIter::new(chunks.next()?.unwrap()); - iter.next().transpose()?.unwrap() - }; + let item = iter.next().transpose()?.unwrap_or_default(); values.push(item); } else { values.push(0); } } } else { - while let Some(chunk) = chunks.next()? { - decode::SignedRleV2Iter::new(chunk).try_for_each(|run| { - run.map(|run| match run { - decode::SignedRleV2Run::Direct(values_iter) => values.extend(values_iter), - decode::SignedRleV2Run::Delta(values_iter) => values.extend(values_iter), - decode::SignedRleV2Run::ShortRepeat(values_iter) => values.extend(values_iter), - }) - })?; - } + let mut iter = decode::SignedRleV2RunIter::new(chunks, num_rows, vec![]); + iter.try_for_each(|run| { + run.map(|run| match run { + decode::SignedRleV2Run::Direct(values_iter) => values.extend(values_iter), + decode::SignedRleV2Run::Delta(values_iter) => values.extend(values_iter), + decode::SignedRleV2Run::ShortRepeat(values_iter) => values.extend(values_iter), + }) + })?; } Int64Array::try_new(data_type, values.into(), validity) } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -fn deserialize_int( - data_type: DataType, - stripe: &Stripe, - column: usize, -) -> Result, Error> +fn deserialize_int(data_type: DataType, column: &Column) -> Result, Error> where T: NativeType + TryFrom, { - let num_rows = stripe.number_of_rows(); + let num_rows = column.number_of_rows(); let mut scratch = vec![]; - let validity = deserialize_validity(stripe, column, &mut scratch)?; + let validity = deserialize_validity(column, &mut scratch)?; - let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + let chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; let mut values = Vec::::with_capacity(num_rows); if let Some(validity) = &validity { let validity_iter = validity.iter(); - let mut iter = IntIter::new(chunks.next()?.unwrap()); + let mut iter = + decode::SignedRleV2Iter::new(chunks, validity.len() - validity.unset_bits(), vec![]); for is_valid in validity_iter { if is_valid { - let item = iter.next().transpose()?; - let item = if let Some(item) = item { - item - } else { - iter = IntIter::new(chunks.next()?.unwrap()); - iter.next().transpose()?.unwrap() - }; + let item = iter.next().transpose()?.unwrap_or_default(); let item = item .try_into() .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; @@ -311,39 +211,39 @@ where } } } else { - while let Some(chunk) = chunks.next()? { - decode::SignedRleV2Iter::new(chunk).try_for_each(|run| { - run.map_err(Error::from).and_then(|run| match run { - decode::SignedRleV2Run::Direct(values_iter) => { - for item in values_iter { - let item = item.try_into().map_err(|_| { - Error::ExternalFormat("value uncastable".to_string()) - })?; - values.push(item); - } - Ok(()) + let mut iter = decode::SignedRleV2RunIter::new(chunks, num_rows, vec![]); + + iter.try_for_each(|run| { + run.map_err(Error::from).and_then(|run| match run { + decode::SignedRleV2Run::Direct(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); } - decode::SignedRleV2Run::Delta(values_iter) => { - for item in values_iter { - let item = item.try_into().map_err(|_| { - Error::ExternalFormat("value uncastable".to_string()) - })?; - values.push(item); - } - Ok(()) + Ok(()) + } + decode::SignedRleV2Run::Delta(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); } - decode::SignedRleV2Run::ShortRepeat(values_iter) => { - for item in values_iter { - let item = item.try_into().map_err(|_| { - Error::ExternalFormat("value uncastable".to_string()) - })?; - values.push(item); - } - Ok(()) + Ok(()) + } + decode::SignedRleV2Run::ShortRepeat(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); } - }) - })?; - } + Ok(()) + } + }) + })?; } PrimitiveArray::try_new(data_type, values.into(), validity) @@ -351,19 +251,15 @@ where /// Deserializes column `column` from `stripe`, assumed /// to represent an array of `data_type`. -pub fn deserialize( - data_type: DataType, - stripe: &Stripe, - column: usize, -) -> Result, Error> { +pub fn deserialize(data_type: DataType, column: &Column) -> Result, Error> { match data_type { - DataType::Boolean => deserialize_bool(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int8 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int16 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int32 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int64 => deserialize_i64(data_type, stripe, column).map(|x| x.boxed()), - DataType::Float32 => deserialize_float::(data_type, stripe, column).map(|x| x.boxed()), - DataType::Float64 => deserialize_float::(data_type, stripe, column).map(|x| x.boxed()), + DataType::Boolean => deserialize_bool(data_type, column).map(|x| x.boxed()), + DataType::Int8 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int16 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int32 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int64 => deserialize_i64(data_type, column).map(|x| x.boxed()), + DataType::Float32 => deserialize_float::(data_type, column).map(|x| x.boxed()), + DataType::Float64 => deserialize_float::(data_type, column).map(|x| x.boxed()), dt => return Err(Error::nyi(format!("Reading {dt:?} from ORC"))), } } diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs index f6dd8fdeba1..08da7e587b0 100644 --- a/tests/it/io/orc/read.rs +++ b/tests/it/io/orc/read.rs @@ -6,26 +6,33 @@ use arrow2::io::orc::{format, read}; #[test] fn infer() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (_, footer, _) = format::read::read_metadata(&mut reader)?; - let schema = read::infer_schema(&footer)?; + let metadata = format::read::read_metadata(&mut reader)?; + let schema = read::infer_schema(&metadata.footer)?; - assert_eq!(schema.fields.len(), 8); + assert_eq!(schema.fields.len(), 10); Ok(()) } #[test] fn float32() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; - let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = + format::read::read_stripe_column(&mut reader, &metadata, 0, footer.clone(), 1, vec![])?; assert_eq!( - read::deserialize(DataType::Float32, &stripe, 1)?, + read::deserialize(DataType::Float32, &column)?, Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() ); + let (footer, scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 2, scratch)?; + assert_eq!( - read::deserialize(DataType::Float32, &stripe, 2)?, + read::deserialize(DataType::Float32, &column)?, Float32Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() ); Ok(()) @@ -34,16 +41,22 @@ fn float32() -> Result<(), Error> { #[test] fn float64() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; - let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 7, vec![])?; assert_eq!( - read::deserialize(DataType::Float64, &stripe, 7)?, + read::deserialize(DataType::Float64, &column)?, Float64Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() ); + let (footer, scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 8, vec![])?; + assert_eq!( - read::deserialize(DataType::Float64, &stripe, 8)?, + read::deserialize(DataType::Float64, &column)?, Float64Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() ); Ok(()) @@ -52,16 +65,22 @@ fn float64() -> Result<(), Error> { #[test] fn boolean() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; - let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 3, vec![])?; assert_eq!( - read::deserialize(DataType::Boolean, &stripe, 3)?, + read::deserialize(DataType::Boolean, &column)?, BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]).boxed() ); + let (footer, scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 4, vec![])?; + assert_eq!( - read::deserialize(DataType::Boolean, &stripe, 4)?, + read::deserialize(DataType::Boolean, &column)?, BooleanArray::from([Some(true), Some(false), Some(true), Some(true), Some(false)]).boxed() ); Ok(()) @@ -70,17 +89,47 @@ fn boolean() -> Result<(), Error> { #[test] fn int() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; - let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 6, vec![])?; + + assert_eq!( + read::deserialize(DataType::Int32, &column)?, + Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() + ); + + let (footer, _scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 5, vec![])?; assert_eq!( - read::deserialize(DataType::Int32, &stripe, 5)?, + read::deserialize(DataType::Int32, &column)?, Int32Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() ); + Ok(()) +} + +#[test] +fn bigint() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 10, vec![])?; + + assert_eq!( + read::deserialize(DataType::Int64, &column)?, + Int64Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() + ); + + let (footer, scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 9, vec![])?; assert_eq!( - read::deserialize(DataType::Int32, &stripe, 6)?, - Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() + read::deserialize(DataType::Int64, &column)?, + Int64Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() ); Ok(()) } diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py index 3416f1d21f4..70c767d95a1 100644 --- a/tests/it/io/orc/write.py +++ b/tests/it/io/orc/write.py @@ -12,6 +12,8 @@ "int_required": [5, -5, 1, 5, 5], "double_nulable": [1.0, 2.0, None, 4.0, 5.0], "double_required": [1.0, 2.0, 3.0, 4.0, 5.0], + "bigint_nulable": [5, -5, None, 5, 5], + "bigint_required": [5, -5, 1, 5, 5], } def infer_schema(data): @@ -30,6 +32,8 @@ def infer_schema(data): raise NotImplementedError if key.startswith("double"): dt = "double" + if key.startswith("bigint"): + dt = "bigint" schema += key + ":" + dt + "," schema = schema[:-1] + ">" From 8207a80211d8018a9e5c377740a3b69781e94f9a Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 30 Jul 2022 15:14:12 +0000 Subject: [PATCH 6/8] Fixed --- src/io/orc/read/mod.rs | 121 ++++++++++++++++++++++++++++++++++----- tests/it/io/orc/read.rs | 35 +++++++++-- tests/it/io/orc/write.py | 2 + 3 files changed, 139 insertions(+), 19 deletions(-) diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs index c9907971ef0..123addf3f9f 100644 --- a/src/io/orc/read/mod.rs +++ b/src/io/orc/read/mod.rs @@ -1,5 +1,9 @@ //! APIs to read from [ORC format](https://orc.apache.org). -use crate::array::{Array, BooleanArray, Int64Array, PrimitiveArray}; +use std::io::Read; + +use crate::array::{ + Array, BinaryArray, BooleanArray, Int64Array, Offset, PrimitiveArray, Utf8Array, +}; use crate::bitmap::{Bitmap, MutableBitmap}; use crate::datatypes::{DataType, Field, Schema}; use crate::error::Error; @@ -62,16 +66,16 @@ fn infer_dt(type_: &Type, types: &[Type]) -> Result { } fn deserialize_validity(column: &Column, scratch: &mut Vec) -> Result, Error> { - let mut stream = column.get_stream(Kind::Present, std::mem::take(scratch))?; + let stream = column.get_stream(Kind::Present, std::mem::take(scratch))?; - let stream = decode::BooleanIter::new(&mut stream, column.number_of_rows()); + let mut stream = decode::BooleanIter::new(stream, column.number_of_rows()); let mut validity = MutableBitmap::with_capacity(column.number_of_rows()); - for item in stream { + for item in stream.by_ref() { validity.push(item?) } - //*scratch = std::mem::take(&mut stream.into_inner()); + *scratch = std::mem::take(&mut stream.into_inner().into_inner()); Ok(validity.into()) } @@ -90,11 +94,10 @@ fn deserialize_float( let mut values = Vec::with_capacity(num_rows); if let Some(validity) = &validity { - let validity_iter = validity.iter(); let mut items = decode::FloatIter::::new(&mut chunks, validity.len() - validity.unset_bits()); - for is_valid in validity_iter { + for is_valid in validity { if is_valid { values.push(items.next().transpose()?.unwrap_or_default()) } else { @@ -122,12 +125,10 @@ fn deserialize_bool(data_type: DataType, column: &Column) -> Result Result, I: Iterator>( + offsets: &mut Vec, + length: &mut O, + iter: I, +) -> Result<(), orc_format::error::Error> { + for item in iter { + println!("{item}"); + let item: O = item + .try_into() + .map_err(|_| orc_format::error::Error::OutOfSpec)?; + *length += item; + offsets.push(*length) + } + Ok(()) +} + +fn deserialize_binary_generic>( + column: &Column, +) -> Result<(Vec, Vec, Option), Error> { + let num_rows = column.number_of_rows(); + let mut scratch = vec![]; + + let validity = deserialize_validity(column, &mut scratch)?; + + let lengths = column.get_stream(Kind::Length, scratch)?; + + let mut offsets = Vec::with_capacity(num_rows + 1); + let mut length = O::default(); + offsets.push(length); + if let Some(validity) = &validity { + let mut iter = + decode::UnsignedRleV2Iter::new(lengths, validity.len() - validity.unset_bits(), vec![]); + for is_valid in validity { + if is_valid { + let item = iter + .next() + .transpose()? + .ok_or(orc_format::error::Error::OutOfSpec)?; + let item: O = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + length += item; + } + offsets.push(length); + } + let (lengths, _) = iter.into_inner(); + scratch = std::mem::take(&mut lengths.into_inner()); + } else { + let mut iter = decode::UnsignedRleV2RunIter::new(lengths, num_rows, vec![]); + iter.try_for_each(|run| { + run.and_then(|run| match run { + decode::UnsignedRleV2Run::Direct(values_iter) => { + try_extend(&mut offsets, &mut length, values_iter) + } + decode::UnsignedRleV2Run::Delta(values_iter) => { + try_extend(&mut offsets, &mut length, values_iter) + } + decode::UnsignedRleV2Run::ShortRepeat(values_iter) => { + try_extend(&mut offsets, &mut length, values_iter) + } + }) + })?; + let (lengths, _) = iter.into_inner(); + scratch = std::mem::take(&mut lengths.into_inner()); + } + let length = length.to_usize(); + let mut values = vec![0; length]; + + let mut data = column.get_stream(Kind::Data, scratch)?; + data.read_exact(&mut values)?; + + Ok((offsets, values, validity)) +} + +fn deserialize_utf8>( + data_type: DataType, + column: &Column, +) -> Result, Error> { + let (offsets, values, validity) = deserialize_binary_generic::(column)?; + Utf8Array::try_new(data_type, offsets.into(), values.into(), validity) +} + +fn deserialize_binary>( + data_type: DataType, + column: &Column, +) -> Result, Error> { + let (offsets, values, validity) = deserialize_binary_generic::(column)?; + BinaryArray::try_new(data_type, offsets.into(), values.into(), validity) +} + /// Deserializes column `column` from `stripe`, assumed /// to represent an array of `data_type`. pub fn deserialize(data_type: DataType, column: &Column) -> Result, Error> { @@ -260,6 +351,10 @@ pub fn deserialize(data_type: DataType, column: &Column) -> Result deserialize_i64(data_type, column).map(|x| x.boxed()), DataType::Float32 => deserialize_float::(data_type, column).map(|x| x.boxed()), DataType::Float64 => deserialize_float::(data_type, column).map(|x| x.boxed()), - dt => return Err(Error::nyi(format!("Reading {dt:?} from ORC"))), + DataType::Utf8 => deserialize_utf8::(data_type, column).map(|x| x.boxed()), + DataType::LargeUtf8 => deserialize_utf8::(data_type, column).map(|x| x.boxed()), + DataType::Binary => deserialize_binary::(data_type, column).map(|x| x.boxed()), + DataType::LargeBinary => deserialize_binary::(data_type, column).map(|x| x.boxed()), + dt => return Err(Error::nyi(format!("Deserializing {dt:?} from ORC"))), } } diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs index 08da7e587b0..06a0c18f329 100644 --- a/tests/it/io/orc/read.rs +++ b/tests/it/io/orc/read.rs @@ -9,7 +9,7 @@ fn infer() -> Result<(), Error> { let metadata = format::read::read_metadata(&mut reader)?; let schema = read::infer_schema(&metadata.footer)?; - assert_eq!(schema.fields.len(), 10); + assert_eq!(schema.fields.len(), 12); Ok(()) } @@ -19,8 +19,7 @@ fn float32() -> Result<(), Error> { let metadata = format::read::read_metadata(&mut reader)?; let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; - let column = - format::read::read_stripe_column(&mut reader, &metadata, 0, footer.clone(), 1, vec![])?; + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 1, vec![])?; assert_eq!( read::deserialize(DataType::Float32, &column)?, @@ -53,7 +52,7 @@ fn float64() -> Result<(), Error> { let (footer, scratch) = column.into_inner(); - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 8, vec![])?; + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 8, scratch)?; assert_eq!( read::deserialize(DataType::Float64, &column)?, @@ -77,7 +76,7 @@ fn boolean() -> Result<(), Error> { let (footer, scratch) = column.into_inner(); - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 4, vec![])?; + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 4, scratch)?; assert_eq!( read::deserialize(DataType::Boolean, &column)?, @@ -125,7 +124,7 @@ fn bigint() -> Result<(), Error> { let (footer, scratch) = column.into_inner(); - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 9, vec![])?; + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 9, scratch)?; assert_eq!( read::deserialize(DataType::Int64, &column)?, @@ -133,3 +132,27 @@ fn bigint() -> Result<(), Error> { ); Ok(()) } + +#[test] +fn utf8() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 11, vec![])?; + + assert_eq!( + read::deserialize(DataType::Utf8, &column)?, + Utf8Array::::from_slice(["a", "bb", "ccc", "dddd", "eeeee"]).boxed() + ); + + let (footer, _scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 12, vec![])?; + + assert_eq!( + read::deserialize(DataType::Utf8, &column)?, + Utf8Array::::from([Some("a"), Some("bb"), None, Some("dddd"), Some("eeeee")]).boxed() + ); + Ok(()) +} diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py index 70c767d95a1..25c3f467d22 100644 --- a/tests/it/io/orc/write.py +++ b/tests/it/io/orc/write.py @@ -14,6 +14,8 @@ "double_required": [1.0, 2.0, 3.0, 4.0, 5.0], "bigint_nulable": [5, -5, None, 5, 5], "bigint_required": [5, -5, 1, 5, 5], + "utf8_nulable": ["a", "bb", "ccc", "dddd", "eeeee"], + "utf8_required": ["a", "bb", None, "dddd", "eeeee"], } def infer_schema(data): From d2d1403db03ed71aa32772a3c31453df09cc54d2 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 30 Jul 2022 15:39:23 +0000 Subject: [PATCH 7/8] Improved testing --- tests/it/io/orc/read.rs | 105 ++++++++++++++------------------------- tests/it/io/orc/write.py | 10 ++-- 2 files changed, 41 insertions(+), 74 deletions(-) diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs index 06a0c18f329..9d6ccaee4ff 100644 --- a/tests/it/io/orc/read.rs +++ b/tests/it/io/orc/read.rs @@ -13,25 +13,42 @@ fn infer() -> Result<(), Error> { Ok(()) } -#[test] -fn float32() -> Result<(), Error> { +fn deserialize_column(column_name: &str) -> Result, Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); let metadata = format::read::read_metadata(&mut reader)?; + let schema = read::infer_schema(&metadata.footer)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 1, vec![])?; + let (pos, field) = schema + .fields + .iter() + .enumerate() + .find(|f| f.1.name == column_name) + .unwrap(); + + let data_type = field.data_type.clone(); + let column = format::read::read_stripe_column( + &mut reader, + &metadata, + 0, + footer, + 1 + pos as u32, + vec![], + )?; + + read::deserialize(data_type, &column) +} +#[test] +fn float32() -> Result<(), Error> { assert_eq!( - read::deserialize(DataType::Float32, &column)?, + deserialize_column("float_nullable")?, Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() ); - let (footer, scratch) = column.into_inner(); - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 2, scratch)?; - assert_eq!( - read::deserialize(DataType::Float32, &column)?, + deserialize_column("float_required")?, Float32Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() ); Ok(()) @@ -39,23 +56,13 @@ fn float32() -> Result<(), Error> { #[test] fn float64() -> Result<(), Error> { - let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let metadata = format::read::read_metadata(&mut reader)?; - let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 7, vec![])?; - assert_eq!( - read::deserialize(DataType::Float64, &column)?, + deserialize_column("double_nullable")?, Float64Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() ); - let (footer, scratch) = column.into_inner(); - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 8, scratch)?; - assert_eq!( - read::deserialize(DataType::Float64, &column)?, + deserialize_column("double_required")?, Float64Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() ); Ok(()) @@ -63,23 +70,13 @@ fn float64() -> Result<(), Error> { #[test] fn boolean() -> Result<(), Error> { - let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let metadata = format::read::read_metadata(&mut reader)?; - let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 3, vec![])?; - assert_eq!( - read::deserialize(DataType::Boolean, &column)?, + deserialize_column("bool_nullable")?, BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]).boxed() ); - let (footer, scratch) = column.into_inner(); - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 4, scratch)?; - assert_eq!( - read::deserialize(DataType::Boolean, &column)?, + deserialize_column("bool_required")?, BooleanArray::from([Some(true), Some(false), Some(true), Some(true), Some(false)]).boxed() ); Ok(()) @@ -87,23 +84,13 @@ fn boolean() -> Result<(), Error> { #[test] fn int() -> Result<(), Error> { - let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let metadata = format::read::read_metadata(&mut reader)?; - let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 6, vec![])?; - assert_eq!( - read::deserialize(DataType::Int32, &column)?, + deserialize_column("int_required")?, Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() ); - let (footer, _scratch) = column.into_inner(); - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 5, vec![])?; - assert_eq!( - read::deserialize(DataType::Int32, &column)?, + deserialize_column("int_nullable")?, Int32Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() ); Ok(()) @@ -111,23 +98,13 @@ fn int() -> Result<(), Error> { #[test] fn bigint() -> Result<(), Error> { - let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let metadata = format::read::read_metadata(&mut reader)?; - let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 10, vec![])?; - assert_eq!( - read::deserialize(DataType::Int64, &column)?, + deserialize_column("bigint_required")?, Int64Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() ); - let (footer, scratch) = column.into_inner(); - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 9, scratch)?; - assert_eq!( - read::deserialize(DataType::Int64, &column)?, + deserialize_column("bigint_nullable")?, Int64Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() ); Ok(()) @@ -135,23 +112,13 @@ fn bigint() -> Result<(), Error> { #[test] fn utf8() -> Result<(), Error> { - let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let metadata = format::read::read_metadata(&mut reader)?; - let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 11, vec![])?; - assert_eq!( - read::deserialize(DataType::Utf8, &column)?, + deserialize_column("utf8_required")?, Utf8Array::::from_slice(["a", "bb", "ccc", "dddd", "eeeee"]).boxed() ); - let (footer, _scratch) = column.into_inner(); - - let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 12, vec![])?; - assert_eq!( - read::deserialize(DataType::Utf8, &column)?, + deserialize_column("utf8_nullable")?, Utf8Array::::from([Some("a"), Some("bb"), None, Some("dddd"), Some("eeeee")]).boxed() ); Ok(()) diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py index 25c3f467d22..2593b823eee 100644 --- a/tests/it/io/orc/write.py +++ b/tests/it/io/orc/write.py @@ -8,14 +8,14 @@ "float_required": [1.0, 2.0, 3.0, 4.0, 5.0], "bool_nullable": [True, False, None, True, False], "bool_required": [True, False, True, True, False], - "int_nulable": [5, -5, None, 5, 5], + "int_nullable": [5, -5, None, 5, 5], "int_required": [5, -5, 1, 5, 5], - "double_nulable": [1.0, 2.0, None, 4.0, 5.0], + "double_nullable": [1.0, 2.0, None, 4.0, 5.0], "double_required": [1.0, 2.0, 3.0, 4.0, 5.0], - "bigint_nulable": [5, -5, None, 5, 5], + "bigint_nullable": [5, -5, None, 5, 5], "bigint_required": [5, -5, 1, 5, 5], - "utf8_nulable": ["a", "bb", "ccc", "dddd", "eeeee"], - "utf8_required": ["a", "bb", None, "dddd", "eeeee"], + "utf8_required": ["a", "bb", "ccc", "dddd", "eeeee"], + "utf8_nullable": ["a", "bb", None, "dddd", "eeeee"], } def infer_schema(data): From 069aded1e7d747b6739b32a43bb69a5e77bb2853 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 30 Jul 2022 16:10:18 +0000 Subject: [PATCH 8/8] Added example --- .github/workflows/test.yml | 4 ++- Cargo.toml | 2 +- examples/orc_read.rs | 55 ++++++++++++++++++++++++++++++++++++++ src/io/mod.rs | 1 + tests/it/io/orc/read.rs | 1 - 5 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 examples/orc_read.rs diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2656838ec4f..c9b25238f80 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -42,7 +42,9 @@ jobs: - uses: Swatinem/rust-cache@v1 - name: Run shell: bash - run: ARROW2_IGNORE_PARQUET= cargo test --features full + run: | + cargo check --features full + cargo test --tests clippy: name: Clippy diff --git a/Cargo.toml b/Cargo.toml index c823cb058ba..869fe4fe00a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ crc = { version = "2", optional = true } async-stream = { version = "0.3.2", optional = true } # ORC support -orc-format = { git = "https://github.com/DataEngineeringLabs/orc-format.git", branch = "iters", optional = true } +orc-format = { version = "0.3.0", optional = true } # Arrow integration tests support serde = { version = "^1.0", features = ["rc"], optional = true } diff --git a/examples/orc_read.rs b/examples/orc_read.rs new file mode 100644 index 00000000000..f1a5acee4dd --- /dev/null +++ b/examples/orc_read.rs @@ -0,0 +1,55 @@ +use arrow2::array::*; +use arrow2::error::Error; +use arrow2::io::orc::{format, read}; + +fn deserialize_column(path: &str, column_name: &str) -> Result, Error> { + // open the file + let mut reader = std::fs::File::open(path).unwrap(); + + // read its metadata (IO-bounded) + let metadata = format::read::read_metadata(&mut reader)?; + + // infer its (Arrow) [`Schema`] + let schema = read::infer_schema(&metadata.footer)?; + + // find the position of the column in the schema + let (pos, field) = schema + .fields + .iter() + .enumerate() + .find(|f| f.1.name == column_name) + .unwrap(); + + // pick a stripe (basically a set of rows) + let stripe = 0; + + // read the stripe's footer (IO-bounded) + let footer = format::read::read_stripe_footer(&mut reader, &metadata, stripe, &mut vec![])?; + + // read the column's data from the stripe (IO-bounded) + let data_type = field.data_type.clone(); + let column = format::read::read_stripe_column( + &mut reader, + &metadata, + 0, + footer, + // 1 because ORC schemas always start with a struct, which we ignore + 1 + pos as u32, + vec![], + )?; + + // finally, deserialize to Arrow (CPU-bounded) + read::deserialize(data_type, &column) +} + +fn main() -> Result<(), Error> { + use std::env; + let args: Vec = env::args().collect(); + + let file_path = &args[1]; + let column = &args[2]; + + let array = deserialize_column(file_path, column)?; + println!("{array:?}"); + Ok(()) +} diff --git a/src/io/mod.rs b/src/io/mod.rs index bc7d218ad36..69e4657fd75 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -6,6 +6,7 @@ pub mod odbc; #[cfg(feature = "io_orc")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_orc")))] pub mod orc; #[cfg(any( diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs index 9d6ccaee4ff..a35c54d34e6 100644 --- a/tests/it/io/orc/read.rs +++ b/tests/it/io/orc/read.rs @@ -1,5 +1,4 @@ use arrow2::array::*; -use arrow2::datatypes::DataType; use arrow2::error::Error; use arrow2::io::orc::{format, read};