Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added COW semantics to Buffer, Bitmap and some arrays #794

Merged
merged 9 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ name = "arrow2"
bench = false

[dependencies]
either = "1.6"
num-traits = "0.2"
bytemuck = { version = "1", features = ["derive"] }
chrono = { version = "0.4", default_features = false, features = ["std"] }
Expand Down
2 changes: 1 addition & 1 deletion src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::{
pub(self) mod physical_binary;

/// A trait representing an immutable Arrow array. Arrow arrays are trait objects
/// that are infalibly downcasted to concrete types according to the [`Array::data_type`].
/// that are infallibly downcasted to concrete types according to the [`Array::data_type`].
pub trait Array: Send + Sync {
/// Convert to trait object.
fn as_any(&self) -> &dyn Any;
Expand Down
36 changes: 36 additions & 0 deletions src/array/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
};

use super::Array;
use either::Either;

mod display;
mod ffi;
Expand Down Expand Up @@ -184,6 +185,41 @@ impl<T: NativeType> PrimitiveArray<T> {
validity: self.validity,
}
}
/// Try to convert this `PrimitiveArray` to a `MutablePrimitiveArray`
pub fn into_mut(self) -> Either<Self, MutablePrimitiveArray<T>> {
use Either::*;

if let Some(bitmap) = self.validity {
match bitmap.into_mut() {
Left(bitmap) => Left(PrimitiveArray::from_data(
self.data_type,
self.values,
Some(bitmap),
)),
Right(mutable_bitmap) => match self.values.get_vec() {
Left(buffer) => Left(PrimitiveArray::from_data(
self.data_type,
buffer,
Some(mutable_bitmap.into()),
)),
Right(values) => Right(MutablePrimitiveArray::from_data(
self.data_type,
values,
Some(mutable_bitmap),
)),
},
}
} else {
match self.values.get_vec() {
Left(buffer) => Left(PrimitiveArray::from_data(self.data_type, buffer, None)),
Right(values) => Right(MutablePrimitiveArray::from_data(
self.data_type,
values,
None,
)),
}
}
}
}

impl<T: NativeType> Array for PrimitiveArray<T> {
Expand Down
17 changes: 16 additions & 1 deletion src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use either::Either;
use std::iter::FromIterator;
use std::sync::Arc;

Expand Down Expand Up @@ -107,7 +108,7 @@ impl Bitmap {
self.null_count
}

/// Slices `self`, offseting by `offset` and truncating up to `length` bits.
/// Slices `self`, offsetting by `offset` and truncating up to `length` bits.
/// # Panic
/// Panics iff `self.offset + offset + length >= self.bytes.len() * 8`, i.e. if the offset and `length`
/// exceeds the allocated capacity of `self`.
Expand Down Expand Up @@ -175,6 +176,20 @@ impl Bitmap {
pub(crate) fn offset(&self) -> usize {
self.offset
}

/// Attempts to turn this `Bitmap` into a `MutableBitmap` without copying.
///
/// Returns `Either::Right` when the bitmap has no offset, its bytes are not shared
/// through the `Arc`, and the bytes expose their inner `Vec`. In every other case the
/// bitmap is handed back unchanged as `Either::Left`.
pub fn into_mut(mut self) -> Either<Self, MutableBitmap> {
    if self.offset != 0 {
        return Either::Left(self);
    }

    match Arc::get_mut(&mut self.bytes).and_then(|bytes| bytes.get_vec()) {
        Some(vec) => {
            // Move the bytes out, leaving an empty `Vec` behind for `self` to drop.
            let bytes = std::mem::take(vec);
            Either::Right(MutableBitmap::from_vec(bytes, self.length))
        }
        None => Either::Left(self),
    }
}
}

impl<P: AsRef<[bool]>> From<P> for Bitmap {
Expand Down
95 changes: 59 additions & 36 deletions src/buffer/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
//! This module contains an implementation of a contiguous immutable memory region that knows
//! how to de-allocate itself, [`Bytes`].

use std::slice;
use std::{fmt::Debug, fmt::Formatter};
use std::{ptr::NonNull, sync::Arc};

use super::foreign::MaybeForeign;
use crate::ffi;
use crate::types::NativeType;

/// Mode of deallocating memory regions
pub enum Deallocation {
/// Native deallocation, using Rust deallocator with Arrow-specific memory alignment
Native(usize),
Native,
// Foreign interface, via a callback
Foreign(Arc<ffi::ArrowArray>),
}

impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Native(capacity) => {
write!(f, "Deallocation::Native {{ capacity: {} }}", capacity)
Deallocation::Native => {
write!(f, "Deallocation::Native")
}
Deallocation::Foreign(_) => {
write!(f, "Deallocation::Foreign {{ capacity: unknown }}")
Expand All @@ -36,12 +36,8 @@ impl Debug for Deallocation {
/// When the region is allocated by a foreign allocator, [Deallocation::Foreign], this calls the
/// foreign deallocator to deallocate the region when it is no longer needed.
pub struct Bytes<T: NativeType> {
/// The raw pointer to be begining of the region
ptr: NonNull<T>,

/// The number of bytes visible to this region. This is always smaller than its capacity (when avaliable).
len: usize,

/// inner data
data: MaybeForeign<T>,
/// how to deallocate this region
deallocation: Deallocation,
}
Expand All @@ -59,13 +55,25 @@ impl<T: NativeType> Bytes<T> {
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
///
/// # Panics
///
/// This function panics if the given deallocation is not `Deallocation::Foreign`
#[inline]
pub unsafe fn new(ptr: std::ptr::NonNull<T>, len: usize, deallocation: Deallocation) -> Self {
Self {
ptr,
len,
deallocation,
}
pub unsafe fn from_ffi(
ptr: std::ptr::NonNull<T>,
len: usize,
deallocation: Deallocation,
) -> Self {
assert!(matches!(deallocation, Deallocation::Foreign(_)));
// This line is technically outside the assumptions of `Vec::from_raw_parts`, since
// `ptr` was not allocated by `Vec`. However, one of the invariants of this struct
// is that we do not expose this region as a `Vec`; we only use `Vec` on it to provide
// immutable access to the region (via `Vec::deref` to `&[T]`).
let data = Vec::from_raw_parts(ptr.as_ptr(), len, len);
let data = MaybeForeign::new(data);

Self { data, deallocation }
}

#[inline]
Expand All @@ -75,24 +83,37 @@ impl<T: NativeType> Bytes<T> {

#[inline]
pub fn len(&self) -> usize {
self.len
self.data.len()
}

#[inline]
pub fn ptr(&self) -> NonNull<T> {
self.ptr
debug_assert!(!self.data.as_ptr().is_null());
unsafe { NonNull::new_unchecked(self.data.as_ptr() as *mut T) }
}

/// Gives mutable access to the inner [`Vec<T>`] when this region is natively allocated.
///
/// Returns `None` when the region is owned by a foreign interface.
pub fn get_vec(&mut self) -> Option<&mut Vec<T>> {
    match self.deallocation {
        Deallocation::Native => {
            // Safety:
            // The allocation is native so we can share the vec
            let vec = unsafe { self.data.mut_vec() };
            Some(vec)
        }
        Deallocation::Foreign(_) => None,
    }
}
}

impl<T: NativeType> Drop for Bytes<T> {
#[inline]
fn drop(&mut self) {
match &self.deallocation {
Deallocation::Native(capacity) => unsafe {
let _ = Vec::from_raw_parts(self.ptr.as_ptr(), self.len, *capacity);
},
// foreign interface knows how to deallocate itself.
Deallocation::Foreign(_) => (),
match self.deallocation {
// a foreign interface knows how to deallocate itself
Deallocation::Foreign(_) => {}
Deallocation::Native => {
// Safety:
// the allocation is native, so we can safely drop
unsafe { self.data.drop_local() }
}
}
}
}
Expand All @@ -101,7 +122,7 @@ impl<T: NativeType> std::ops::Deref for Bytes<T> {
type Target = [T];

fn deref(&self) -> &[T] {
unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
&self.data
}
}

Expand All @@ -113,7 +134,12 @@ impl<T: NativeType> PartialEq for Bytes<T> {

impl<T: NativeType> Debug for Bytes<T> {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "Bytes {{ ptr: {:?}, len: {}, data: ", self.ptr, self.len,)?;
write!(
f,
"Bytes {{ ptr: {:?}, len: {}, data: ",
self.data.as_ptr(),
self.len(),
)?;

f.debug_list().entries(self.iter()).finish()?;

Expand All @@ -123,15 +149,12 @@ impl<T: NativeType> Debug for Bytes<T> {

impl<T: NativeType> From<Vec<T>> for Bytes<T> {
#[inline]
fn from(mut data: Vec<T>) -> Self {
let ptr = NonNull::new(data.as_mut_ptr()).unwrap();
let len = data.len();
let capacity = data.capacity();

let result = unsafe { Bytes::new(ptr, len, Deallocation::Native(capacity)) };
// so that the memory region is not deallocated.
std::mem::forget(data);
result
fn from(data: Vec<T>) -> Self {
let data = MaybeForeign::new(data);
Self {
data,
deallocation: Deallocation::Native,
}
}
}

Expand Down
55 changes: 55 additions & 0 deletions src/buffer/foreign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// this code is in its own module so that inner types are not accessible
// as that might break invariants assumptions
use crate::types::NativeType;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};

/// Holds a `Vec` that may hold a pointer that is not allocated by `Vec`. It is therefore not
/// safe to deallocate the inner type naively
///
/// This struct exists to avoid holding an `enum` of a `Vec` or a foreign pointer, whose `deref`
/// is known to be at least 50% more expensive than the deref of a `Vec`.
///
/// # Safety
///
/// It is unsafe to take and drop the inner value of `MaybeForeign`.
/// Doing so is only allowed if the `Vec` was created from the native `Vec` allocator.
pub(super) struct MaybeForeign<T: NativeType> {
// Wrapped in `ManuallyDrop` so the `Vec` is never dropped automatically.
inner: ManuallyDrop<Vec<T>>,
}

impl<T: NativeType> MaybeForeign<T> {
    /// Wraps `data` so that it is not dropped automatically.
    #[inline]
    pub(super) fn new(data: Vec<T>) -> Self {
        let inner = ManuallyDrop::new(data);
        Self { inner }
    }

    /// Drops the wrapped `Vec<T>`, leaving an empty `Vec` in its place.
    ///
    /// # Safety
    /// This function may only be called if the inner `Vec<T>` was allocated
    /// by the allocator that `Vec<T>` itself uses.
    #[inline]
    pub(super) unsafe fn drop_local(&mut self) {
        // Swap in the default (empty) value, then drop what was taken out.
        let taken = std::mem::take(&mut self.inner);
        drop(ManuallyDrop::into_inner(taken));
    }

    /// Returns a mutable reference to the wrapped `Vec<T>`.
    ///
    /// # Safety
    /// This function may only be called if the inner `Vec<T>` was allocated
    /// in Rust with the default `Vec<T>` allocator.
    /// Otherwise, users may reallocate the `Vec`, which is unsound.
    #[inline]
    pub(super) unsafe fn mut_vec(&mut self) -> &mut Vec<T> {
        DerefMut::deref_mut(&mut self.inner)
    }
}

impl<T: NativeType> Deref for MaybeForeign<T> {
    type Target = [T];

    /// Borrows the wrapped elements as an immutable slice.
    #[inline]
    fn deref(&self) -> &[T] {
        self.inner.as_slice()
    }
}
28 changes: 27 additions & 1 deletion src/buffer/immutable.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use either::Either;
use std::{iter::FromIterator, sync::Arc, usize};

use crate::{trusted_len::TrustedLen, types::NativeType};
Expand Down Expand Up @@ -84,7 +85,13 @@ impl<T: NativeType> Buffer<T> {
/// Returns the byte slice stored in this buffer
#[inline]
pub fn as_slice(&self) -> &[T] {
&self.data[self.offset..self.offset + self.length]
// Safety:
// invariant of this struct `offset + length <= data.len()`
debug_assert!(self.offset + self.length <= self.data.len());
unsafe {
self.data
.get_unchecked(self.offset..self.offset + self.length)
}
}

/// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
Expand Down Expand Up @@ -123,6 +130,25 @@ impl<T: NativeType> Buffer<T> {
pub fn offset(&self) -> usize {
self.offset
}

/// Try to get the inner data as a mutable [`Vec<T>`].
///
/// This succeeds iff:
/// * This data was allocated by Rust (i.e. it does not come from the C data interface)
/// * This region is not being shared with any other struct.
/// * This buffer has no offset
///
/// On success the returned `Vec` contains exactly the elements visible through this
/// buffer, i.e. it has `self.len()` elements. On failure the buffer is returned unchanged
/// as `Either::Left`.
pub fn get_vec(mut self) -> Either<Self, Vec<T>> {
    if self.offset != 0 {
        return Either::Left(self);
    }

    let length = self.length;
    match Arc::get_mut(&mut self.data).and_then(|b| b.get_vec()) {
        Some(v) => {
            let mut data = std::mem::take(v);
            // The buffer only exposes `data[offset..offset + length]`. Even with a zero
            // offset, `length` may be shorter than the allocation, so drop the tail to
            // avoid leaking elements outside of the buffer's view.
            data.truncate(length);
            Either::Right(data)
        }
        None => Either::Left(self),
    }
}
}

impl<T: NativeType> Buffer<T> {
Expand Down
1 change: 1 addition & 0 deletions src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
mod immutable;

pub(crate) mod bytes;
mod foreign;

pub use immutable::Buffer;
4 changes: 2 additions & 2 deletions src/ffi/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ unsafe fn create_buffer<T: NativeType>(
let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = ptr
.map(|ptr| Bytes::new(ptr, len, deallocation))
.map(|ptr| Bytes::from_ffi(ptr, len, deallocation))
.ok_or_else(|| {
ArrowError::OutOfSpec(format!("The buffer at position {} is null", index))
})?;
Expand Down Expand Up @@ -240,7 +240,7 @@ unsafe fn create_bitmap(
let bytes_len = bytes_for(offset + len);
let ptr = NonNull::new(ptr as *mut u8);
let bytes = ptr
.map(|ptr| Bytes::new(ptr, bytes_len, deallocation))
.map(|ptr| Bytes::from_ffi(ptr, bytes_len, deallocation))
.ok_or_else(|| {
ArrowError::OutOfSpec(format!(
"The buffer {} is a null pointer and cannot be interpreted as a bitmap",
Expand Down
Loading