From 86b1c91716544343562ba36d42ce79924c1a347a Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sat, 30 Jul 2022 08:50:07 +0000 Subject: [PATCH] Bumped and updated --- Cargo.toml | 2 +- src/io/orc/mod.rs | 4 +- src/io/orc/read/mod.rs | 317 +++++++++++++-------------------------- tests/it/io/orc/read.rs | 89 ++++++++--- tests/it/io/orc/write.py | 4 + 5 files changed, 183 insertions(+), 233 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index da5fef8481a..c823cb058ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ crc = { version = "2", optional = true } async-stream = { version = "0.3.2", optional = true } # ORC support -orc-format = { git = "https://github.com/DataEngineeringLabs/orc-format.git", optional = true } +orc-format = { git = "https://github.com/DataEngineeringLabs/orc-format.git", branch = "iters", optional = true } # Arrow integration tests support serde = { version = "^1.0", features = ["rc"], optional = true } diff --git a/src/io/orc/mod.rs b/src/io/orc/mod.rs index f9d240113dd..6d1ac5953e3 100644 --- a/src/io/orc/mod.rs +++ b/src/io/orc/mod.rs @@ -5,8 +5,8 @@ pub use orc_format as format; use crate::error::Error; -impl From for Error { - fn from(error: format::Error) -> Self { +impl From for Error { + fn from(error: format::error::Error) -> Self { Error::ExternalFormat(format!("{:?}", error)) } } diff --git a/src/io/orc/read/mod.rs b/src/io/orc/read/mod.rs index f4723f807ee..5e6d933d735 100644 --- a/src/io/orc/read/mod.rs +++ b/src/io/orc/read/mod.rs @@ -1,17 +1,14 @@ //! APIs to read from [ORC format](https://orc.apache.org). -use std::io::{Read, Seek, SeekFrom}; - use crate::array::{Array, BooleanArray, Int64Array, PrimitiveArray}; use crate::bitmap::{Bitmap, MutableBitmap}; use crate::datatypes::{DataType, Field, Schema}; use crate::error::Error; use crate::types::NativeType; -use orc_format::fallible_streaming_iterator::FallibleStreamingIterator; use orc_format::proto::stream::Kind; -use orc_format::proto::{CompressionKind, Footer, StripeInformation, Type}; +use orc_format::proto::{Footer, Type}; use orc_format::read::decode; -use orc_format::read::Stripe; +use orc_format::read::Column; /// Infers a [`Schema`] from the files' [`Footer`]. /// # Errors @@ -64,80 +61,50 @@ fn infer_dt(type_: &Type, types: &[Type]) -> Result { Ok(dt) } -/// Reads the stripe [`StripeInformation`] into memory. -pub fn read_stripe( - reader: &mut R, - stripe_info: StripeInformation, - compression: CompressionKind, -) -> Result { - let offset = stripe_info.offset(); - reader.seek(SeekFrom::Start(offset)).unwrap(); - - let len = stripe_info.index_length() + stripe_info.data_length() + stripe_info.footer_length(); - let mut stripe = vec![0; len as usize]; - reader.read_exact(&mut stripe).unwrap(); +fn deserialize_validity(column: &Column, scratch: &mut Vec) -> Result, Error> { + let mut stream = column.get_stream(Kind::Present, std::mem::take(scratch))?; - Ok(Stripe::try_new(stripe, stripe_info, compression)?) -} + let stream = decode::BooleanIter::new(&mut stream, column.number_of_rows()); -fn deserialize_validity( - stripe: &Stripe, - column: usize, - scratch: &mut Vec, -) -> Result, Error> { - let mut chunks = stripe.get_bytes(column, Kind::Present, std::mem::take(scratch))?; - - let mut validity = MutableBitmap::with_capacity(stripe.number_of_rows()); - let mut remaining = stripe.number_of_rows(); - while let Some(chunk) = chunks.next()? { - // todo: this can be faster by iterating in bytes instead of single bits via `BooleanRun` - let iter = decode::BooleanIter::new(chunk, remaining); - for item in iter { - remaining -= 1; - validity.push(item?) - } + let mut validity = MutableBitmap::with_capacity(column.number_of_rows()); + for item in stream { + validity.push(item?) } - *scratch = std::mem::take(&mut chunks.into_inner()); + + //*scratch = std::mem::take(&mut stream.into_inner()); Ok(validity.into()) } /// Deserializes column `column` from `stripe`, assumed to represent a f32 -fn deserialize_float( +fn deserialize_float( data_type: DataType, - stripe: &Stripe, - column: usize, + column: &Column, ) -> Result, Error> { let mut scratch = vec![]; - let num_rows = stripe.number_of_rows(); + let num_rows = column.number_of_rows(); - let validity = deserialize_validity(stripe, column, &mut scratch)?; + let validity = deserialize_validity(column, &mut scratch)?; - let mut chunks = stripe.get_bytes(column, Kind::Data, scratch)?; + let mut chunks = column.get_stream(Kind::Data, scratch)?; let mut values = Vec::with_capacity(num_rows); if let Some(validity) = &validity { - let mut validity_iter = validity.iter(); - while let Some(chunk) = chunks.next()? { - let mut valid_iter = chunk - .chunks_exact(std::mem::size_of::()) - .map(|chunk| T::from_le_bytes(chunk.try_into().unwrap_or_default())); - let iter = validity_iter.by_ref().map(|is_valid| { - if is_valid { - valid_iter.next().unwrap() - } else { - T::default() - } - }); - values.extend(iter); + let validity_iter = validity.iter(); + let mut items = + decode::FloatIter::::new(&mut chunks, validity.len() - validity.unset_bits()); + + for is_valid in validity_iter { + if is_valid { + values.push(items.next().transpose()?.unwrap_or_default()) + } else { + values.push(T::default()) + } } } else { - while let Some(chunk) = chunks.next()? { - values.extend( - chunk - .chunks_exact(std::mem::size_of::()) - .map(|chunk| T::from_le_bytes(chunk.try_into().unwrap_or_default())), - ); + let items = decode::FloatIter::::new(&mut chunks, num_rows); + for item in items { + values.push(item?); } } @@ -145,163 +112,97 @@ fn deserialize_float( } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -fn deserialize_bool( - data_type: DataType, - stripe: &Stripe, - column: usize, -) -> Result { - let num_rows = stripe.number_of_rows(); +fn deserialize_bool(data_type: DataType, column: &Column) -> Result { + let num_rows = column.number_of_rows(); let mut scratch = vec![]; - let validity = deserialize_validity(stripe, column, &mut scratch)?; + let validity = deserialize_validity(column, &mut scratch)?; - let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + let mut chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; let mut values = MutableBitmap::with_capacity(num_rows); if let Some(validity) = &validity { - let mut validity_iter = validity.iter(); - - while let Some(chunk) = chunks.next()? { - let mut valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8); - validity_iter.by_ref().try_for_each(|is_valid| { - values.push(if is_valid { - valid_iter.next().unwrap()? - } else { - false - }); - Result::<(), Error>::Ok(()) - })?; + let validity_iter = validity.iter(); + + let mut items = + decode::BooleanIter::new(&mut chunks, validity.len() - validity.unset_bits()); + + for is_valid in validity_iter { + values.push(if is_valid { + items.next().transpose()?.unwrap_or_default() + } else { + false + }); } } else { - while let Some(chunk) = chunks.next()? { - let valid_iter = decode::BooleanIter::new(chunk, num_rows); - for v in valid_iter { - values.push(v?) - } + let valid_iter = decode::BooleanIter::new(&mut chunks, num_rows); + for v in valid_iter { + values.push(v?) } } BooleanArray::try_new(data_type, values.into(), validity) } -struct IntIter<'a> { - current: Option>, - runs: decode::SignedRleV2Iter<'a>, -} - -impl<'a> IntIter<'a> { - fn new(data: &'a [u8]) -> Self { - Self { - runs: decode::SignedRleV2Iter::new(data), - current: None, - } - } -} - -impl<'a> Iterator for IntIter<'a> { - type Item = Result; - - #[inline] - fn next(&mut self) -> Option { - let next = if let Some(run) = &mut self.current { - match run { - decode::SignedRleV2Run::Direct(values_iter) => values_iter.next(), - decode::SignedRleV2Run::Delta(values_iter) => values_iter.next(), - decode::SignedRleV2Run::ShortRepeat(values_iter) => values_iter.next(), - } - } else { - None - }; - - if next.is_none() { - match self.runs.next()? { - Ok(run) => self.current = Some(run), - Err(e) => return Some(Err(Error::ExternalFormat(format!("{:?}", e)))), - } - self.next() - } else { - next.map(Ok) - } - } -} - /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -fn deserialize_i64( - data_type: DataType, - stripe: &Stripe, - column: usize, -) -> Result { - let num_rows = stripe.number_of_rows(); +fn deserialize_i64(data_type: DataType, column: &Column) -> Result { + let num_rows = column.number_of_rows(); let mut scratch = vec![]; - let validity = deserialize_validity(stripe, column, &mut scratch)?; + let validity = deserialize_validity(column, &mut scratch)?; - let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + let chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; let mut values = Vec::with_capacity(num_rows); if let Some(validity) = &validity { let validity_iter = validity.iter(); + let mut iter = + decode::SignedRleV2Iter::new(chunks, validity.len() - validity.unset_bits(), vec![]); - let mut iter = IntIter::new(chunks.next()?.unwrap()); for is_valid in validity_iter { if is_valid { - let item = iter.next().transpose()?; - let item = if let Some(item) = item { - item - } else { - iter = IntIter::new(chunks.next()?.unwrap()); - iter.next().transpose()?.unwrap() - }; + let item = iter.next().transpose()?.unwrap_or_default(); values.push(item); } else { values.push(0); } } } else { - while let Some(chunk) = chunks.next()? { - decode::SignedRleV2Iter::new(chunk).try_for_each(|run| { - run.map(|run| match run { - decode::SignedRleV2Run::Direct(values_iter) => values.extend(values_iter), - decode::SignedRleV2Run::Delta(values_iter) => values.extend(values_iter), - decode::SignedRleV2Run::ShortRepeat(values_iter) => values.extend(values_iter), - }) - })?; - } + let mut iter = decode::SignedRleV2RunIter::new(chunks, num_rows, vec![]); + iter.try_for_each(|run| { + println!("{:?}", run); + run.map(|run| match run { + decode::SignedRleV2Run::Direct(values_iter) => values.extend(values_iter), + decode::SignedRleV2Run::Delta(values_iter) => values.extend(values_iter), + decode::SignedRleV2Run::ShortRepeat(values_iter) => values.extend(values_iter), + }) + })?; } Int64Array::try_new(data_type, values.into(), validity) } /// Deserializes column `column` from `stripe`, assumed to represent a boolean array -fn deserialize_int( - data_type: DataType, - stripe: &Stripe, - column: usize, -) -> Result, Error> +fn deserialize_int(data_type: DataType, column: &Column) -> Result, Error> where T: NativeType + TryFrom, { - let num_rows = stripe.number_of_rows(); + let num_rows = column.number_of_rows(); let mut scratch = vec![]; - let validity = deserialize_validity(stripe, column, &mut scratch)?; + let validity = deserialize_validity(column, &mut scratch)?; - let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?; + let chunks = column.get_stream(Kind::Data, std::mem::take(&mut scratch))?; let mut values = Vec::::with_capacity(num_rows); if let Some(validity) = &validity { let validity_iter = validity.iter(); - let mut iter = IntIter::new(chunks.next()?.unwrap()); + let mut iter = + decode::SignedRleV2Iter::new(chunks, validity.len() - validity.unset_bits(), vec![]); for is_valid in validity_iter { if is_valid { - let item = iter.next().transpose()?; - let item = if let Some(item) = item { - item - } else { - iter = IntIter::new(chunks.next()?.unwrap()); - iter.next().transpose()?.unwrap() - }; + let item = iter.next().transpose()?.unwrap_or_default(); let item = item .try_into() .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; @@ -311,39 +212,39 @@ where } } } else { - while let Some(chunk) = chunks.next()? { - decode::SignedRleV2Iter::new(chunk).try_for_each(|run| { - run.map_err(Error::from).and_then(|run| match run { - decode::SignedRleV2Run::Direct(values_iter) => { - for item in values_iter { - let item = item.try_into().map_err(|_| { - Error::ExternalFormat("value uncastable".to_string()) - })?; - values.push(item); - } - Ok(()) + let mut iter = decode::SignedRleV2RunIter::new(chunks, num_rows, vec![]); + + iter.try_for_each(|run| { + run.map_err(Error::from).and_then(|run| match run { + decode::SignedRleV2Run::Direct(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); } - decode::SignedRleV2Run::Delta(values_iter) => { - for item in values_iter { - let item = item.try_into().map_err(|_| { - Error::ExternalFormat("value uncastable".to_string()) - })?; - values.push(item); - } - Ok(()) + Ok(()) + } + decode::SignedRleV2Run::Delta(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); } - decode::SignedRleV2Run::ShortRepeat(values_iter) => { - for item in values_iter { - let item = item.try_into().map_err(|_| { - Error::ExternalFormat("value uncastable".to_string()) - })?; - values.push(item); - } - Ok(()) + Ok(()) + } + decode::SignedRleV2Run::ShortRepeat(values_iter) => { + for item in values_iter { + let item = item + .try_into() + .map_err(|_| Error::ExternalFormat("value uncastable".to_string()))?; + values.push(item); } - }) - })?; - } + Ok(()) + } + }) + })?; } PrimitiveArray::try_new(data_type, values.into(), validity) @@ -351,19 +252,15 @@ where /// Deserializes column `column` from `stripe`, assumed /// to represent an array of `data_type`. -pub fn deserialize( - data_type: DataType, - stripe: &Stripe, - column: usize, -) -> Result, Error> { +pub fn deserialize(data_type: DataType, column: &Column) -> Result, Error> { match data_type { - DataType::Boolean => deserialize_bool(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int8 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int16 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int32 => deserialize_int::(data_type, stripe, column).map(|x| x.boxed()), - DataType::Int64 => deserialize_i64(data_type, stripe, column).map(|x| x.boxed()), - DataType::Float32 => deserialize_float::(data_type, stripe, column).map(|x| x.boxed()), - DataType::Float64 => deserialize_float::(data_type, stripe, column).map(|x| x.boxed()), + DataType::Boolean => deserialize_bool(data_type, column).map(|x| x.boxed()), + DataType::Int8 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int16 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int32 => deserialize_int::(data_type, column).map(|x| x.boxed()), + DataType::Int64 => deserialize_i64(data_type, column).map(|x| x.boxed()), + DataType::Float32 => deserialize_float::(data_type, column).map(|x| x.boxed()), + DataType::Float64 => deserialize_float::(data_type, column).map(|x| x.boxed()), dt => return Err(Error::nyi(format!("Reading {dt:?} from ORC"))), } } diff --git a/tests/it/io/orc/read.rs b/tests/it/io/orc/read.rs index f6dd8fdeba1..08da7e587b0 100644 --- a/tests/it/io/orc/read.rs +++ b/tests/it/io/orc/read.rs @@ -6,26 +6,33 @@ use arrow2::io::orc::{format, read}; #[test] fn infer() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (_, footer, _) = format::read::read_metadata(&mut reader)?; - let schema = read::infer_schema(&footer)?; + let metadata = format::read::read_metadata(&mut reader)?; + let schema = read::infer_schema(&metadata.footer)?; - assert_eq!(schema.fields.len(), 8); + assert_eq!(schema.fields.len(), 10); Ok(()) } #[test] fn float32() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; - let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = + format::read::read_stripe_column(&mut reader, &metadata, 0, footer.clone(), 1, vec![])?; assert_eq!( - read::deserialize(DataType::Float32, &stripe, 1)?, + read::deserialize(DataType::Float32, &column)?, Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() ); + let (footer, scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 2, scratch)?; + assert_eq!( - read::deserialize(DataType::Float32, &stripe, 2)?, + read::deserialize(DataType::Float32, &column)?, Float32Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() ); Ok(()) @@ -34,16 +41,22 @@ fn float32() -> Result<(), Error> { #[test] fn float64() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; - let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 7, vec![])?; assert_eq!( - read::deserialize(DataType::Float64, &stripe, 7)?, + read::deserialize(DataType::Float64, &column)?, Float64Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)]).boxed() ); + let (footer, scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 8, vec![])?; + assert_eq!( - read::deserialize(DataType::Float64, &stripe, 8)?, + read::deserialize(DataType::Float64, &column)?, Float64Array::from([Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(5.0)]).boxed() ); Ok(()) @@ -52,16 +65,22 @@ fn float64() -> Result<(), Error> { #[test] fn boolean() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; - let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 3, vec![])?; assert_eq!( - read::deserialize(DataType::Boolean, &stripe, 3)?, + read::deserialize(DataType::Boolean, &column)?, BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)]).boxed() ); + let (footer, scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 4, vec![])?; + assert_eq!( - read::deserialize(DataType::Boolean, &stripe, 4)?, + read::deserialize(DataType::Boolean, &column)?, BooleanArray::from([Some(true), Some(false), Some(true), Some(true), Some(false)]).boxed() ); Ok(()) @@ -70,17 +89,47 @@ fn boolean() -> Result<(), Error> { #[test] fn int() -> Result<(), Error> { let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); - let (ps, footer, _) = format::read::read_metadata(&mut reader)?; - let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?; + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 6, vec![])?; + + assert_eq!( + read::deserialize(DataType::Int32, &column)?, + Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() + ); + + let (footer, _scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 5, vec![])?; assert_eq!( - read::deserialize(DataType::Int32, &stripe, 5)?, + read::deserialize(DataType::Int32, &column)?, Int32Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() ); + Ok(()) +} + +#[test] +fn bigint() -> Result<(), Error> { + let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap(); + let metadata = format::read::read_metadata(&mut reader)?; + let footer = format::read::read_stripe_footer(&mut reader, &metadata, 0, &mut vec![])?; + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 10, vec![])?; + + assert_eq!( + read::deserialize(DataType::Int64, &column)?, + Int64Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() + ); + + let (footer, scratch) = column.into_inner(); + + let column = format::read::read_stripe_column(&mut reader, &metadata, 0, footer, 9, vec![])?; assert_eq!( - read::deserialize(DataType::Int32, &stripe, 6)?, - Int32Array::from([Some(5), Some(-5), Some(1), Some(5), Some(5)]).boxed() + read::deserialize(DataType::Int64, &column)?, + Int64Array::from([Some(5), Some(-5), None, Some(5), Some(5)]).boxed() ); Ok(()) } diff --git a/tests/it/io/orc/write.py b/tests/it/io/orc/write.py index 3416f1d21f4..70c767d95a1 100644 --- a/tests/it/io/orc/write.py +++ b/tests/it/io/orc/write.py @@ -12,6 +12,8 @@ "int_required": [5, -5, 1, 5, 5], "double_nulable": [1.0, 2.0, None, 4.0, 5.0], "double_required": [1.0, 2.0, 3.0, 4.0, 5.0], + "bigint_nulable": [5, -5, None, 5, 5], + "bigint_required": [5, -5, 1, 5, 5], } def infer_schema(data): @@ -30,6 +32,8 @@ def infer_schema(data): raise NotImplementedError if key.startswith("double"): dt = "double" + if key.startswith("bigint"): + dt = "bigint" schema += key + ":" + dt + "," schema = schema[:-1] + ">"