This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

perf: elide unneeded offset checks in reading parquet binary/utf8 columns #1307

Closed · wants to merge 2 commits
40 changes: 40 additions & 0 deletions src/array/binary/mod.rs
@@ -24,6 +24,7 @@ mod from;
mod mutable_values;
pub use mutable_values::*;
mod mutable;
use crate::array::offsets::ValidOffsets;
pub use mutable::*;

/// A [`BinaryArray`] is Arrow's semantic equivalent of an immutable `Vec<Option<Vec<u8>>>`.
@@ -106,6 +107,45 @@ impl<O: Offset> BinaryArray<O> {
})
}

/// Returns a [`BinaryArray`] created from its internal representation.
///
/// # Errors
/// This function returns an error iff:
/// * The last offset is not equal to the values' length.
/// * The validity's length is not equal to `offsets.len() - 1`.
/// * The `data_type`'s [`crate::datatypes::PhysicalType`] is not equal to either `Binary` or `LargeBinary`.
/// # Implementation
/// This function is `O(1)`: monotonicity is guaranteed by [`ValidOffsets`], so only constant-time checks (bounds, validity length, physical type) run here.
pub fn try_new_from_valid_offsets(
data_type: DataType,
offsets: ValidOffsets<O>,
values: Buffer<u8>,
validity: Option<Bitmap>,
) -> Result<Self, Error> {
try_check_offsets_bounds(offsets.as_ref(), values.len())?;

if validity.as_ref().map_or(false, |validity| {
validity.len() != offsets.as_ref().len() - 1
}) {
return Err(Error::oos(
"validity mask length must match the number of values",
));
}

if data_type.to_physical_type() != Self::default_data_type().to_physical_type() {
return Err(Error::oos(
"BinaryArray can only be initialized with DataType::Binary or DataType::LargeBinary",
));
}

Ok(Self {
data_type,
offsets: offsets.into(),
values,
validity,
})
}

/// Creates a new [`BinaryArray`] from slices of `&[u8]`.
pub fn from_slice<T: AsRef<[u8]>, P: AsRef<[T]>>(slice: P) -> Self {
Self::from_trusted_len_values_iter(slice.as_ref().iter())
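For context, a minimal sketch of how the new constructor might be used (illustrative only, not part of the diff; it assumes the `offsets` module added to `src/array/mod.rs` below is reachable as `arrow2::array::offsets`):

```rust
use arrow2::array::{offsets::ValidOffsets, BinaryArray};
use arrow2::buffer::Buffer;
use arrow2::datatypes::DataType;
use arrow2::error::Result;

fn main() -> Result<()> {
    // Offsets [0, 2, 5] describe two values: bytes 0..2 and bytes 2..5.
    let offsets = ValidOffsets::try_new(Buffer::from(vec![0i32, 2, 5]))?;
    let values = Buffer::from(b"hello".to_vec());

    // Monotonicity was already established by `ValidOffsets::try_new`, so the
    // constructor only runs the remaining cheap checks (bounds, validity
    // length, physical type).
    let array =
        BinaryArray::<i32>::try_new_from_valid_offsets(DataType::Binary, offsets, values, None)?;
    assert_eq!(array.len(), 2);
    Ok(())
}
```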
1 change: 1 addition & 0 deletions src/array/mod.rs
@@ -431,6 +431,7 @@ mod ffi;
mod fmt;
pub mod growable;
mod iterator;
pub mod offsets;
pub mod ord;

pub(crate) use iterator::ArrayAccessor;
47 changes: 47 additions & 0 deletions src/array/offsets.rs
@@ -0,0 +1,47 @@
//! Contains the [`ValidOffsets`] struct and its implementations.
use crate::array::specification::try_check_offsets;
use crate::array::Offset;
use crate::buffer::Buffer;
use crate::error::{Error, Result};

/// Offsets that are guaranteed to be monotonically increasing.
pub struct ValidOffsets<O: Offset>(Buffer<O>);

impl<O: Offset> ValidOffsets<O> {
/// Try to create a new [`ValidOffsets`] buffer, validating that the offsets are monotonically increasing.
pub fn try_new(offsets: Buffer<O>) -> Result<Self> {
match offsets.last() {
None => Err(Error::oos("offsets must have at least one element")),
Some(last) => {
try_check_offsets(offsets.as_slice(), last.to_usize())?;
Ok(ValidOffsets(offsets))
}
}
}

/// Create a new [`ValidOffsets`] buffer without checking monotonicity.
///
/// # Errors
/// Iff `offsets` is empty.
///
/// # Safety
///
/// The offsets must be monotonically increasing.
pub unsafe fn new_unchecked(offsets: Buffer<O>) -> Result<Self> {
if offsets.first().is_none() {
Err(Error::oos("offsets must have at least one element"))
} else {
Ok(ValidOffsets(offsets))
}
}
}

impl<O: Offset> From<ValidOffsets<O>> for Buffer<O> {
fn from(vo: ValidOffsets<O>) -> Self {
vo.0
}
}

impl<O: Offset> AsRef<[O]> for ValidOffsets<O> {
fn as_ref(&self) -> &[O] {
self.0.as_slice()
}
}
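A short sketch of the validation performed by `ValidOffsets::try_new` (illustrative, assuming the same `arrow2::array::offsets` path):

```rust
use arrow2::array::offsets::ValidOffsets;
use arrow2::buffer::Buffer;

fn main() {
    // Monotonically increasing offsets are accepted.
    assert!(ValidOffsets::try_new(Buffer::from(vec![0i32, 1, 4])).is_ok());

    // Decreasing offsets are rejected, as is an empty buffer.
    assert!(ValidOffsets::try_new(Buffer::from(vec![0i32, 4, 1])).is_err());
    assert!(ValidOffsets::try_new(Buffer::from(Vec::<i32>::new())).is_err());
}
```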
29 changes: 28 additions & 1 deletion src/array/specification.rs
@@ -1,3 +1,4 @@
use crate::array::offsets::ValidOffsets;
use crate::error::{Error, Result};
use crate::types::Offset;

@@ -30,7 +31,33 @@ pub fn check_offsets_minimal<O: Offset>(offsets: &[O], values_len: usize) -> usize {
len
}

/// # Panics iff:
/// # Errors iff:
/// * `values` is not valid `utf8`, or
/// * any offset in `offsets` does not fall on a `utf8` char boundary.
pub fn try_check_utf8<O: Offset>(offsets: &ValidOffsets<O>, values: &[u8]) -> Result<()> {
if values.is_ascii() {
Ok(())
} else {
let offsets = offsets.as_ref();
simdutf8::basic::from_utf8(values)?;

for start in &offsets[..offsets.len() - 1] {
let first = values.get(start.to_usize());

if let Some(&b) = first {
// The offset lands on a char boundary iff the byte it points at is not a
// UTF-8 continuation byte (0b10xxxxxx).
// Bit-magic taken from `std::str::is_char_boundary`
if (b as i8) < -0x40 {
return Err(Error::oos("Non-valid char boundary detected"));
}
}
}
Ok(())
}
}

/// # Errors iff:
/// * the `offsets` are not monotonically increasing, or
/// * any slice of `values` between two consecutive pairs from `offsets` is invalid `utf8`, or
/// * any offset is larger or equal to `values_len`.
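An aside on the `(b as i8) < -0x40` trick in the new `try_check_utf8`: UTF-8 continuation bytes have the form `0b10xxxxxx` (0x80..=0xBF), and reinterpreted as `i8` those are exactly the values below -0x40, so the comparison detects an offset that lands inside a multi-byte character. A self-contained illustration (not part of the diff):

```rust
// Mirrors the boundary check used above: a byte starts a new code point
// (or is ASCII) iff it is not a continuation byte 0b10xxxxxx.
fn is_continuation(b: u8) -> bool {
    (b as i8) < -0x40
}

fn main() {
    assert!(!is_continuation(b'a')); // ASCII: a valid boundary
    assert!(!is_continuation(0xC3)); // lead byte of a 2-byte sequence
    assert!(is_continuation(0xA9)); // continuation byte of 'é' (0xC3 0xA9)
}
```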
42 changes: 42 additions & 0 deletions src/array/utf8/mod.rs
@@ -22,6 +22,8 @@ mod from;
mod iterator;
mod mutable;
mod mutable_values;
use crate::array::offsets::ValidOffsets;
use crate::array::specification::try_check_utf8;
pub use iterator::*;
pub use mutable::*;
pub use mutable_values::MutableUtf8ValuesArray;
@@ -116,6 +118,46 @@ impl<O: Offset> Utf8Array<O> {
})
}

/// Returns a [`Utf8Array`] created from its internal representation.
///
/// # Errors
/// This function returns an error iff:
/// * The last offset is not equal to the values' length.
/// * The validity's length is not equal to `offsets.len() - 1`.
/// * The `data_type`'s [`crate::datatypes::PhysicalType`] is not equal to either `Utf8` or `LargeUtf8`.
/// * The `values` between two consecutive `offsets` are not valid utf8.
/// # Implementation
/// This function is `O(N)`: validating utf8 is `O(N)`. Monotonicity is guaranteed by [`ValidOffsets`] and is not re-checked here.
pub fn try_new_from_valid_offsets(
data_type: DataType,
offsets: ValidOffsets<O>,
values: Buffer<u8>,
validity: Option<Bitmap>,
) -> Result<Self> {
try_check_offsets_bounds(offsets.as_ref(), values.len())?;
try_check_utf8(&offsets, &values)?;
if validity.as_ref().map_or(false, |validity| {
validity.len() != offsets.as_ref().len() - 1
}) {
return Err(Error::oos(
"validity mask length must match the number of values",
));
}

if data_type.to_physical_type() != Self::default_data_type().to_physical_type() {
return Err(Error::oos(
"Utf8Array can only be initialized with DataType::Utf8 or DataType::LargeUtf8",
));
}

Ok(Self {
data_type,
offsets: offsets.into(),
values,
validity,
})
}

/// Returns a [`Utf8Array`] from a slice of `&str`.
///
/// A convenience method that uses [`Self::from_trusted_len_values_iter`].
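To illustrate what the utf8 path still has to catch even with pre-validated offsets: offsets that are monotonic and in bounds but split a multi-byte character are rejected. A hedged sketch (not part of the diff; same assumed `arrow2::array::offsets` path):

```rust
use arrow2::array::{offsets::ValidOffsets, Utf8Array};
use arrow2::buffer::Buffer;
use arrow2::datatypes::DataType;

fn main() {
    // "aé" is three bytes: 0x61, 0xC3, 0xA9. Offset 2 would split 'é'.
    let values = Buffer::from("aé".as_bytes().to_vec());
    let offsets = ValidOffsets::try_new(Buffer::from(vec![0i64, 2, 3])).unwrap();

    // Offsets are monotonically increasing and in bounds, but offset 2 is not
    // a char boundary, so `try_check_utf8` rejects the array.
    let result =
        Utf8Array::<i64>::try_new_from_valid_offsets(DataType::LargeUtf8, offsets, values, None);
    assert!(result.is_err());
}
```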
1 change: 0 additions & 1 deletion src/io/mod.rs
@@ -1,4 +1,3 @@
#![forbid(unsafe_code)]
//! Contains modules to interface with other formats such as [`csv`],
//! [`parquet`], [`json`], [`ipc`], [`mod@print`] and [`avro`].

20 changes: 9 additions & 11 deletions src/io/parquet/read/deserialize/binary/basic.rs
@@ -1,3 +1,4 @@
use crate::array::offsets::ValidOffsets;
use std::collections::VecDeque;
use std::default::Default;

@@ -227,7 +228,7 @@ impl<'a> utils::PageState<'a> for State<'a> {
pub trait TraitBinaryArray<O: Offset>: Array + 'static {
fn try_new(
data_type: DataType,
offsets: Buffer<O>,
offsets: ValidOffsets<O>,
values: Buffer<u8>,
validity: Option<Bitmap>,
) -> Result<Self>
@@ -238,22 +239,22 @@ pub trait TraitBinaryArray<O: Offset>: Array + 'static {
impl<O: Offset> TraitBinaryArray<O> for BinaryArray<O> {
fn try_new(
data_type: DataType,
offsets: Buffer<O>,
offsets: ValidOffsets<O>,
values: Buffer<u8>,
validity: Option<Bitmap>,
) -> Result<Self> {
Self::try_new(data_type, offsets, values, validity)
Self::try_new_from_valid_offsets(data_type, offsets, values, validity)
}
}

impl<O: Offset> TraitBinaryArray<O> for Utf8Array<O> {
fn try_new(
data_type: DataType,
offsets: Buffer<O>,
offsets: ValidOffsets<O>,
values: Buffer<u8>,
validity: Option<Bitmap>,
) -> Result<Self> {
Self::try_new(data_type, offsets, values, validity)
Self::try_new_from_valid_offsets(data_type, offsets, values, validity)
}
}

@@ -483,12 +484,9 @@ pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
values: Binary<O>,
validity: MutableBitmap,
) -> Result<A> {
A::try_new(
data_type.clone(),
values.offsets.0.into(),
values.values.into(),
validity.into(),
)
let (offsets, values) = values.into_inner();

A::try_new(data_type.clone(), offsets, values.into(), validity.into())
}

pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: Pages> {
8 changes: 8 additions & 0 deletions src/io/parquet/read/deserialize/binary/utils.rs
@@ -1,3 +1,4 @@
use crate::array::offsets::ValidOffsets;
use crate::array::Offset;

use super::super::utils::Pushable;
@@ -50,6 +51,13 @@ impl<O: Offset> Pushable<O> for Offsets<O> {
}

impl<O: Offset> Binary<O> {
pub fn into_inner(self) -> (ValidOffsets<O>, Vec<u8>) {
// Safety:
// the invariant that all offsets are monotonically increasing is upheld.
let offsets = unsafe { ValidOffsets::new_unchecked(self.offsets.0.into()) }.unwrap();
Author comment: The offsets are private and only constructed by us, so the invariant is guaranteed.

(offsets, self.values)
}

#[inline]
pub fn with_capacity(capacity: usize) -> Self {
let mut offsets = Vec::with_capacity(1 + capacity);