Skip to content

Commit

Permalink
growable array [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 28, 2023
1 parent 4285c8e commit 4368537
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 60 deletions.
3 changes: 2 additions & 1 deletion crates/polars-arrow/src/array/binview/ffi.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use polars_error::PolarsResult;

use super::BinaryViewArrayGeneric;
Expand Down Expand Up @@ -66,6 +67,6 @@ impl<T: ViewType + ?Sized, A: ffi::ArrowArrayRef> FromFfi<A> for BinaryViewArray
buffers.push(values);
}

Self::try_new(data_type, views, buffers, validity)
Self::try_new(data_type, views, Arc::from(buffers), validity)
}
}
12 changes: 6 additions & 6 deletions crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl<T: ViewType + ?Sized> Clone for BinaryViewArrayGeneric<T> {
unsafe impl<T: ViewType + ?Sized> Send for BinaryViewArrayGeneric<T> {}
unsafe impl<T: ViewType + ?Sized> Sync for BinaryViewArrayGeneric<T> {}

fn buffers_into_raw<T>(buffers: &[Buffer<T>]) -> Vec<(*const T, usize)> {
fn buffers_into_raw<T>(buffers: &[Buffer<T>]) -> Arc<[(*const T, usize)]> {
buffers
.iter()
.map(|buf| (buf.as_ptr(), buf.len()))
Expand All @@ -113,7 +113,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
pub unsafe fn new_unchecked(
data_type: ArrowDataType,
views: Buffer<u128>,
buffers: Vec<Buffer<u8>>,
buffers: Arc<[Buffer<u8>]>,
validity: Option<Bitmap>,
) -> Self {
let raw_buffers = buffers_into_raw(&buffers);
Expand All @@ -127,7 +127,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
}
}

pub fn buffers(&self) -> &[Buffer<u8>] {
pub fn data_buffers(&self) -> &[Buffer<u8>] {
self.buffers.as_ref()
}

Expand All @@ -138,7 +138,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
pub fn try_new(
data_type: ArrowDataType,
views: Buffer<u128>,
buffers: Vec<Buffer<u8>>,
buffers: Arc<[Buffer<u8>]>,
validity: Option<Bitmap>,
) -> PolarsResult<Self> {
if T::IS_UTF8 {
Expand All @@ -164,14 +164,14 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
/// Creates an empty [`BinaryViewArrayGeneric`], i.e. one whose length is zero.
#[inline]
pub fn new_empty(data_type: ArrowDataType) -> Self {
unsafe { Self::new_unchecked(data_type, Buffer::new(), vec![], None) }
unsafe { Self::new_unchecked(data_type, Buffer::new(), Arc::from([]), None) }
}

/// Returns a new null [`BinaryViewArrayGeneric`] of `length`.
#[inline]
pub fn new_null(data_type: ArrowDataType, length: usize) -> Self {
let validity = Some(Bitmap::new_zeroed(length));
unsafe { Self::new_unchecked(data_type, Buffer::zeroed(length), vec![], validity) }
unsafe { Self::new_unchecked(data_type, Buffer::zeroed(length), Arc::from([]), validity) }
}

/// Returns the element at index `i`
Expand Down
48 changes: 27 additions & 21 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use std::sync::Arc;
use polars_utils::slice::GetSaferUnchecked;

use crate::array::binview::view::View;
use crate::array::binview::{BinaryViewArrayGeneric, ViewType};
use crate::bitmap::MutableBitmap;
use crate::buffer::Buffer;

const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024;
const DEFAULT_BLOCK_SIZE: usize = 8 * 1024;

/// Growable, mutable counterpart of [`BinaryViewArrayGeneric`].
///
/// Payload bytes are written into `in_progress_buffer`; when that buffer is
/// flushed it is moved into `completed_buffers` and replaced by a fresh
/// allocation. Converting into a [`BinaryViewArrayGeneric`] hands over the
/// views, the buffers and the optional validity.
#[derive(Debug, Clone)]
pub struct MutableBinaryViewArray<T: ViewType + ?Sized> {
/// The 128-bit views; `len()` reports the length of this vector.
/// NOTE(review): presumably one view per element — confirm against the push path.
views: Vec<u128>,
/// Buffers that have already been flushed out of `in_progress_buffer`.
completed_buffers: Vec<Buffer<u8>>,
/// Buffer currently being filled; swapped for a new allocation on flush.
in_progress_buffer: Vec<u8>,
/// Validity bitmap; starts as `None` and is only created by `init_validity`.
validity: Option<MutableBitmap>,
/// Marker for the view type `T`, which no other field stores.
phantom: std::marker::PhantomData<T>
}

impl<T: ViewType + ?Sized> Default for MutableBinaryViewArray<T> {
Expand All @@ -22,12 +24,15 @@ impl<T: ViewType + ?Sized> Default for MutableBinaryViewArray<T> {
}

impl<T: ViewType + ?Sized> From<MutableBinaryViewArray<T>> for BinaryViewArrayGeneric<T> {
fn from(value: MutableBinaryViewArray<T>) -> Self {
fn from(mut value: MutableBinaryViewArray<T>) -> Self {

value.completed_buffers.push(std::mem::take(&mut value.in_progress_buffer).into());

unsafe {
Self::new_unchecked(
T::DATA_TYPE,
value.views.into(),
vec![value.buffers.into()],
Arc::from(value.completed_buffers),
value.validity.map(|b| b.into()),
)
}
Expand All @@ -45,6 +50,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
completed_buffers: vec![],
in_progress_buffer: vec![],
validity: None,
phantom: Default::default()
}
}

Expand All @@ -53,8 +59,12 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.views.reserve(additional);
}

/// Returns the number of views held so far, which is used as the length of
/// the array (e.g. when sizing the validity bitmap in `init_validity`).
pub fn len(&self) -> usize {
self.views.len()
}

fn init_validity(&mut self) {
let mut validity = MutableBitmap::with_capacity(self.values.capacity());
let mut validity = MutableBitmap::with_capacity(self.views.capacity());
validity.extend_constant(self.len(), true);
validity.set(self.len() - 1, false);
self.validity = Some(validity);
Expand All @@ -79,8 +89,8 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
let new_capacity = (self.in_progress_buffer.capacity() * 2)
.clamp(DEFAULT_BLOCK_SIZE, 16 * 1024 * 1024)
.max(bytes.len());
let mut in_progress = Vec::with_capacity(new_capacity);
let flushed = std::mem::replace(&mut self.in_progress_buffer, &mut in_progress);
let mut in_progress= Vec::with_capacity(new_capacity);
let flushed = std::mem::replace(&mut self.in_progress_buffer, in_progress);
if !flushed.is_empty() {
self.completed_buffers.push(flushed.into())
}
Expand Down Expand Up @@ -135,13 +145,21 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
}
}

pub fn from_iter<I, P>(iterator: I) -> Self {
pub fn from_iter<I, P>(iterator: I) -> Self
where
I: Iterator<Item = Option<P>>,
P: AsRef<T>,
{
let mut mutable = Self::with_capacity(iterator.size_hint().0);
mutable.extend(iterator);
mutable
}

pub fn from_values_iter<I, P>(iterator: I) -> Self {
pub fn from_values_iter<I, P>(iterator: I) -> Self
where
I: Iterator<Item = P>,
P: AsRef<T>,
{
let mut mutable = Self::with_capacity(iterator.size_hint().0);
mutable.extend_values(iterator);
mutable
Expand All @@ -154,22 +172,10 @@ impl<T: ViewType + ?Sized, P: AsRef<T>> Extend<Option<P>> for MutableBinaryViewA
Self::extend(self, iter.into_iter())
}
}
impl<T: ViewType + ?Sized, P: AsRef<T>> Extend<P> for MutableBinaryViewArray<T> {
#[inline]
fn extend<I: IntoIterator<Item = Option<P>>>(&mut self, iter: I) {
Self::extend_values(self, iter.into_iter())
}
}

impl<T: ViewType + ?Sized, P: AsRef<T>> FromIterator<P> for MutableBinaryViewArray<T> {
#[inline]
fn from_iter<I: IntoIterator<Item = P>>(iter: I) -> Self {
Self::from_iter(iter.into_iter())
}
}
impl<T: ViewType + ?Sized, P: AsRef<T>> FromIterator<Option<P>> for MutableBinaryViewArray<T> {
#[inline]
fn from_iter<I: IntoIterator<Item = P>>(iter: Option<I>) -> Self {
fn from_iter<I: IntoIterator<Item = Option<P>>>(iter: I) -> Self {
Self::from_iter(iter.into_iter())
}
}
49 changes: 24 additions & 25 deletions crates/polars-arrow/src/array/growable/binview.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::sync::Arc;

use super::utils::{build_extend_null_bits, extend_offset_values, ExtendNullBits};
use super::utils::{extend_offset_values};
use super::Growable;
use crate::array::binview::{BinaryViewArrayGeneric, MutableBinaryViewArray, ViewType};
use crate::array::{Array, BinaryArray};
use crate::array::growable::utils::{extend_validity, prepare_validity};
use crate::bitmap::MutableBitmap;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
Expand All @@ -13,10 +14,9 @@ use crate::offset::{Offset, Offsets};
pub struct GrowableBinaryViewArray<'a, T: ViewType + ?Sized> {
arrays: Vec<&'a BinaryViewArrayGeneric<T>>,
data_type: ArrowDataType,
validity: MutableBitmap::with_capacity(capacity),
validity: Option<MutableBitmap>,
views: Vec<u128>,
buffers: Vec<Buffer<u8>>,
extend_null_bits: Vec<ExtendNullBits<'a>>,
}

impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
Expand All @@ -32,44 +32,41 @@ impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
use_validity = true;
};

let extend_null_bits = arrays
.iter()
.map(|array| build_extend_null_bits(*array, use_validity))
.collect();

let n_buffers = arrays.iter().map(|binview| binview.buffers().len()).sum::<usize>();
let n_buffers = arrays.iter().map(|binview| binview.data_buffers().len()).sum::<usize>();

Self {
arrays,
data_type,
validity: MutableBitmap::with_capacity(capacity),
validity: prepare_validity(use_validity, capacity),
views: Vec::with_capacity(capacity),
buffers: Vec::with_capacity(n_buffers),
extend_null_bits,
}
}

fn to(&mut self) -> BinaryViewArrayGeneric<T> {
// let mutable = std::mem::take(&mut self.mutable);
// let out = mutable.into();
// debug_assert!(out.data_type() == &self.data_type);
// out
todo!()
let views = std::mem::take(&mut self.views);
let buffers = std::mem::take(&mut self.buffers);
let validity = self.validity.take();
unsafe { BinaryViewArrayGeneric::<T>::new_unchecked(
self.data_type.clone(),
views.into(),
Arc::from(buffers),
validity.map(|v| v.into())
) }
}
}

impl<'a, T: ViewType + ?Sized> Growable<'a> for GrowableBinaryViewArray<'a, T> {
fn extend(&mut self, index: usize, start: usize, len: usize) {
(self.extend_null_bits[index])(&mut self.validity, start, len);

let array = self.arrays[index];
extend_validity(&mut self.validity, array, start, len);

let buffer_offset: u32 = self.buffers.len().try_into().expect("unsupported");
let buffer_offset = (buffer_offset as u128) << 64;

let range = start..start + len;
self.buffers.extend_from_slice(&array.buffers()[range]);
self.views.extend(array.views()[range.clone()].iter().map(|&view| {
self.buffers.extend_from_slice(&array.data_buffers()[range.clone()]);
self.views.extend(array.views()[range].iter().map(|&view| {
// If null the buffer index is ignored because the length is 0,
// so we can just do this
view + buffer_offset
Expand All @@ -78,7 +75,9 @@ impl<'a, T: ViewType + ?Sized> Growable<'a> for GrowableBinaryViewArray<'a, T> {

fn extend_validity(&mut self, additional: usize) {
self.views.extend(std::iter::repeat(0).take(additional));
self.validity.extend_constant(additional, false);
if let Some(validity) = &mut self.validity {
validity.extend_constant(additional, false);
}
}

#[inline]
Expand All @@ -97,11 +96,11 @@ impl<'a, T: ViewType + ?Sized> Growable<'a> for GrowableBinaryViewArray<'a, T> {

impl<'a, T: ViewType + ?Sized> From<GrowableBinaryViewArray<'a, T>> for BinaryViewArrayGeneric<T> {
fn from(val: GrowableBinaryViewArray<'a, T>) -> Self {
BinaryViewArrayGeneric::<T>::new_unchecked(
unsafe { BinaryViewArrayGeneric::<T>::new_unchecked(
val.data_type,
val.views.into(),
val.buffers
val.validity.into(),
)
Arc::from(val.buffers),
val.validity.map(|v| v.into()),
) }
}
}
30 changes: 23 additions & 7 deletions crates/polars-arrow/src/array/growable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,22 @@ pub fn make_growable<'a>(
use_validity,
capacity
),
Union => {
let arrays = arrays
.iter()
.map(|array| array.as_any().downcast_ref().unwrap())
.collect::<Vec<_>>();
Box::new(union::GrowableUnion::new(arrays, capacity))
BinaryView => {
dyn_growable!(
binview::GrowableBinaryViewArray::<[u8]>,
arrays,
use_validity,
capacity
)
},
Map => dyn_growable!(map::GrowableMap, arrays, use_validity, capacity),
Utf8View => {
dyn_growable!(
binview::GrowableBinaryViewArray::<str>,
arrays,
use_validity,
capacity
)
}
Dictionary(key_type) => {
match_integer_type!(key_type, |$T| {
let arrays = arrays
Expand All @@ -146,5 +154,13 @@ pub fn make_growable<'a>(
))
})
},
Map => dyn_growable!(map::GrowableMap, arrays, use_validity, capacity),
Union => {
let arrays = arrays
.iter()
.map(|array| array.as_any().downcast_ref().unwrap())
.collect::<Vec<_>>();
Box::new(union::GrowableUnion::new(arrays, capacity))
},
}
}
3 changes: 3 additions & 0 deletions crates/polars-arrow/src/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ pub enum ArrowDataType {
/// Decimal backed by 256 bits
Decimal256(usize, usize),
/// Extension type.
/// - name
/// - physical type
/// - metadata
Extension(String, Box<ArrowDataType>, Option<String>),
/// A binary type that inlines small values
/// and can intern bytes.
Expand Down

0 comments on commit 4368537

Please sign in to comment.