From 644a1deb90b38f293cf50e1c42c21eb2120b4a28 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sun, 31 Jul 2022 21:57:02 +0200 Subject: [PATCH] Added support to read ORC (#1189) --- .github/workflows/coverage.yml | 3 +- .github/workflows/test.yml | 9 +- Cargo.toml | 17 +- DEVELOPMENT.md | 4 +- examples/orc_read.rs | 55 +++++ src/io/mod.rs | 4 + src/io/orc/mod.rs | 12 ++ src/io/orc/read/mod.rs | 360 +++++++++++++++++++++++++++++++++ src/types/native.rs | 54 ++++- tests/it/io/mod.rs | 3 + tests/it/io/orc/mod.rs | 1 + tests/it/io/orc/read.rs | 124 ++++++++++++ tests/it/io/orc/write.py | 67 ++++++ 13 files changed, 704 insertions(+), 9 deletions(-) create mode 100644 examples/orc_read.rs create mode 100644 src/io/orc/mod.rs create mode 100644 src/io/orc/read/mod.rs create mode 100644 tests/it/io/orc/mod.rs create mode 100644 tests/it/io/orc/read.rs create mode 100644 tests/it/io/orc/write.py diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index 3007684b17e..69d448cd7ca 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -19,8 +19,9 @@ jobs: python3 -m venv venv source venv/bin/activate pip install pip --upgrade - pip install pyarrow==6 + pip install pyarrow==6 pyorc python parquet_integration/write_parquet.py + python tests/it/io/orc/write.py deactivate - uses: Swatinem/rust-cache@v1 - name: Generate code coverage diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e0cb434ea60..c9b25238f80 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,16 +13,17 @@ jobs: submodules: true # needed to test IPC, which are located in a submodule - name: Install Rust run: rustup update stable - - uses: Swatinem/rust-cache@v1 - name: Setup parquet files run: | apt update && apt install python3-pip python3-venv -y -q python3 -m venv venv source venv/bin/activate pip install pip --upgrade - pip install pyarrow==6 + pip install pyarrow==6 pyorc python 
parquet_integration/write_parquet.py + python tests/it/io/orc/write.py deactivate + - uses: Swatinem/rust-cache@v1 - name: Run run: cargo test --features full @@ -41,7 +42,9 @@ jobs: - uses: Swatinem/rust-cache@v1 - name: Run shell: bash - run: ARROW2_IGNORE_PARQUET= cargo test --features full + run: | + cargo check --features full + cargo test --tests clippy: name: Clippy diff --git a/Cargo.toml b/Cargo.toml index 1aada784f46..869fe4fe00a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,9 +72,6 @@ parquet2 = { version = "0.14.0", optional = true, default_features = false } # avro support avro-schema = { version = "0.2", optional = true } -serde = { version = "^1.0", features = ["rc"], optional = true } -serde_derive = { version = "^1.0", optional = true } -serde_json = { version = "^1.0", features = ["preserve_order"], optional = true } # compression of avro libflate = { version = "1.1.1", optional = true } snap = { version = "1", optional = true } @@ -82,6 +79,14 @@ crc = { version = "2", optional = true } # async avro async-stream = { version = "0.3.2", optional = true } +# ORC support +orc-format = { version = "0.3.0", optional = true } + +# Arrow integration tests support +serde = { version = "^1.0", features = ["rc"], optional = true } +serde_derive = { version = "^1.0", optional = true } +serde_json = { version = "^1.0", features = ["preserve_order"], optional = true } + # for division/remainder optimization at runtime strength_reduce = { version = "0.2", optional = true } @@ -126,6 +131,7 @@ full = [ "io_parquet", "io_parquet_compression", "io_avro", + "io_orc", "io_avro_compression", "io_avro_async", "regex", @@ -145,6 +151,7 @@ io_ipc_write_async = ["io_ipc", "futures"] io_ipc_read_async = ["io_ipc", "futures", "async-stream"] io_ipc_compression = ["lz4", "zstd"] io_flight = ["io_ipc", "arrow-format/flight-data"] + # base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format. 
io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"] io_parquet_compression = [ @@ -154,6 +161,7 @@ io_parquet_compression = [ "parquet2/lz4", "parquet2/brotli", ] + io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"] io_avro_compression = [ "libflate", @@ -161,6 +169,9 @@ io_avro_compression = [ "crc", ] io_avro_async = ["io_avro", "futures", "async-stream"] + +io_orc = [ "orc-format" ] + # serde+serde_json: its dependencies + error handling # serde_derive: there is some derive around io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"] diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index d81e6455838..aced5ffcf3b 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -42,10 +42,12 @@ source venv/bin/activate pip install pip --upgrade # Install pyarrow, version 6 -pip install pyarrow==6 +pip install pyarrow==6 pyorc # Generate the parquet files (this might take some time, depending on your computer setup) python parquet_integration/write_parquet.py +# generate ORC files +python tests/it/io/orc/write.py # Get out of venv, back to normal terminal deactivate diff --git a/examples/orc_read.rs b/examples/orc_read.rs new file mode 100644 index 00000000000..f1a5acee4dd --- /dev/null +++ b/examples/orc_read.rs @@ -0,0 +1,55 @@ +use arrow2::array::*; +use arrow2::error::Error; +use arrow2::io::orc::{format, read}; + +fn deserialize_column(path: &str, column_name: &str) -> Result, Error> { + // open the file + let mut reader = std::fs::File::open(path).unwrap(); + + // read its metadata (IO-bounded) + let metadata = format::read::read_metadata(&mut reader)?; + + // infer its (Arrow) [`Schema`] + let schema = read::infer_schema(&metadata.footer)?; + + // find the position of the column in the schema + let (pos, field) = schema + .fields + .iter() + .enumerate() + .find(|f| f.1.name == column_name) + .unwrap(); + + // pick a stripe (basically a set
of rows) + let stripe = 0; + + // read the stripe's footer (IO-bounded) + let footer = format::read::read_stripe_footer(&mut reader, &metadata, stripe, &mut vec![])?; + + // read the column's data from the stripe (IO-bounded) + let data_type = field.data_type.clone(); + let column = format::read::read_stripe_column( + &mut reader, + &metadata, + 0, + footer, + // 1 because ORC schemas always start with a struct, which we ignore + 1 + pos as u32, + vec![], + )?; + + // finally, deserialize to Arrow (CPU-bounded) + read::deserialize(data_type, &column) +} + +fn main() -> Result<(), Error> { + use std::env; + let args: Vec = env::args().collect(); + + let file_path = &args[1]; + let column = &args[2]; + + let array = deserialize_column(file_path, column)?; + println!("{array:?}"); + Ok(()) +} diff --git a/src/io/mod.rs b/src/io/mod.rs index 9343d4281ce..69e4657fd75 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -5,6 +5,10 @@ #[cfg(feature = "io_odbc")] pub mod odbc; +#[cfg(feature = "io_orc")] +#[cfg_attr(docsrs, doc(cfg(feature = "io_orc")))] +pub mod orc; + #[cfg(any( feature = "io_csv_read", feature = "io_csv_read_async", diff --git a/src/io/orc/mod.rs b/src/io/orc/mod.rs new file mode 100644 index 00000000000..6d1ac5953e3 --- /dev/null +++ b/src/io/orc/mod.rs @@ -0,0 +1,12 @@ +//! APIs to read from [ORC format](https://orc.apache.org). +pub mod read; + +pub use orc_format as format; + +use crate::error::Error; + +impl From for Error { + fn from(error: format::error::Error) -> Self { + Error::ExternalFormat(format!("{:?}", error)) + } +} diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs new file mode 100644 index 00000000000..123addf3f9f --- /dev/null +++ b/src/io/orc/read/mod.rs @@ -0,0 +1,360 @@ +//! APIs to read from [ORC format](https://orc.apache.org). 
+use std::io::Read; + +use crate::array::{ + Array, BinaryArray, BooleanArray, Int64Array, Offset, PrimitiveArray, Utf8Array, +}; +use crate::bitmap::{Bitmap, MutableBitmap}; +use crate::datatypes::{DataType, Field, Schema}; +use crate::error::Error; +use crate::types::NativeType; + +use orc_format::proto::stream::Kind; +use orc_format::proto::{Footer, Type}; +use orc_format::read::decode; +use orc_format::read::Column; + +/// Infers a [`Schema`] from the files' [`Footer`]. +/// # Errors +/// This function errors if the type is not yet supported. +pub fn infer_schema(footer: &Footer) -> Result { + let types = &footer.types; + + let dt = infer_dt(&footer.types[0], types)?; + if let DataType::Struct(fields) = dt { + Ok(fields.into()) + } else { + Err(Error::ExternalFormat( + "ORC root type must be a struct".to_string(), + )) + } +} + +fn infer_dt(type_: &Type, types: &[Type]) -> Result { + use orc_format::proto::r#type::Kind::*; + let dt = match type_.kind() { + Boolean => DataType::Boolean, + Byte => DataType::Int8, + Short => DataType::Int16, + Int => DataType::Int32, + Long => DataType::Int64, + Float => DataType::Float32, + Double => DataType::Float64, + String => DataType::Utf8, + Binary => DataType::Binary, + Struct => { + let sub_types = type_ + .subtypes + .iter() + .cloned() + .zip(type_.field_names.iter()) + .map(|(i, name)| { + infer_dt( + types.get(i as usize).ok_or_else(|| { + Error::ExternalFormat(format!("ORC field {i} not found")) + })?, + types, + ) + .map(|dt| Field::new(name, dt, true)) + }) + .collect::, Error>>()?; + DataType::Struct(sub_types) + } + kind => return Err(Error::nyi(format!("Reading {kind:?} from ORC"))), + }; + Ok(dt) +} + +fn deserialize_validity(column: &Column, scratch: &mut Vec) -> Result, Error> { + let stream = column.get_stream(Kind::Present, std::mem::take(scratch))?; + + let mut stream = decode::BooleanIter::new(stream, column.number_of_rows()); + + let mut validity = MutableBitmap::with_capacity(column.number_of_rows()); 
+ for item in stream.by_ref() { + validity.push(item?) + } + + *scratch = std::mem::take(&mut stream.into_inner().into_inner()); + + Ok(validity.into()) +} + +/// Deserializes column `column` from `stripe`, assumed to represent a f32 +fn deserialize_float( + data_type: DataType, + column: &Column, +) -> Result, Error> { + let mut scratch = vec![]; + let num_rows = column.number_of_rows(); + + let validity = deserialize_validity(column, &mut scratch)?; + + let mut chunks = column.get_stream(Kind::Data, scratch)?; + + let mut values = Vec::with_capacity(num_rows); + if let Some(validity) = &validity { + let mut items = + decode::FloatIter::::new(&mut chunks, validity.len() - validity.unset_bits()); + + for is_valid in validity { + if is_valid { + values.push(items.next().transpose()?.unwrap_or_default()) + } else { + values.push(T::default()) + } + } + } else { + let items = decode::FloatIter::::new(&mut chunks, num_rows); + for item in items { + values.push(item?); + } + } + + PrimitiveArray::try_new(data_type, values.into(), validity) +} + +/// Deserializes column `column` from `stripe`, assumed to represent a boolean array +fn deserialize_bool(data_type: DataType, column: &Column) -> Result { + let num_rows = column.number_of_rows(); + let mut scratch = vec![]; + + let validity = deserialize_validity(column, &mut scratch)?; + + let mut chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; + + let mut values = MutableBitmap::with_capacity(num_rows); + if let Some(validity) = &validity { + let mut items = + decode::BooleanIter::new(&mut chunks, validity.len() - validity.unset_bits()); + + for is_valid in validity { + values.push(if is_valid { + items.next().transpose()?.unwrap_or_default() + } else { + false + }); + } + } else { + let valid_iter = decode::BooleanIter::new(&mut chunks, num_rows); + for v in valid_iter { + values.push(v?) 
+ } + } + + BooleanArray::try_new(data_type, values.into(), validity) +} + +/// Deserializes column `column` from `stripe`, assumed to represent a boolean array +fn deserialize_i64(data_type: DataType, column: &Column) -> Result { + let num_rows = column.number_of_rows(); + let mut scratch = vec![]; + + let validity = deserialize_validity(column, &mut scratch)?; + + let chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; + + let mut values = Vec::with_capacity(num_rows); + if let Some(validity) = &validity { + let mut iter = + decode::SignedRleV2Iter::new(chunks, validity.len() - validity.unset_bits(), vec![]); + + for is_valid in validity { + if is_valid { + let item = iter.next().transpose()?.unwrap_or_default(); + values.push(item); + } else { + values.push(0); + } + } + } else { + let mut iter = decode::SignedRleV2RunIter::new(chunks, num_rows, vec![]); + iter.try_for_each(|run| { + run.map(|run| match run { + decode::SignedRleV2Run::Direct(values_iter) => values.extend(values_iter), + decode::SignedRleV2Run::Delta(values_iter) => values.extend(values_iter), + decode::SignedRleV2Run::ShortRepeat(values_iter) => values.extend(values_iter), + }) + })?; + } + + Int64Array::try_new(data_type, values.into(), validity) +} + +/// Deserializes column `column` from `stripe`, assumed to represent a boolean array +fn deserialize_int(data_type: DataType, column: &Column) -> Result, Error> +where + T: NativeType + TryFrom, +{ + let num_rows = column.number_of_rows(); + let mut scratch = vec![]; + + let validity = deserialize_validity(column, &mut scratch)?; + + let chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; + + let mut values = Vec::::with_capacity(num_rows); + if let Some(validity) = &validity { + let validity_iter = validity.iter(); + + let mut iter = + decode::SignedRleV2Iter::new(chunks, validity.len() - validity.unset_bits(), vec![]); + for is_valid in validity_iter { + if is_valid { + let item = 
iter.next().transpose()?.unwrap_or_default(); + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); + } else { + values.push(T::default()); + } + } + } else { + let mut iter = decode::SignedRleV2RunIter::new(chunks, num_rows, vec![]); + + iter.try_for_each(|run| { + run.map_err(Error::from).and_then(|run| match run { + decode::SignedRleV2Run::Direct(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); + } + Ok(()) + } + decode::SignedRleV2Run::Delta(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); + } + Ok(()) + } + decode::SignedRleV2Run::ShortRepeat(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); + } + Ok(()) + } + }) + })?; + } + + PrimitiveArray::try_new(data_type, values.into(), validity) +} + +#[inline] +fn try_extend, I: Iterator>( + offsets: &mut Vec, + length: &mut O, + iter: I, +) -> Result<(), orc_format::error::Error> { + for item in iter { + let item: O = item + .try_into() + .map_err(|_| orc_format::error::Error::OutOfSpec)?; + *length += item; + offsets.push(*length) + } + Ok(()) +} + +fn deserialize_binary_generic>( + column: &Column, +) -> Result<(Vec, Vec, Option), Error> { + let num_rows = column.number_of_rows(); + let mut scratch = vec![]; + + let validity = deserialize_validity(column, &mut scratch)?; + + let lengths = column.get_stream(Kind::Length, scratch)?; + + let mut offsets = Vec::with_capacity(num_rows + 1); + let mut length = O::default(); + offsets.push(length); + if let Some(validity) = &validity { + let mut iter = + decode::UnsignedRleV2Iter::new(lengths, validity.len() -
validity.unset_bits(), vec![]); + for is_valid in validity { + if is_valid { + let item = iter + .next() + .transpose()? + .ok_or(orc_format::error::Error::OutOfSpec)?; + let item: O = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + length += item; + } + offsets.push(length); + } + let (lengths, _) = iter.into_inner(); + scratch = std::mem::take(&mut lengths.into_inner()); + } else { + let mut iter = decode::UnsignedRleV2RunIter::new(lengths, num_rows, vec![]); + iter.try_for_each(|run| { + run.and_then(|run| match run { + decode::UnsignedRleV2Run::Direct(values_iter) => { + try_extend(&mut offsets, &mut length, values_iter) + } + decode::UnsignedRleV2Run::Delta(values_iter) => { + try_extend(&mut offsets, &mut length, values_iter) + } + decode::UnsignedRleV2Run::ShortRepeat(values_iter) => { + try_extend(&mut offsets, &mut length, values_iter) + } + }) + })?; + let (lengths, _) = iter.into_inner(); + scratch = std::mem::take(&mut lengths.into_inner()); + } + let length = length.to_usize(); + let mut values = vec![0; length]; + + let mut data = column.get_stream(Kind::Data, scratch)?; + data.read_exact(&mut values)?; + + Ok((offsets, values, validity)) +} + +fn deserialize_utf8>( + data_type: DataType, + column: &Column, +) -> Result, Error> { + let (offsets, values, validity) = deserialize_binary_generic::(column)?; + Utf8Array::try_new(data_type, offsets.into(), values.into(), validity) +} + +fn deserialize_binary>( + data_type: DataType, + column: &Column, +) -> Result, Error> { + let (offsets, values, validity) = deserialize_binary_generic::(column)?; + BinaryArray::try_new(data_type, offsets.into(), values.into(), validity) +} + +/// Deserializes column `column` from `stripe`, assumed +/// to represent an array of `data_type`. 
+pub fn deserialize(data_type: DataType, column: &Column) -> Result, Error> { + match data_type { + DataType::Boolean => deserialize_bool(data_type, column).map(|x| x.boxed()), + DataType::Int8 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int16 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int32 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int64 => deserialize_i64(data_type, column).map(|x| x.boxed()), + DataType::Float32 => deserialize_float::(data_type, column).map(|x| x.boxed()), + DataType::Float64 => deserialize_float::(data_type, column).map(|x| x.boxed()), + DataType::Utf8 => deserialize_utf8::(data_type, column).map(|x| x.boxed()), + DataType::LargeUtf8 => deserialize_utf8::(data_type, column).map(|x| x.boxed()), + DataType::Binary => deserialize_binary::(data_type, column).map(|x| x.boxed()), + DataType::LargeBinary => deserialize_binary::(data_type, column).map(|x| x.boxed()), + dt => return Err(Error::nyi(format!("Deserializing {dt:?} from ORC"))), + } +} diff --git a/src/types/native.rs b/src/types/native.rs index 7e7711589b6..fc100017a76 100644 --- a/src/types/native.rs +++ b/src/types/native.rs @@ -28,7 +28,8 @@ pub trait NativeType: + std::ops::Index + std::ops::IndexMut + for<'a> TryFrom<&'a [u8]> - + std::fmt::Debug; + + std::fmt::Debug + + Default; /// To bytes in little endian fn to_le_bytes(&self) -> Self::Bytes; @@ -36,6 +37,9 @@ pub trait NativeType: /// To bytes in big endian fn to_be_bytes(&self) -> Self::Bytes; + /// From bytes in little endian + fn from_le_bytes(bytes: Self::Bytes) -> Self; + /// From bytes in big endian fn from_be_bytes(bytes: Self::Bytes) -> Self; } @@ -56,6 +60,11 @@ macro_rules! 
native_type { Self::to_be_bytes(*self) } + #[inline] + fn from_le_bytes(bytes: Self::Bytes) -> Self { + Self::from_le_bytes(bytes) + } + #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { Self::from_be_bytes(bytes) @@ -137,6 +146,21 @@ impl NativeType for days_ms { result } + #[inline] + fn from_le_bytes(bytes: Self::Bytes) -> Self { + let mut days = [0; 4]; + days[0] = bytes[0]; + days[1] = bytes[1]; + days[2] = bytes[2]; + days[3] = bytes[3]; + let mut ms = [0; 4]; + ms[0] = bytes[4]; + ms[1] = bytes[5]; + ms[2] = bytes[6]; + ms[3] = bytes[7]; + Self(i32::from_le_bytes(days), i32::from_le_bytes(ms)) + } + #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { let mut days = [0; 4]; @@ -228,6 +252,29 @@ impl NativeType for months_days_ns { result } + #[inline] + fn from_le_bytes(bytes: Self::Bytes) -> Self { + let mut months = [0; 4]; + months[0] = bytes[0]; + months[1] = bytes[1]; + months[2] = bytes[2]; + months[3] = bytes[3]; + let mut days = [0; 4]; + days[0] = bytes[4]; + days[1] = bytes[5]; + days[2] = bytes[6]; + days[3] = bytes[7]; + let mut ns = [0; 8]; + (0..8).for_each(|i| { + ns[i] = bytes[8 + i]; + }); + Self( + i32::from_le_bytes(months), + i32::from_le_bytes(days), + i64::from_le_bytes(ns), + ) + } + #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { let mut months = [0; 4]; @@ -446,6 +493,11 @@ impl NativeType for f16 { fn from_be_bytes(bytes: Self::Bytes) -> Self { Self(u16::from_be_bytes(bytes)) } + + #[inline] + fn from_le_bytes(bytes: Self::Bytes) -> Self { + Self(u16::from_le_bytes(bytes)) + } } #[cfg(test)] diff --git a/tests/it/io/mod.rs b/tests/it/io/mod.rs index 3fc0c2123e3..bf228251df3 100644 --- a/tests/it/io/mod.rs +++ b/tests/it/io/mod.rs @@ -16,6 +16,9 @@ mod parquet; #[cfg(feature = "io_avro")] mod avro; +#[cfg(feature = "io_orc")] +mod orc; + #[cfg(any( feature = "io_csv_read", feature = "io_csv_write", diff --git a/tests/it/io/orc/mod.rs b/tests/it/io/orc/mod.rs new file mode 100644 index 00000000000..0bec210a6a5 --- 
/dev/null +++ b/tests/it/io/orc/mod.rs @@ -0,0 +1 @@ +mod read; diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs new file mode 100644 index 00000000000..a35c54d34e6 --- /dev/null +++ b/tests/it/io/orc/read.rs @@ -0,0 +1,124 @@ +use arrow2::array::*; +use arrow2::error::Error; +use arrow2::io::orc::{format, read}; + +#[test] +fn infer() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let metadata = format::read::read_metadata(&mut reader)?; + let schema = read::infer_schema(&metadata.footer)?; + + assert_eq!(schema.fields.len(), 12); + Ok(()) +} + +fn deserialize_column(column_name: &str) -> Result, Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let metadata = format::read::read_metadata(&mut reader)?; + let schema = read::infer_schema(&metadata.footer)?; + + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let (pos, field) = schema + .fields + .iter() + .enumerate() + .find(|f| f.1.name == column_name) + .unwrap(); + + let data_type = field.data_type.clone(); + let column = format::read::read_stripe_column( + &mut reader, + &metadata, + 0, + footer, + 1 + pos as u32, + vec![], + )?; + + read::deserialize(data_type, &column) +} + +#[test] +fn float32() -> Result<(), Error> { + assert_eq!( + deserialize_column("float_nullable")?, + Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() + ); + + assert_eq!( + deserialize_column("float_required")?, + Float32Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() + ); + Ok(()) +} + +#[test] +fn float64() -> Result<(), Error> { + assert_eq!( + deserialize_column("double_nullable")?, + Float64Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() + ); + + assert_eq!( + deserialize_column("double_required")?, + Float64Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() + ); + Ok(()) +} + 
+#[test] +fn boolean() -> Result<(), Error> { + assert_eq!( + deserialize_column("bool_nullable")?, + BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]).boxed() + ); + + assert_eq!( + deserialize_column("bool_required")?, + BooleanArray::from([Some(true), Some(false), Some(true), Some(true), Some(false)]).boxed() + ); + Ok(()) +} + +#[test] +fn int() -> Result<(), Error> { + assert_eq!( + deserialize_column("int_required")?, + Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() + ); + + assert_eq!( + deserialize_column("int_nullable")?, + Int32Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() + ); + Ok(()) +} + +#[test] +fn bigint() -> Result<(), Error> { + assert_eq!( + deserialize_column("bigint_required")?, + Int64Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() + ); + + assert_eq!( + deserialize_column("bigint_nullable")?, + Int64Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() + ); + Ok(()) +} + +#[test] +fn utf8() -> Result<(), Error> { + assert_eq!( + deserialize_column("utf8_required")?, + Utf8Array::::from_slice(["a", "bb", "ccc", "dddd", "eeeee"]).boxed() + ); + + assert_eq!( + deserialize_column("utf8_nullable")?, + Utf8Array::::from([Some("a"), Some("bb"), None, Some("dddd"), Some("eeeee")]).boxed() + ); + Ok(()) +} diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py new file mode 100644 index 00000000000..2593b823eee --- /dev/null +++ b/tests/it/io/orc/write.py @@ -0,0 +1,67 @@ +import os + +import pyorc + + +data = { + "float_nullable": [1.0, 2.0, None, 4.0, 5.0], + "float_required": [1.0, 2.0, 3.0, 4.0, 5.0], + "bool_nullable": [True, False, None, True, False], + "bool_required": [True, False, True, True, False], + "int_nullable": [5, -5, None, 5, 5], + "int_required": [5, -5, 1, 5, 5], + "double_nullable": [1.0, 2.0, None, 4.0, 5.0], + "double_required": [1.0, 2.0, 3.0, 4.0, 5.0], + "bigint_nullable": [5, -5, None, 5, 5], + 
"bigint_required": [5, -5, 1, 5, 5], + "utf8_required": ["a", "bb", "ccc", "dddd", "eeeee"], + "utf8_nullable": ["a", "bb", None, "dddd", "eeeee"], +} + +def infer_schema(data): + schema = "struct<" + for key, value in data.items(): + dt = type(value[0]) + if dt == float: + dt = "float" + elif dt == int: + dt = "int" + elif dt == bool: + dt = "boolean" + elif dt == str: + dt = "string" + else: + raise NotImplementedError + if key.startswith("double"): + dt = "double" + if key.startswith("bigint"): + dt = "bigint" + schema += key + ":" + dt + "," + + schema = schema[:-1] + ">" + return schema + + +def _write( + data, + file_name: str, + compression=pyorc.CompressionKind.NONE, + dict_key_size_threshold=0.0, +): + schema = infer_schema(data) + + output = open(file_name, "wb") + writer = pyorc.Writer( + output, + schema, + dict_key_size_threshold=dict_key_size_threshold, + compression=compression, + ) + num_rows = len(list(data.values())[0]) + for x in range(num_rows): + row = tuple(values[x] for values in data.values()) + writer.write(row) + writer.close() + +os.makedirs("fixtures/pyorc", exist_ok=True) +_write(data, "fixtures/pyorc/test.orc")