Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
PrimitiveArray -> MutableArray (#794)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Feb 2, 2022
1 parent 086c54d commit 54797de
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 41 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ name = "arrow2"
bench = false

[dependencies]
either = "1.6"
num-traits = "0.2"
bytemuck = { version = "1", features = ["derive"] }
chrono = { version = "0.4", default_features = false, features = ["std"] }
Expand Down
2 changes: 1 addition & 1 deletion src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
pub(self) mod physical_binary;

/// A trait representing an immutable Arrow array. Arrow arrays are trait objects
/// that are infalibly downcasted to concrete types according to the [`Array::data_type`].
/// that are infallibly downcasted to concrete types according to the [`Array::data_type`].
pub trait Array: Send + Sync {
/// Convert to trait object.
fn as_any(&self) -> &dyn Any;
Expand Down
36 changes: 36 additions & 0 deletions src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
};

use super::Array;
use either::Either;

mod display;
mod ffi;
Expand Down Expand Up @@ -184,6 +185,41 @@ impl<T: NativeType> PrimitiveArray<T> {
validity: self.validity,
}
}
/// Attempts to turn this `PrimitiveArray` into a `MutablePrimitiveArray`.
///
/// Yields `Either::Right` only when both the validity bitmap (if present) and the
/// values buffer can be converted into their mutable counterparts. In every other
/// case the array is rebuilt from its parts and handed back as `Either::Left`.
pub fn into_mut(self) -> Either<Self, MutablePrimitiveArray<T>> {
    use Either::*;

    // Resolve the validity first: a bitmap that cannot become mutable settles
    // the outcome before the values are even inspected.
    let validity = match self.validity {
        None => None,
        Some(bitmap) => match bitmap.into_mut() {
            Left(immutable) => {
                return Left(PrimitiveArray::from_data(
                    self.data_type,
                    self.values,
                    Some(immutable),
                ))
            }
            Right(mutable) => Some(mutable),
        },
    };

    match self.values.get_vec() {
        // The values could not be taken, so convert the bitmap back to its
        // immutable form and return the array as it was.
        Left(buffer) => Left(PrimitiveArray::from_data(
            self.data_type,
            buffer,
            validity.map(|mutable| mutable.into()),
        )),
        Right(values) => Right(MutablePrimitiveArray::from_data(
            self.data_type,
            values,
            validity,
        )),
    }
}
}

impl<T: NativeType> Array for PrimitiveArray<T> {
Expand Down
17 changes: 16 additions & 1 deletion src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use either::Either;
use std::iter::FromIterator;
use std::sync::Arc;

Expand Down Expand Up @@ -107,7 +108,7 @@ impl Bitmap {
self.null_count
}

/// Slices `self`, offseting by `offset` and truncating up to `length` bits.
/// Slices `self`, offsetting by `offset` and truncating up to `length` bits.
/// # Panic
/// Panics iff `self.offset + offset + length >= self.bytes.len() * 8`, i.e. if the offset and `length`
/// exceeds the allocated capacity of `self`.
Expand Down Expand Up @@ -175,6 +176,20 @@ impl Bitmap {
pub(crate) fn offset(&self) -> usize {
self.offset
}

/// Attempts to turn this `Bitmap` into a `MutableBitmap`.
///
/// The conversion yields `Either::Right` only when the bitmap has no offset and
/// `Bytes::get_vec` grants access to the vector behind the uniquely held `Arc`;
/// otherwise `self` is returned untouched in `Either::Left`.
pub fn into_mut(mut self) -> Either<Self, MutableBitmap> {
    // An offset bitmap does not start at the beginning of its bytes.
    if self.offset != 0 {
        return Either::Left(self);
    }

    match Arc::get_mut(&mut self.bytes).and_then(|bytes| bytes.get_vec()) {
        Some(vec) => {
            // Move the bytes out, leaving an empty `Vec` behind.
            let taken = std::mem::take(vec);
            Either::Right(MutableBitmap::from_vec(taken, self.length))
        }
        None => Either::Left(self),
    }
}
}

impl<P: AsRef<[bool]>> From<P> for Bitmap {
Expand Down
95 changes: 59 additions & 36 deletions src/buffer/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
//! This module contains an implementation of a contiguous immutable memory region that knows
//! how to de-allocate itself, [`Bytes`].
use std::slice;
use std::{fmt::Debug, fmt::Formatter};
use std::{ptr::NonNull, sync::Arc};

use super::foreign::MaybeForeign;
use crate::ffi;
use crate::types::NativeType;

/// Mode of deallocating memory regions
pub enum Deallocation {
/// Native deallocation, using Rust deallocator with Arrow-specific memory alignment
Native(usize),
Native,
// Foreign interface, via a callback
Foreign(Arc<ffi::ArrowArray>),
}

impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Native(capacity) => {
write!(f, "Deallocation::Native {{ capacity: {} }}", capacity)
Deallocation::Native => {
write!(f, "Deallocation::Native")
}
Deallocation::Foreign(_) => {
write!(f, "Deallocation::Foreign {{ capacity: unknown }}")
Expand All @@ -36,12 +36,8 @@ impl Debug for Deallocation {
/// When the region is allocated by a foreign allocator, [Deallocation::Foreign], this calls the
/// foreign deallocator to deallocate the region when it is no longer needed.
pub struct Bytes<T: NativeType> {
/// The raw pointer to be begining of the region
ptr: NonNull<T>,

/// The number of bytes visible to this region. This is always smaller than its capacity (when avaliable).
len: usize,

/// inner data
data: MaybeForeign<T>,
/// how to deallocate this region
deallocation: Deallocation,
}
Expand All @@ -59,13 +55,25 @@ impl<T: NativeType> Bytes<T> {
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
///
/// # Panics
///
/// This function panics if the given deallocation is not `Deallocation::Foreign`
#[inline]
pub unsafe fn new(ptr: std::ptr::NonNull<T>, len: usize, deallocation: Deallocation) -> Self {
Self {
ptr,
len,
deallocation,
}
pub unsafe fn from_ffi(
ptr: std::ptr::NonNull<T>,
len: usize,
deallocation: Deallocation,
) -> Self {
assert!(matches!(deallocation, Deallocation::Foreign(_)));
// This line is technically outside the assumptions of `Vec::from_raw_parts`, since
// `ptr` was not allocated by `Vec`. However, one of the invariants of this struct
// is that we do not expose this region as a `Vec`; we only use `Vec` on it to provide
// immutable access to the region (via `Vec::deref` to `&[T]`).
let data = Vec::from_raw_parts(ptr.as_ptr(), len, len);
let data = MaybeForeign::new(data);

Self { data, deallocation }
}

#[inline]
Expand All @@ -75,24 +83,37 @@ impl<T: NativeType> Bytes<T> {

#[inline]
pub fn len(&self) -> usize {
self.len
self.data.len()
}

#[inline]
pub fn ptr(&self) -> NonNull<T> {
self.ptr
debug_assert!(!self.data.as_ptr().is_null());
unsafe { NonNull::new_unchecked(self.data.as_ptr() as *mut T) }
}

/// Gives mutable access to the internal [`Vec<T>`] when this region is natively allocated.
/// Regions allocated by a foreign interface yield `None`.
pub fn get_vec(&mut self) -> Option<&mut Vec<T>> {
    match self.deallocation {
        // SAFETY: the allocation is native, so the vec may be shared
        Deallocation::Native => Some(unsafe { self.data.mut_vec() }),
        Deallocation::Foreign(_) => None,
    }
}
}

impl<T: NativeType> Drop for Bytes<T> {
#[inline]
fn drop(&mut self) {
match &self.deallocation {
Deallocation::Native(capacity) => unsafe {
let _ = Vec::from_raw_parts(self.ptr.as_ptr(), self.len, *capacity);
},
// foreign interface knows how to deallocate itself.
Deallocation::Foreign(_) => (),
match self.deallocation {
// a foreign interface knows how to deallocate itself
Deallocation::Foreign(_) => {}
Deallocation::Native => {
// Safety:
// the allocation is native, so we can safely drop
unsafe { self.data.drop_local() }
}
}
}
}
Expand All @@ -101,7 +122,7 @@ impl<T: NativeType> std::ops::Deref for Bytes<T> {
type Target = [T];

fn deref(&self) -> &[T] {
unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
&self.data
}
}

Expand All @@ -113,7 +134,12 @@ impl<T: NativeType> PartialEq for Bytes<T> {

impl<T: NativeType> Debug for Bytes<T> {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?;
write!(
f,
"Bytes {{ ptr: {:?}, len: {}, data: ",
self.data.as_ptr(),
self.len(),
)?;

f.debug_list().entries(self.iter()).finish()?;

Expand All @@ -123,15 +149,12 @@ impl<T: NativeType> Debug for Bytes<T> {

impl<T: NativeType> From<Vec<T>> for Bytes<T> {
#[inline]
fn from(mut data: Vec<T>) -> Self {
let ptr = NonNull::new(data.as_mut_ptr()).unwrap();
let len = data.len();
let capacity = data.capacity();

let result = unsafe { Bytes::new(ptr, len, Deallocation::Native(capacity)) };
// so that the memory region is not deallocated.
std::mem::forget(data);
result
fn from(data: Vec<T>) -> Self {
let data = MaybeForeign::new(data);
Self {
data,
deallocation: Deallocation::Native,
}
}
}

Expand Down
55 changes: 55 additions & 0 deletions src/buffer/foreign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// this code is in its own module so that inner types are not accessible,
// as that might break invariant assumptions
use crate::types::NativeType;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};

/// Holds a `Vec` that may hold a pointer that is not allocated by `Vec`. It is therefore not
/// safe to deallocate the inner type naively
///
/// This struct exists to avoid holding an `enum` of a `Vec` or a foreign pointer, whose `deref`
/// is known to be at least 50% more expensive than the deref of a `Vec`.
///
/// # Safety
///
/// It is unsafe to take and drop the inner value of `MaybeForeign`.
/// Doing so is only allowed if the `Vec` was created from the native `Vec` allocator.
pub(super) struct MaybeForeign<T: NativeType> {
// Wrapped in `ManuallyDrop` so that the `Vec` is never dropped implicitly;
// releasing it goes through `drop_local`.
inner: ManuallyDrop<Vec<T>>,
}

impl<T: NativeType> MaybeForeign<T> {
    /// Wraps `data` in a `ManuallyDrop`, so it is not dropped implicitly.
    #[inline]
    pub(super) fn new(data: Vec<T>) -> Self {
        let inner = ManuallyDrop::new(data);
        Self { inner }
    }

    /// Drops the wrapped `Vec`, leaving an empty one in its place.
    ///
    /// # Safety
    /// This function may only be called if the inner `Vec<T>` was allocated
    /// by `Vec<T, A>` allocator `A`.
    #[inline]
    pub(super) unsafe fn drop_local(&mut self) {
        let taken = std::mem::take(&mut self.inner);
        drop(ManuallyDrop::into_inner(taken));
    }

    /// Hands out a mutable reference to the wrapped `Vec`.
    ///
    /// # Safety
    /// This function may only be called if the inner `Vec<T>` was allocated
    /// in Rust and the default `Vec<T, A>` allocator `A`.
    /// Otherwise, users may reallocate `Vec`, which is unsound
    #[inline]
    pub(super) unsafe fn mut_vec(&mut self) -> &mut Vec<T> {
        DerefMut::deref_mut(&mut self.inner)
    }
}

impl<T: NativeType> Deref for MaybeForeign<T> {
    type Target = [T];

    /// Read-only view of the wrapped elements as a slice.
    #[inline]
    fn deref(&self) -> &[T] {
        self.inner.as_slice()
    }
}
28 changes: 27 additions & 1 deletion src/buffer/immutable.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use either::Either;
use std::{iter::FromIterator, sync::Arc, usize};

use crate::{trusted_len::TrustedLen, types::NativeType};
Expand Down Expand Up @@ -84,7 +85,13 @@ impl<T: NativeType> Buffer<T> {
/// Returns the byte slice stored in this buffer
#[inline]
pub fn as_slice(&self) -> &[T] {
&self.data[self.offset..self.offset + self.length]
// Safety:
// invariant of this struct `offset + length <= data.len()`
debug_assert!(self.offset + self.length <= self.data.len());
unsafe {
self.data
.get_unchecked(self.offset..self.offset + self.length)
}
}

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
Expand Down Expand Up @@ -123,6 +130,25 @@ impl<T: NativeType> Buffer<T> {
pub fn offset(&self) -> usize {
self.offset
}

/// Try to get the inner data as a mutable [`Vec<T>`].
///
/// Returns `Either::Right` with the owned vector iff:
/// * This data was allocated by Rust (i.e. it does not come from the C data interface)
/// * This region is not being shared with any other struct.
/// * This buffer has no offset
///
/// Otherwise the buffer is handed back unchanged in `Either::Left`.
///
/// If the buffer exposes fewer elements than the underlying allocation holds
/// (offset `0` but a shorter `length`), the returned vector is truncated to
/// `length`, so it contains exactly the elements visible through this buffer.
pub fn get_vec(mut self) -> Either<Self, Vec<T>> {
    if self.offset != 0 {
        return Either::Left(self);
    }

    match Arc::get_mut(&mut self.data).and_then(|b| b.get_vec()) {
        Some(v) => {
            let mut data = std::mem::take(v);
            // The allocation can be longer than the region this buffer exposes;
            // without this the caller would receive elements outside the buffer.
            data.truncate(self.length);
            Either::Right(data)
        }
        None => Either::Left(self),
    }
}
}

impl<T: NativeType> Buffer<T> {
Expand Down
1 change: 1 addition & 0 deletions src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
mod immutable;

pub(crate) mod bytes;
mod foreign;

pub use immutable::Buffer;
4 changes: 2 additions & 2 deletions src/ffi/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ unsafe fn create_buffer<T: NativeType>(
let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = ptr
.map(|ptr| Bytes::new(ptr, len, deallocation))
.map(|ptr| Bytes::from_ffi(ptr, len, deallocation))
.ok_or_else(|| {
ArrowError::OutOfSpec(format!("The buffer at position {} is null", index))
})?;
Expand Down Expand Up @@ -240,7 +240,7 @@ unsafe fn create_bitmap(
let bytes_len = bytes_for(offset + len);
let ptr = NonNull::new(ptr as *mut u8);
let bytes = ptr
.map(|ptr| Bytes::new(ptr, bytes_len, deallocation))
.map(|ptr| Bytes::from_ffi(ptr, bytes_len, deallocation))
.ok_or_else(|| {
ArrowError::OutOfSpec(format!(
"The buffer {} is a null pointer and cannot be interpreted as a bitmap",
Expand Down
Loading

0 comments on commit 54797de

Please sign in to comment.