diff --git a/src/encoding/byte.rs b/src/encoding/byte.rs index 6b1c4d57..d2ad1999 100644 --- a/src/encoding/byte.rs +++ b/src/encoding/byte.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use bytemuck::must_cast_slice; use bytes::{BufMut, BytesMut}; use snafu::ResultExt; @@ -24,7 +25,7 @@ use crate::{ }; use std::io::Read; -use super::{util::read_u8, PrimitiveValueDecoder, PrimitiveValueEncoder}; +use super::{rle::GenericRle, util::read_u8, PrimitiveValueEncoder}; const MAX_LITERAL_LENGTH: usize = 128; const MIN_REPEAT_LENGTH: usize = 3; @@ -209,10 +210,22 @@ impl ByteRleDecoder { index: 0, } } +} + +impl GenericRle for ByteRleDecoder { + fn advance(&mut self, n: usize) { + self.index += n + } - fn read_values(&mut self) -> Result<()> { + fn available(&self) -> &[i8] { + let bytes = &self.leftovers[self.index..]; + must_cast_slice(bytes) + } + + fn decode_batch(&mut self) -> Result<()> { self.index = 0; self.leftovers.clear(); + let header = read_u8(&mut self.reader)?; if header < 0x80 { // Run of repeated value @@ -231,24 +244,12 @@ impl ByteRleDecoder { } } -impl PrimitiveValueDecoder for ByteRleDecoder { - // TODO: can probably implement this better - fn decode(&mut self, out: &mut [i8]) -> Result<()> { - for x in out.iter_mut() { - if self.index == self.leftovers.len() { - self.read_values()?; - } - *x = self.leftovers[self.index] as i8; - self.index += 1; - } - Ok(()) - } -} - #[cfg(test)] mod tests { use std::io::Cursor; + use crate::encoding::PrimitiveValueDecoder; + use super::*; use proptest::prelude::*; diff --git a/src/encoding/integer/rle_v1.rs b/src/encoding/integer/rle_v1.rs index ab9c7766..02c04953 100644 --- a/src/encoding/integer/rle_v1.rs +++ b/src/encoding/integer/rle_v1.rs @@ -23,8 +23,8 @@ use snafu::OptionExt; use crate::{ encoding::{ + rle::GenericRle, util::{read_u8, try_read_u8}, - PrimitiveValueDecoder, }, error::{OutOfSpecSnafu, Result}, }; @@ -76,21 +76,6 @@ impl RleV1Decoder { sign: Default::default(), } } - - fn decode_batch(&mut self) -> Result<()> { - self.current_head = 0; - self.decoded_ints.clear(); - - match EncodingType::from_header(&mut self.reader)? { - Some(EncodingType::Literals { length }) => { - read_literals::<_, _, S>(&mut self.reader, &mut self.decoded_ints, length) - } - Some(EncodingType::Run { length, delta }) => { - read_run::<_, _, S>(&mut self.reader, &mut self.decoded_ints, length, delta) - } - None => Ok(()), - } - } } fn read_literals( @@ -137,46 +122,27 @@ fn read_run( Ok(()) } -impl PrimitiveValueDecoder for RleV1Decoder { - // TODO: this is exact duplicate from RLEv2 version; deduplicate it - fn decode(&mut self, out: &mut [N]) -> Result<()> { - let available = &self.decoded_ints[self.current_head..]; - // If we have enough in buffer to copy over - if available.len() >= out.len() { - out.copy_from_slice(&available[..out.len()]); - self.current_head += out.len(); - return Ok(()); - } - - // Otherwise progressively copy over chunks - let len_to_copy = out.len(); - let mut copied = 0; - while copied < len_to_copy { - let copying = self.decoded_ints.len() - self.current_head; - // At most, we fill to exact length of output buffer (don't overflow) - let copying = copying.min(len_to_copy - copied); +impl GenericRle for RleV1Decoder { + fn advance(&mut self, n: usize) { + self.current_head += n; + } - let target_out_slice = &mut out[copied..copied + copying]; - target_out_slice.copy_from_slice( - &self.decoded_ints[self.current_head..self.current_head + copying], - ); + fn available(&self) -> &[N] { + &self.decoded_ints[self.current_head..] + } - copied += copying; - self.current_head += copying; + fn decode_batch(&mut self) -> Result<()> { + self.current_head = 0; + self.decoded_ints.clear(); - if self.current_head == self.decoded_ints.len() { - self.decode_batch()?; + match EncodingType::from_header(&mut self.reader)? { + Some(EncodingType::Literals { length }) => { + read_literals::<_, _, S>(&mut self.reader, &mut self.decoded_ints, length) } - } - - if copied != out.len() { - // TODO: more descriptive error - OutOfSpecSnafu { - msg: "Array length less than expected", + Some(EncodingType::Run { length, delta }) => { + read_run::<_, _, S>(&mut self.reader, &mut self.decoded_ints, length, delta) } - .fail() - } else { - Ok(()) + None => Ok(()), } } } @@ -185,7 +151,7 @@ impl PrimitiveValueDecoder for RleV1Decode mod tests { use std::io::Cursor; - use crate::encoding::integer::UnsignedEncoding; + use crate::encoding::{integer::UnsignedEncoding, PrimitiveValueDecoder}; use super::*; diff --git a/src/encoding/integer/rle_v2/mod.rs b/src/encoding/integer/rle_v2/mod.rs index 9f654cd3..ed871cf8 100644 --- a/src/encoding/integer/rle_v2/mod.rs +++ b/src/encoding/integer/rle_v2/mod.rs @@ -20,8 +20,8 @@ use std::{io::Read, marker::PhantomData}; use bytes::BytesMut; use crate::{ - encoding::{util::try_read_u8, PrimitiveValueDecoder, PrimitiveValueEncoder}, - error::{OutOfSpecSnafu, Result}, + encoding::{rle::GenericRle, util::try_read_u8, PrimitiveValueEncoder}, + error::Result, memory::EstimateMemory, }; @@ -98,6 +98,16 @@ impl RleV2Decoder { sign: Default::default(), } } +} + +impl GenericRle for RleV2Decoder { + fn advance(&mut self, n: usize) { + self.current_head += n; + } + + fn available(&self) -> &[N] { + &self.decoded_ints[self.current_head..] + } fn decode_batch(&mut self) -> Result<()> { self.current_head = 0; @@ -131,49 +141,6 @@ impl RleV2Decoder { } } -impl PrimitiveValueDecoder for RleV2Decoder { - fn decode(&mut self, out: &mut [N]) -> Result<()> { - let available = &self.decoded_ints[self.current_head..]; - // If we have enough in buffer to copy over - if available.len() >= out.len() { - out.copy_from_slice(&available[..out.len()]); - self.current_head += out.len(); - return Ok(()); - } - - // Otherwise progressively copy over chunks - let len_to_copy = out.len(); - let mut copied = 0; - while copied < len_to_copy { - let copying = self.decoded_ints.len() - self.current_head; - // At most, we fill to exact length of output buffer (don't overflow) - let copying = copying.min(len_to_copy - copied); - - let target_out_slice = &mut out[copied..copied + copying]; - target_out_slice.copy_from_slice( - &self.decoded_ints[self.current_head..self.current_head + copying], - ); - - copied += copying; - self.current_head += copying; - - if self.current_head == self.decoded_ints.len() { - self.decode_batch()?; - } - } - - if copied != out.len() { - // TODO: more descriptive error - OutOfSpecSnafu { - msg: "Array length less than expected", - } - .fail() - } else { - Ok(()) - } - } -} - struct DeltaEncodingCheckResult { base_value: N, min: N, @@ -541,7 +508,10 @@ mod tests { use proptest::prelude::*; - use crate::encoding::integer::{SignedEncoding, UnsignedEncoding}; + use crate::encoding::{ + integer::{SignedEncoding, UnsignedEncoding}, + PrimitiveValueDecoder, + }; use super::*; diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index 5350c780..871ae0b0 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -27,6 +27,7 @@ pub mod byte; pub mod decimal; pub mod float; pub mod integer; +mod rle; pub mod timestamp; mod util; diff --git a/src/encoding/rle.rs b/src/encoding/rle.rs new file mode 100644 index 00000000..a330efa1 --- /dev/null +++ b/src/encoding/rle.rs @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{OutOfSpecSnafu, Result}; + +use super::PrimitiveValueDecoder; + +mod sealed { + use std::io::Read; + + use crate::encoding::{ + byte::ByteRleDecoder, + integer::{rle_v1::RleV1Decoder, rle_v2::RleV2Decoder, EncodingSign, NInt}, + }; + + pub trait Rle {} + + impl Rle for ByteRleDecoder {} + impl Rle for RleV1Decoder {} + impl Rle for RleV2Decoder {} +} + +/// Generic decoding behaviour for run length encoded values, such as integers (v1 and v2) +/// and bytes. +/// +/// Assumes an internal buffer which acts like a (single headed) queue where values are first +/// decoded into, before being copied out into the output buffer (usually an Arrow array). +pub trait GenericRle { + /// Consume N elements from internal buffer to signify the values having been copied out. + fn advance(&mut self, n: usize); + + /// All values available in internal buffer, respecting the current advancement level. + fn available(&self) -> &[V]; + + /// This should clear the internal buffer and populate it with the next round of decoded + /// values. + // TODO: Have a version that copies directly into the output buffer (e.g. Arrow array). + // Currently we always decode to the internal buffer first, even if we can copy + // directly to the output and skip the middle man. Ideally the internal buffer + // should only be used for leftovers between calls to PrimitiveValueDecoder::decode. + fn decode_batch(&mut self) -> Result<()>; +} + +impl + sealed::Rle> PrimitiveValueDecoder for G { + fn decode(&mut self, out: &mut [V]) -> Result<()> { + let available = self.available(); + // If we have enough leftover to copy, can skip decoding more. + if available.len() >= out.len() { + out.copy_from_slice(&available[..out.len()]); + self.advance(out.len()); + return Ok(()); + } + + // Otherwise progressively decode and copy over chunks. + let len_to_copy = out.len(); + let mut copied = 0; + while copied < len_to_copy { + if self.available().is_empty() { + self.decode_batch()?; + } + + let copying = self.available().len(); + // At most, we fill to exact length of output buffer (don't overflow). + let copying = copying.min(len_to_copy - copied); + + let out = &mut out[copied..]; + out[..copying].copy_from_slice(&self.available()[..copying]); + + copied += copying; + self.advance(copying); + } + + // We always expect to be able to fill the output buffer; it is up to the + // caller to control that size. + if copied != out.len() { + // TODO: more descriptive error + OutOfSpecSnafu { + msg: "Array length less than expected", + } + .fail() + } else { + Ok(()) + } + } +}