From 157b1741c0a1b4ab44276ff00655f988470a5762 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Wed, 19 Jan 2022 19:58:39 +0000 Subject: [PATCH] Added Utf8Sequence --- src/array/display.rs | 226 ++++++++++++++++++++++++ src/array/equal/mod.rs | 11 ++ src/array/equal/utf8_sequence.rs | 5 + src/array/ffi.rs | 2 + src/array/fmt.rs | 1 + src/array/growable/mod.rs | 1 + src/array/mod.rs | 10 ++ src/array/string_sequence/fmt.rs | 27 +++ src/array/string_sequence/iterator.rs | 80 +++++++++ src/array/string_sequence/mod.rs | 220 +++++++++++++++++++++++ src/compute/aggregate/memory.rs | 13 ++ src/datatypes/mod.rs | 6 + src/datatypes/physical_type.rs | 4 + src/ffi/array.rs | 2 + src/ffi/bridge.rs | 2 + src/ffi/schema.rs | 6 + src/io/ipc/read/deserialize.rs | 8 +- src/io/ipc/write/common.rs | 2 +- src/io/ipc/write/schema.rs | 5 + src/io/ipc/write/serialize.rs | 3 + src/io/json_integration/read/array.rs | 3 + src/io/json_integration/write/schema.rs | 3 + src/io/parquet/read/deserialize/mod.rs | 4 +- src/scalar/equal.rs | 2 + src/scalar/mod.rs | 1 + 25 files changed, 644 insertions(+), 3 deletions(-) create mode 100644 src/array/display.rs create mode 100644 src/array/equal/utf8_sequence.rs create mode 100644 src/array/string_sequence/fmt.rs create mode 100644 src/array/string_sequence/iterator.rs create mode 100644 src/array/string_sequence/mod.rs diff --git a/src/array/display.rs b/src/array/display.rs new file mode 100644 index 00000000000..f15f681a576 --- /dev/null +++ b/src/array/display.rs @@ -0,0 +1,226 @@ +use crate::{ + array::*, + datatypes::{DataType, IntervalUnit, TimeUnit}, + temporal_conversions, +}; + +macro_rules! dyn_display { + ($array:expr, $ty:ty, $expr:expr) => {{ + let a = $array.as_any().downcast_ref::<$ty>().unwrap(); + Box::new(move |row: usize| format!("{}", $expr(a.value(row)))) + }}; +} + +macro_rules! 
dyn_primitive { + ($array:expr, $ty:ty, $expr:expr) => {{ + dyn_display!($array, PrimitiveArray<$ty>, $expr) + }}; +} + +/// Returns a function of index returning the string representation of the _value_ of `array`. +/// This does not take nulls into account. +pub fn get_value_display<'a>(array: &'a dyn Array) -> Box String + 'a> { + use DataType::*; + match array.data_type() { + Null => Box::new(|_: usize| "".to_string()), + Boolean => { + let a = array.as_any().downcast_ref::().unwrap(); + Box::new(move |row: usize| format!("{}", a.value(row))) + } + Int8 => dyn_primitive!(array, i8, |x| x), + Int16 => dyn_primitive!(array, i16, |x| x), + Int32 => dyn_primitive!(array, i32, |x| x), + Int64 => dyn_primitive!(array, i64, |x| x), + UInt8 => dyn_primitive!(array, u8, |x| x), + UInt16 => dyn_primitive!(array, u16, |x| x), + UInt32 => dyn_primitive!(array, u32, |x| x), + UInt64 => dyn_primitive!(array, u64, |x| x), + Float16 => unreachable!(), + Float32 => dyn_primitive!(array, f32, |x| x), + Float64 => dyn_primitive!(array, f64, |x| x), + Date32 => dyn_primitive!(array, i32, temporal_conversions::date32_to_date), + Date64 => dyn_primitive!(array, i64, temporal_conversions::date64_to_date), + Time32(TimeUnit::Second) => { + dyn_primitive!(array, i32, temporal_conversions::time32s_to_time) + } + Time32(TimeUnit::Millisecond) => { + dyn_primitive!(array, i32, temporal_conversions::time32ms_to_time) + } + Time32(_) => unreachable!(), // remaining are not valid + Time64(TimeUnit::Microsecond) => { + dyn_primitive!(array, i64, temporal_conversions::time64us_to_time) + } + Time64(TimeUnit::Nanosecond) => { + dyn_primitive!(array, i64, temporal_conversions::time64ns_to_time) + } + Time64(_) => unreachable!(), // remaining are not valid + Timestamp(time_unit, tz) => { + if let Some(tz) = tz { + let timezone = temporal_conversions::parse_offset(tz); + match timezone { + Ok(timezone) => { + dyn_primitive!(array, i64, |time| { + temporal_conversions::timestamp_to_datetime(time, 
*time_unit, &timezone) + }) + } + #[cfg(feature = "chrono-tz")] + Err(_) => { + let timezone = temporal_conversions::parse_offset_tz(tz).unwrap(); + dyn_primitive!(array, i64, |time| { + temporal_conversions::timestamp_to_datetime(time, *time_unit, &timezone) + }) + } + #[cfg(not(feature = "chrono-tz"))] + _ => panic!( + "Invalid Offset format (must be [-]00:00) or chrono-tz feature not active" + ), + } + } else { + dyn_primitive!(array, i64, |time| { + temporal_conversions::timestamp_to_naive_datetime(time, *time_unit) + }) + } + } + Interval(IntervalUnit::YearMonth) => { + dyn_primitive!(array, i32, |x| format!("{}m", x)) + } + Interval(IntervalUnit::DayTime) => { + dyn_primitive!(array, days_ms, |x: days_ms| format!( + "{}d{}ms", + x.days(), + x.milliseconds() + )) + } + + Interval(IntervalUnit::MonthDayNano) => { + dyn_primitive!(array, months_days_ns, |x: months_days_ns| format!( + "{}m{}d{}ns", + x.months(), + x.days(), + x.ns() + )) + } + Duration(TimeUnit::Second) => dyn_primitive!(array, i64, |x| format!("{}s", x)), + Duration(TimeUnit::Millisecond) => dyn_primitive!(array, i64, |x| format!("{}ms", x)), + Duration(TimeUnit::Microsecond) => dyn_primitive!(array, i64, |x| format!("{}us", x)), + Duration(TimeUnit::Nanosecond) => dyn_primitive!(array, i64, |x| format!("{}ns", x)), + Binary => dyn_display!(array, BinaryArray, |x: &[u8]| { + x.iter().fold("".to_string(), |mut acc, x| { + acc.push_str(&format!("{:#010b}", x)); + acc + }) + }), + LargeBinary => dyn_display!(array, BinaryArray, |x: &[u8]| { + x.iter().fold("".to_string(), |mut acc, x| { + acc.push_str(&format!("{:#010b}", x)); + acc + }) + }), + FixedSizeBinary(_) => dyn_display!(array, FixedSizeBinaryArray, |x: &[u8]| { + x.iter().fold("".to_string(), |mut acc, x| { + acc.push_str(&format!("{:#010b}", x)); + acc + }) + }), + Utf8 => dyn_display!(array, Utf8Array, |x| x), + LargeUtf8 => dyn_display!(array, Utf8Array, |x| x), + Utf8Sequence => dyn_display!(array, StringSequenceArray, |x| x), + 
LargeUtf8Sequence => dyn_display!(array, StringSequenceArray, |x| x), + Decimal(_, scale) => { + // The number 999.99 has a precision of 5 and scale of 2 + let scale = *scale as u32; + let display = move |x| { + let base = x / 10i128.pow(scale); + let decimals = x - base * 10i128.pow(scale); + format!("{}.{}", base, decimals) + }; + dyn_primitive!(array, i128, display) + } + List(_) => { + let f = |x: Box| { + let display = get_value_display(x.as_ref()); + let string_values = (0..x.len()).map(display).collect::>(); + format!("[{}]", string_values.join(", ")) + }; + dyn_display!(array, ListArray, f) + } + FixedSizeList(_, _) => { + let f = |x: Box| { + let display = get_value_display(x.as_ref()); + let string_values = (0..x.len()).map(display).collect::>(); + format!("[{}]", string_values.join(", ")) + }; + dyn_display!(array, FixedSizeListArray, f) + } + LargeList(_) => { + let f = |x: Box| { + let display = get_value_display(x.as_ref()); + let string_values = (0..x.len()).map(display).collect::>(); + format!("[{}]", string_values.join(", ")) + }; + dyn_display!(array, ListArray, f) + } + Dictionary(key_type, ..) 
=> match_integer_type!(key_type, |$T| { + let a = array + .as_any() + .downcast_ref::>() + .unwrap(); + let keys = a.keys(); + let display = get_display(a.values().as_ref()); + Box::new(move |row: usize| { + if keys.is_null(row) { + "".to_string() + }else { + display(keys.value(row) as usize) + } + }) + }), + Map(_, _) => todo!(), + Struct(_) => { + let a = array.as_any().downcast_ref::().unwrap(); + let displays = a + .values() + .iter() + .map(|x| get_value_display(x.as_ref())) + .collect::>(); + Box::new(move |row: usize| { + let mut string = displays + .iter() + .zip(a.fields().iter().map(|f| &f.name)) + .map(|(f, name)| (f(row), name)) + .fold("{".to_string(), |mut acc, (v, name)| { + acc.push_str(&format!("{}: {}, ", name, v)); + acc + }); + if string.len() > 1 { + // remove last ", " + string.pop(); + string.pop(); + } + string.push('}'); + string + }) + } + Union(_, _, _) => { + let array = array.as_any().downcast_ref::().unwrap(); + Box::new(move |row: usize| { + let (field, index) = array.index(row); + get_display(array.fields()[field].as_ref())(index) + }) + } + Extension(_, _, _) => todo!(), + } +} + +/// Returns a function of index returning the string representation of the item of `array`. +/// This outputs an empty string on nulls. 
+pub fn get_display<'a>(array: &'a dyn Array) -> Box String + 'a> { + let value_display = get_value_display(array); + Box::new(move |row| { + if array.is_null(row) { + "".to_string() + } else { + value_display(row) + } + }) +} diff --git a/src/array/equal/mod.rs b/src/array/equal/mod.rs index e916aaa0bb6..f00ddbc657b 100644 --- a/src/array/equal/mod.rs +++ b/src/array/equal/mod.rs @@ -14,6 +14,7 @@ mod primitive; mod struct_; mod union; mod utf8; +mod utf8_sequence; impl PartialEq for dyn Array + '_ { fn eq(&self, that: &dyn Array) -> bool { @@ -201,6 +202,16 @@ pub fn equal(lhs: &dyn Array, rhs: &dyn Array) -> bool { let rhs = rhs.as_any().downcast_ref().unwrap(); utf8::equal::(lhs, rhs) } + Utf8Sequence => { + let lhs = lhs.as_any().downcast_ref().unwrap(); + let rhs = rhs.as_any().downcast_ref().unwrap(); + utf8_sequence::equal::(lhs, rhs) + } + LargeUtf8Sequence => { + let lhs = lhs.as_any().downcast_ref().unwrap(); + let rhs = rhs.as_any().downcast_ref().unwrap(); + utf8_sequence::equal::(lhs, rhs) + } Binary => { let lhs = lhs.as_any().downcast_ref().unwrap(); let rhs = rhs.as_any().downcast_ref().unwrap(); diff --git a/src/array/equal/utf8_sequence.rs b/src/array/equal/utf8_sequence.rs new file mode 100644 index 00000000000..6739fcbb79c --- /dev/null +++ b/src/array/equal/utf8_sequence.rs @@ -0,0 +1,5 @@ +use crate::array::{Offset, StringSequenceArray}; + +pub(super) fn equal(lhs: &StringSequenceArray, rhs: &StringSequenceArray) -> bool { + lhs.data_type() == rhs.data_type() && lhs.len() == rhs.len() && lhs.iter().eq(rhs.iter()) +} diff --git a/src/array/ffi.rs b/src/array/ffi.rs index a9d0a089074..c95435ed014 100644 --- a/src/array/ffi.rs +++ b/src/array/ffi.rs @@ -67,6 +67,8 @@ pub fn offset_buffers_children_dictionary(array: &dyn Array) -> BuffersChildren FixedSizeBinary => ffi_dyn!(array, FixedSizeBinaryArray), Utf8 => ffi_dyn!(array, Utf8Array::), LargeUtf8 => ffi_dyn!(array, Utf8Array::), + Utf8Sequence => todo!("Arrow does not yet support exporting 
sequence views via FFI"), + LargeUtf8Sequence => todo!("Arrow does not yet support exporting sequence views via FFI"), List => ffi_dyn!(array, ListArray::), LargeList => ffi_dyn!(array, ListArray::), FixedSizeList => ffi_dyn!(array, FixedSizeListArray), diff --git a/src/array/fmt.rs b/src/array/fmt.rs index 39f4f8b1603..4af2d4c9d1e 100644 --- a/src/array/fmt.rs +++ b/src/array/fmt.rs @@ -51,6 +51,7 @@ pub fn get_value_display<'a, F: Write + 'a>( f, ) }), + Utf8Sequence | LargeUtf8Sequence => todo!(), LargeUtf8 => Box::new(|f, index| { super::utf8::fmt::write_value::( array.as_any().downcast_ref().unwrap(), diff --git a/src/array/growable/mod.rs b/src/array/growable/mod.rs index 398c4297317..107c45807a2 100644 --- a/src/array/growable/mod.rs +++ b/src/array/growable/mod.rs @@ -138,5 +138,6 @@ pub fn make_growable<'a>( )) }) } + _ => todo!("Sequence views"), } } diff --git a/src/array/mod.rs b/src/array/mod.rs index b58633ad8cb..07923eed9d6 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -227,6 +227,8 @@ impl std::fmt::Debug for dyn Array + '_ { FixedSizeBinary => fmt_dyn!(self, FixedSizeBinaryArray, f), Utf8 => fmt_dyn!(self, Utf8Array::, f), LargeUtf8 => fmt_dyn!(self, Utf8Array::, f), + Utf8Sequence => fmt_dyn!(self, StringSequenceArray::, f), + LargeUtf8Sequence => fmt_dyn!(self, StringSequenceArray::, f), List => fmt_dyn!(self, ListArray::, f), LargeList => fmt_dyn!(self, ListArray::, f), FixedSizeList => fmt_dyn!(self, FixedSizeListArray, f), @@ -256,6 +258,8 @@ pub fn new_empty_array(data_type: DataType) -> Box { FixedSizeBinary => Box::new(FixedSizeBinaryArray::new_empty(data_type)), Utf8 => Box::new(Utf8Array::::new_empty(data_type)), LargeUtf8 => Box::new(Utf8Array::::new_empty(data_type)), + Utf8Sequence => Box::new(StringSequenceArray::::new_empty(data_type)), + LargeUtf8Sequence => Box::new(StringSequenceArray::::new_empty(data_type)), List => Box::new(ListArray::::new_empty(data_type)), LargeList => Box::new(ListArray::::new_empty(data_type)), 
FixedSizeList => Box::new(FixedSizeListArray::new_empty(data_type)), @@ -286,6 +290,8 @@ pub fn new_null_array(data_type: DataType, length: usize) -> Box { FixedSizeBinary => Box::new(FixedSizeBinaryArray::new_null(data_type, length)), Utf8 => Box::new(Utf8Array::::new_null(data_type, length)), LargeUtf8 => Box::new(Utf8Array::::new_null(data_type, length)), + Utf8Sequence => Box::new(StringSequenceArray::::new_null(data_type, length)), + LargeUtf8Sequence => Box::new(StringSequenceArray::::new_null(data_type, length)), List => Box::new(ListArray::::new_null(data_type, length)), LargeList => Box::new(ListArray::::new_null(data_type, length)), FixedSizeList => Box::new(FixedSizeListArray::new_null(data_type, length)), @@ -324,6 +330,8 @@ pub fn clone(array: &dyn Array) -> Box { FixedSizeBinary => clone_dyn!(array, FixedSizeBinaryArray), Utf8 => clone_dyn!(array, Utf8Array::), LargeUtf8 => clone_dyn!(array, Utf8Array::), + Utf8Sequence => clone_dyn!(array, StringSequenceArray::), + LargeUtf8Sequence => clone_dyn!(array, StringSequenceArray::), List => clone_dyn!(array, ListArray::), LargeList => clone_dyn!(array, ListArray::), FixedSizeList => clone_dyn!(array, FixedSizeListArray), @@ -356,6 +364,7 @@ mod map; mod null; mod primitive; mod specification; +mod string_sequence; mod struct_; mod union; mod utf8; @@ -379,6 +388,7 @@ pub use list::{ListArray, ListValuesIter, MutableListArray}; pub use map::MapArray; pub use null::NullArray; pub use primitive::*; +pub use string_sequence::StringSequenceArray; pub use struct_::StructArray; pub use union::UnionArray; pub use utf8::{MutableUtf8Array, Utf8Array, Utf8ValuesIter}; diff --git a/src/array/string_sequence/fmt.rs b/src/array/string_sequence/fmt.rs new file mode 100644 index 00000000000..6ede349696b --- /dev/null +++ b/src/array/string_sequence/fmt.rs @@ -0,0 +1,27 @@ +use std::fmt::{Debug, Formatter, Result, Write}; + +use super::super::fmt::write_vec; +use super::super::Offset; +use super::StringSequenceArray; + 
+pub fn write_value( + array: &StringSequenceArray, + index: usize, + f: &mut W, +) -> Result { + write!(f, "{}", array.value(index)) +} + +impl Debug for StringSequenceArray { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + let writer = |f: &mut Formatter, index| write_value(self, index, f); + + let head = if O::is_large() { + "LargeStringSequenceArray" + } else { + "StringSequenceArray" + }; + write!(f, "{}", head)?; + write_vec(f, writer, self.validity(), self.len(), "None", false) + } +} diff --git a/src/array/string_sequence/iterator.rs b/src/array/string_sequence/iterator.rs new file mode 100644 index 00000000000..c0859680311 --- /dev/null +++ b/src/array/string_sequence/iterator.rs @@ -0,0 +1,80 @@ +use crate::bitmap::utils::{zip_validity, ZipValidity}; +use crate::{array::Offset, trusted_len::TrustedLen}; + +use super::StringSequenceArray; + +/// Iterator of values of a [`StringSequenceArray`]. +#[derive(Debug, Clone)] +pub struct StringSequenceValuesIter<'a, O: Offset> { + array: &'a StringSequenceArray, + index: usize, + end: usize, +} + +impl<'a, O: Offset> StringSequenceValuesIter<'a, O> { + /// Creates a new [`StringSequenceValuesIter`] + pub fn new(array: &'a StringSequenceArray) -> Self { + Self { + array, + index: 0, + end: array.len(), + } + } +} + +impl<'a, O: Offset> Iterator for StringSequenceValuesIter<'a, O> { + type Item = &'a str; + + #[inline] + fn next(&mut self) -> Option { + if self.index == self.end { + return None; + } + let old = self.index; + self.index += 1; + Some(unsafe { self.array.value_unchecked(old) }) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + (self.end - self.index, Some(self.end - self.index)) + } +} + +impl<'a, O: Offset> DoubleEndedIterator for StringSequenceValuesIter<'a, O> { + #[inline] + fn next_back(&mut self) -> Option { + if self.index == self.end { + None + } else { + self.end -= 1; + Some(unsafe { self.array.value_unchecked(self.end) }) + } + } +} + +impl<'a, O: Offset> IntoIterator for &'a
StringSequenceArray { + type Item = Option<&'a str>; + type IntoIter = ZipValidity<'a, &'a str, StringSequenceValuesIter<'a, O>>; + + fn into_iter(self) -> Self::IntoIter { + self.iter() + } +} + +impl<'a, O: Offset> StringSequenceArray { + /// Returns an iterator of `Option<&str>` + pub fn iter(&'a self) -> ZipValidity<'a, &'a str, StringSequenceValuesIter<'a, O>> { + zip_validity( + StringSequenceValuesIter::new(self), + self.validity.as_ref().map(|x| x.iter()), + ) + } + + /// Returns an iterator of `&str` + pub fn values_iter(&'a self) -> StringSequenceValuesIter<'a, O> { + StringSequenceValuesIter::new(self) + } +} + +unsafe impl TrustedLen for StringSequenceValuesIter<'_, O> {} diff --git a/src/array/string_sequence/mod.rs b/src/array/string_sequence/mod.rs new file mode 100644 index 00000000000..3b261cde0a0 --- /dev/null +++ b/src/array/string_sequence/mod.rs @@ -0,0 +1,220 @@ +use crate::{bitmap::Bitmap, buffer::Buffer, datatypes::DataType}; + +use super::{Array, Offset}; + +mod iterator; + +/// An equivalent representation of [`Vec<Option<String>>`] with the following properties: +/// * Clone is O(1) +/// * Slice is O(1) +/// * row-based operations are `O(N)` +/// # Safety +/// The following invariants are upheld: +/// * offsets.len() == lengths.len() +/// * offsets[i] + lengths[i] <= values.len() for all i +/// * `&values[offsets[i]..offsets[i] + lengths[i]]` is valid utf8 for all i +#[derive(Debug, Clone)] +pub struct StringSequenceArray { + data_type: DataType, + validity: Option, + values: Buffer, + offsets: Buffer, + lengths: Buffer, +} + +impl StringSequenceArray { + /// returns the [`DataType`] of this [`StringSequenceArray`] + #[inline] + pub fn data_type(&self) -> &DataType { + &self.data_type + } + + /// returns the length of this [`StringSequenceArray`] + #[inline] + pub fn len(&self) -> usize { + self.offsets.len() + } + + /// The validity + pub fn validity(&self) -> Option<&Bitmap> { + self.validity.as_ref() + } + + /// Returns the offsets that together with
`lengths` slice `.values()` of values. + #[inline] + pub fn offsets(&self) -> &Buffer { + &self.offsets + } + + /// Returns the lengths that together with `offsets` slice `.values()` of values. + #[inline] + pub fn lengths(&self) -> &Buffer { + &self.lengths + } + + /// Returns all values in this array. Use `.offsets()` and `.lengths()` to slice them. + #[inline] + pub fn values(&self) -> &Buffer { + &self.values + } + + /// Returns the element at index `i` + /// # Panics + /// iff `i >= self.len()` + #[inline] + pub fn value(&self, i: usize) -> &str { + assert!(i < self.len()); + // soundness: invariant verified above + unsafe { self.value_unchecked(i) } + } + + /// Returns the element at index `i` + /// # Safety + /// Assumes that `i < self.len()`. + #[inline] + pub unsafe fn value_unchecked(&self, i: usize) -> &str { + let start = self.offsets.get_unchecked(i).to_usize(); + let length = self.lengths.get_unchecked(i).to_usize(); + + // soundness: the invariant of the struct + let slice = unsafe { self.values.get_unchecked(start..start + length) }; + + // soundness: the invariant of the struct + std::str::from_utf8_unchecked(slice) + } +} + +impl StringSequenceArray { + /// Creates an empty [`StringSequenceArray`], i.e. whose `.len` is zero. + pub fn new_empty(data_type: DataType) -> Self { + Self { + data_type, + validity: None, + lengths: vec![].into(), + offsets: vec![].into(), + values: vec![].into(), + } + } + + /// Creates a null [`StringSequenceArray`], i.e. whose `.null_count() == .len()`.
+ #[inline] + pub fn new_null(data_type: DataType, length: usize) -> Self { + Self { + data_type, + validity: Some(Bitmap::new_zeroed(length)), + lengths: vec![O::default(); length].into(), + offsets: vec![O::default(); length].into(), + values: vec![].into(), + } + } + + /// Creates a new [`StringSequenceArray`] with no validity; `offsets[i]` is the start of item `i` in `values`. Panics iff `O::from_usize` fails for an item's length. + pub fn from_values, I: Iterator>(iter: I) -> Self { + let mut values = vec![]; + let mut offsets = Vec::with_capacity(iter.size_hint().0); + let mut lengths = Vec::with_capacity(iter.size_hint().0); + let mut length = O::default(); + for item in iter { + let item = item.as_ref(); + values.extend_from_slice(item.as_bytes()); + + let new_length = O::from_usize(item.len()).unwrap(); + lengths.push(new_length); + offsets.push(length); + length += new_length; + } + + let data_type = if O::is_large() { + DataType::LargeUtf8Sequence + } else { + DataType::Utf8Sequence + }; + + Self { + data_type, + validity: None, + values: values.into(), + offsets: offsets.into(), + lengths: lengths.into(), + } + } + + /// Creates a new [`StringSequenceArray`] by slicing this [`StringSequenceArray`]. + /// # Implementation + /// This function is `O(1)`: all data will be shared between both arrays. + /// # Panics + /// iff `offset + length > self.len()`. + pub fn slice(&self, offset: usize, length: usize) -> Self { + assert!( + offset + length <= self.len(), + "the offset of the new Buffer cannot exceed the existing length" + ); + unsafe { self.slice_unchecked(offset, length) } + } + + /// Creates a new [`StringSequenceArray`] by slicing this [`StringSequenceArray`]. + /// # Implementation + /// This function is `O(1)`: all data will be shared between both arrays. + /// # Safety + /// The caller must ensure that `offset + length <= self.len()`.
+ pub unsafe fn slice_unchecked(&self, offset: usize, length: usize) -> Self { + let validity = self + .validity + .clone() + .map(|x| x.slice_unchecked(offset, length)); + let offsets = self.offsets.clone().slice_unchecked(offset, length); + let lengths = self.lengths.clone().slice_unchecked(offset, length); + Self { + data_type: self.data_type.clone(), + offsets, + lengths, + values: self.values.clone(), + validity, + } + } + + /// Clones this [`StringSequenceArray`] with a different validity. + /// # Panic + /// Panics iff `validity.len() != self.len()`. + pub fn with_validity(&self, validity: Option) -> Self { + if matches!(&validity, Some(bitmap) if bitmap.len() != self.len()) { + panic!("validity's length must be equal to the array's length") + } + let mut arr = self.clone(); + arr.validity = validity; + arr + } +} + +impl Array for StringSequenceArray { + #[inline] + fn as_any(&self) -> &dyn std::any::Any { + self + } + + #[inline] + fn len(&self) -> usize { + self.len() + } + + #[inline] + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn validity(&self) -> Option<&Bitmap> { + self.validity.as_ref() + } + + fn slice(&self, offset: usize, length: usize) -> Box { + Box::new(self.slice(offset, length)) + } + + unsafe fn slice_unchecked(&self, offset: usize, length: usize) -> Box { + Box::new(self.slice_unchecked(offset, length)) + } + + fn with_validity(&self, validity: Option) -> Box { + Box::new(self.with_validity(validity)) + } +} diff --git a/src/compute/aggregate/memory.rs b/src/compute/aggregate/memory.rs index 8c8617d936c..b61d6698998 100644 --- a/src/compute/aggregate/memory.rs +++ b/src/compute/aggregate/memory.rs @@ -16,6 +16,17 @@ macro_rules! dyn_binary { }}; } +macro_rules! 
dyn_sequence { + ($array:expr, $ty:ty, $o:ty) => {{ + let array = $array.as_any().downcast_ref::<$ty>().unwrap(); + + array.values().len() + + array.offsets().len() * std::mem::size_of::<$o>() + + array.lengths().len() * std::mem::size_of::<$o>() + + validity_size(array.validity()) + }}; +} + /// Returns the total (heap) allocated size of the array in bytes. /// # Implementation /// This estimation is the sum of the size of its buffers, validity, including nested arrays. @@ -54,6 +65,8 @@ pub fn estimated_bytes_size(array: &dyn Array) -> usize { LargeBinary => dyn_binary!(array, BinaryArray, i64), Utf8 => dyn_binary!(array, Utf8Array, i32), LargeUtf8 => dyn_binary!(array, Utf8Array, i64), + Utf8Sequence => dyn_sequence!(array, StringSequenceArray, i32), + LargeUtf8Sequence => dyn_sequence!(array, StringSequenceArray, i64), List => { let array = array.as_any().downcast_ref::>().unwrap(); estimated_bytes_size(array.values().as_ref()) diff --git a/src/datatypes/mod.rs b/src/datatypes/mod.rs index e7e2c9b98e8..c75cc2d7b94 100644 --- a/src/datatypes/mod.rs +++ b/src/datatypes/mod.rs @@ -99,6 +99,10 @@ pub enum DataType { Utf8, /// A variable-length UTF-8 encoded string whose offsets are represented as [`i64`]. LargeUtf8, + /// An alternative representation of variable-length UTF-8 encoded strings whose offsets and lengths are represented as [`i32`]. + Utf8Sequence, + /// An alternative representation of variable-length UTF-8 encoded strings whose offsets and lengths are represented as [`i64`]. + LargeUtf8Sequence, /// A list of some logical data type whose offsets are represented as [`i32`]. List(Box), /// A list of some logical data type with a fixed number of elements.
@@ -249,6 +253,8 @@ impl DataType { LargeBinary => PhysicalType::LargeBinary, Utf8 => PhysicalType::Utf8, LargeUtf8 => PhysicalType::LargeUtf8, + Utf8Sequence => PhysicalType::Utf8Sequence, + LargeUtf8Sequence => PhysicalType::LargeUtf8Sequence, List(_) => PhysicalType::List, FixedSizeList(_, _) => PhysicalType::FixedSizeList, LargeList(_) => PhysicalType::LargeList, diff --git a/src/datatypes/physical_type.rs b/src/datatypes/physical_type.rs index 828df9541f0..2de8ef205f9 100644 --- a/src/datatypes/physical_type.rs +++ b/src/datatypes/physical_type.rs @@ -25,6 +25,10 @@ pub enum PhysicalType { Utf8, /// A variable-length string in Unicode with UFT-8 encoding and 64-bit offsets. LargeUtf8, + /// A sequence view of variable-length strings in Unicode with UTF-8 encoding, with 32-bit offsets and lengths. + Utf8Sequence, + /// A sequence view of variable-length strings in Unicode with UTF-8 encoding, with 64-bit offsets and lengths. + LargeUtf8Sequence, /// A list of some data type with variable length. List, /// A list of some data type with fixed length.
diff --git a/src/ffi/array.rs b/src/ffi/array.rs index abaa0363597..22a9bf35601 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -30,6 +30,8 @@ pub unsafe fn try_from(array: A) -> Result> { }), Utf8 => Box::new(Utf8Array::::try_from_ffi(array)?), LargeUtf8 => Box::new(Utf8Array::::try_from_ffi(array)?), + Utf8Sequence => todo!("Arrow does not yet support exporting sequence views via FFI"), + LargeUtf8Sequence => todo!("Arrow does not yet support exporting sequence views via FFI"), Binary => Box::new(BinaryArray::::try_from_ffi(array)?), LargeBinary => Box::new(BinaryArray::::try_from_ffi(array)?), FixedSizeBinary => Box::new(FixedSizeBinaryArray::try_from_ffi(array)?), diff --git a/src/ffi/bridge.rs b/src/ffi/bridge.rs index 1a51e56e780..0dff4f0c6e8 100644 --- a/src/ffi/bridge.rs +++ b/src/ffi/bridge.rs @@ -26,6 +26,8 @@ pub fn align_to_c_data_interface(array: Arc) -> Arc { FixedSizeBinary => ffi_dyn!(array, FixedSizeBinaryArray), Utf8 => ffi_dyn!(array, Utf8Array::), LargeUtf8 => ffi_dyn!(array, Utf8Array::), + Utf8Sequence => todo!("Arrow does not yet support exporting sequence views via FFI"), + LargeUtf8Sequence => todo!("Arrow does not yet support exporting sequence views via FFI"), List => ffi_dyn!(array, ListArray::), LargeList => ffi_dyn!(array, ListArray::), FixedSizeList => ffi_dyn!(array, FixedSizeListArray), diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs index a65d5568b7a..51314f3688f 100644 --- a/src/ffi/schema.rs +++ b/src/ffi/schema.rs @@ -381,6 +381,12 @@ fn to_format(data_type: &DataType) -> String { DataType::LargeBinary => "Z".to_string(), DataType::Utf8 => "u".to_string(), DataType::LargeUtf8 => "U".to_string(), + DataType::Utf8Sequence => { + todo!("Arrow does not yet support exporting sequence views via FFI") + } + DataType::LargeUtf8Sequence => { + todo!("Arrow does not yet support exporting sequence views via FFI") + } DataType::Date32 => "tdD".to_string(), DataType::Date64 => "tdm".to_string(), DataType::Time32(TimeUnit::Second) 
=> "tts".to_string(), diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs index 56bd0632128..398ddb649cb 100644 --- a/src/io/ipc/read/deserialize.rs +++ b/src/io/ipc/read/deserialize.rs @@ -9,7 +9,7 @@ use arrow_format::ipc::MetadataVersion; use crate::array::*; use crate::datatypes::{DataType, Field, PhysicalType}; -use crate::error::Result; +use crate::error::{ArrowError, Result}; use crate::io::ipc::IpcField; use super::{array::*, Dictionaries}; @@ -115,6 +115,9 @@ pub fn read( )?; Ok(Arc::new(array)) } + LargeUtf8Sequence | Utf8Sequence => Err(ArrowError::OutOfSpec( + "Arrow does not yet support exporting sequence views via IPC".to_string(), + )), List => read_list::( field_nodes, data_type, @@ -223,6 +226,9 @@ pub fn skip( Primitive(_) => skip_primitive(field_nodes, buffers), LargeBinary | Binary => skip_binary(field_nodes, buffers), LargeUtf8 | Utf8 => skip_utf8(field_nodes, buffers), + LargeUtf8Sequence | Utf8Sequence => Err(ArrowError::OutOfSpec( + "Arrow does not yet support exporting sequence views via IPC".to_string(), + )), FixedSizeBinary => skip_fixed_size_binary(field_nodes, buffers), List => skip_list::(field_nodes, data_type, buffers), LargeList => skip_list::(field_nodes, data_type, buffers), diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index bd905948714..58f8ff9f3a6 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -40,7 +40,7 @@ fn encode_dictionary( use PhysicalType::*; match array.data_type().to_physical_type() { Utf8 | LargeUtf8 | Binary | LargeBinary | Primitive(_) | Boolean | Null - | FixedSizeBinary => Ok(()), + | FixedSizeBinary | Utf8Sequence | LargeUtf8Sequence => Ok(()), Dictionary(key_type) => match_integer_type!(key_type, |$T| { let dict_id = field.dictionary_id .ok_or_else(|| ArrowError::InvalidArgumentError("Dictionaries must have an associated id".to_string()))?; diff --git a/src/io/ipc/write/schema.rs b/src/io/ipc/write/schema.rs index 
0636cb9e6c0..2c30a9db1a8 100644 --- a/src/io/ipc/write/schema.rs +++ b/src/io/ipc/write/schema.rs @@ -201,6 +201,9 @@ fn serialize_type(data_type: &DataType) -> arrow_format::ipc::Type { LargeBinary => ipc::Type::LargeBinary(Box::new(ipc::LargeBinary {})), Utf8 => ipc::Type::Utf8(Box::new(ipc::Utf8 {})), LargeUtf8 => ipc::Type::LargeUtf8(Box::new(ipc::LargeUtf8 {})), + Utf8Sequence | LargeUtf8Sequence => { + todo!("Arrow does not yet support exporting sequence views via IPC") + } FixedSizeBinary(size) => ipc::Type::FixedSizeBinary(Box::new(ipc::FixedSizeBinary { byte_width: *size as i32, })), @@ -281,6 +284,8 @@ fn serialize_children(data_type: &DataType, ipc_field: &IpcField) -> Vec vec![], FixedSizeList(inner, _) | LargeList(inner) | List(inner) | Map(inner, _) => { vec![serialize_field(inner, &ipc_field.fields[0])] diff --git a/src/io/ipc/write/serialize.rs b/src/io/ipc/write/serialize.rs index 40eb13a00bd..c5c265bd418 100644 --- a/src/io/ipc/write/serialize.rs +++ b/src/io/ipc/write/serialize.rs @@ -490,6 +490,9 @@ pub fn write( is_little_endian, compression, ), + Utf8Sequence | LargeUtf8Sequence => { + todo!("Arrow does not yet support exporting sequence views via IPC") + } List => write_list::( array.as_any().downcast_ref().unwrap(), buffers, diff --git a/src/io/json_integration/read/array.rs b/src/io/json_integration/read/array.rs index f15c0a4de31..f762e22687d 100644 --- a/src/io/json_integration/read/array.rs +++ b/src/io/json_integration/read/array.rs @@ -295,6 +295,9 @@ pub fn to_array( LargeBinary => Ok(to_binary::(json_col, data_type)), Utf8 => Ok(to_utf8::(json_col, data_type)), LargeUtf8 => Ok(to_utf8::(json_col, data_type)), + LargeUtf8Sequence | Utf8Sequence => Err(ArrowError::OutOfSpec( + "Arrow does not yet support exporting sequence views".to_string(), + )), FixedSizeBinary => { let validity = to_validity(&json_col.validity); diff --git a/src/io/json_integration/write/schema.rs b/src/io/json_integration/write/schema.rs index 
f4297c3fb0f..86f031775b7 100644 --- a/src/io/json_integration/write/schema.rs +++ b/src/io/json_integration/write/schema.rs @@ -23,6 +23,9 @@ fn serialize_data_type(data_type: &DataType) -> Value { DataType::Float64 => json!({"name": "floatingpoint", "precision": "DOUBLE"}), DataType::Utf8 => json!({"name": "utf8"}), DataType::LargeUtf8 => json!({"name": "largeutf8"}), + DataType::Utf8Sequence | DataType::LargeUtf8Sequence => { + todo!("Arrow does not yet support sequence views") + } DataType::Binary => json!({"name": "binary"}), DataType::LargeBinary => json!({"name": "largebinary"}), DataType::FixedSizeBinary(byte_width) => { diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index bb688a12f89..6acb84c7f22 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -200,7 +200,9 @@ fn field_to_init(field: &Field) -> Vec { use crate::datatypes::PhysicalType::*; match field.data_type.to_physical_type() { Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 - | Dictionary(_) | LargeUtf8 => vec![InitNested::Primitive(field.is_nullable)], + | Dictionary(_) | LargeUtf8 | Utf8Sequence | LargeUtf8Sequence => { + vec![InitNested::Primitive(field.is_nullable)] + } List | FixedSizeList | LargeList => { let a = field.data_type().to_logical_type(); let inner = if let DataType::List(inner) = a { diff --git a/src/scalar/equal.rs b/src/scalar/equal.rs index e2cf20ee4f7..a59c82b8340 100644 --- a/src/scalar/equal.rs +++ b/src/scalar/equal.rs @@ -43,6 +43,7 @@ fn equal(lhs: &dyn Scalar, rhs: &dyn Scalar) -> bool { }), Utf8 => dyn_eq!(Utf8Scalar, lhs, rhs), LargeUtf8 => dyn_eq!(Utf8Scalar, lhs, rhs), + Binary => dyn_eq!(BinaryScalar, lhs, rhs), LargeBinary => dyn_eq!(BinaryScalar, lhs, rhs), List => dyn_eq!(ListScalar, lhs, rhs), @@ -55,5 +56,6 @@ fn equal(lhs: &dyn Scalar, rhs: &dyn Scalar) -> bool { FixedSizeList => dyn_eq!(FixedSizeListScalar, lhs, rhs), Union => 
unimplemented!("{:?}", Union), Map => unimplemented!("{:?}", Map), + Utf8Sequence | LargeUtf8Sequence => unimplemented!("{:?}", Utf8Sequence), } } diff --git a/src/scalar/mod.rs b/src/scalar/mod.rs index 44a909c990f..47446386eec 100644 --- a/src/scalar/mod.rs +++ b/src/scalar/mod.rs @@ -106,6 +106,7 @@ pub fn new_scalar(array: &dyn Array, index: usize) -> Box { }), Utf8 => dyn_new_utf8!(array, index, i32), LargeUtf8 => dyn_new_utf8!(array, index, i64), + Utf8Sequence | LargeUtf8Sequence => todo!(), Binary => dyn_new_binary!(array, index, i32), LargeBinary => dyn_new_binary!(array, index, i64), List => dyn_new_list!(array, index, i32),