Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Simplified Bytes (internal) #1099

Merged
merged 1 commit into from
Jun 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 58 additions & 101 deletions src/buffer/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,161 +1,118 @@
//! This module contains an implementation of a contiguous immutable memory region that knows
//! how to de-allocate itself, [`Bytes`].
use std::{fmt::Debug, fmt::Formatter};
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::panic::RefUnwindSafe;
use std::{ptr::NonNull, sync::Arc};

use super::foreign::MaybeForeign;
use crate::ffi;
use crate::types::NativeType;

/// Mode of deallocating memory regions
pub enum Deallocation {
/// Native deallocation, using Rust deallocator with Arrow-specific memory aligment
enum Allocation {
/// Native allocation
Native,
// Foreign interface, via a callback
Foreign(Arc<ffi::InternalArrowArray>),
// A foreign allocator and its ref count
Foreign(Arc<dyn RefUnwindSafe + Send + Sync>),
}

impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Native => {
write!(f, "Deallocation::Native")
}
Deallocation::Foreign(_) => {
write!(f, "Deallocation::Foreign {{ capacity: unknown }}")
}
}
}
}

/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
/// A continuous memory region that may be allocated externally.
///
/// In the most common case, this buffer is allocated using Rust's native allocator.
/// However, it may also be allocated by a foreign allocator, [Deallocation::Foreign].
pub struct Bytes<T: NativeType> {
/// inner data
data: MaybeForeign<T>,
/// how to deallocate this region
deallocation: Deallocation,
/// In the most common case, this is created from [`Vec`].
/// However, this region can also be allocated by a foreign allocator.
pub struct Bytes<T> {
/// An implementation using an `enum` of a `Vec` or a foreign pointer is not used
/// because `deref` is at least 50% more expensive than the deref of a `Vec`.
data: ManuallyDrop<Vec<T>>,
/// the region was allocated
allocation: Allocation,
}

impl<T: NativeType> Bytes<T> {
/// Takes ownership of an allocated memory region,
///
/// # Arguments
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
/// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
///
impl<T> Bytes<T> {
/// Takes ownership of an allocated memory region `[ptr, ptr+len[`,
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
///
/// # Panics
///
/// This function panics if the give deallocation not is `Deallocation::Foreign`
/// This function is safe iff:
/// * the region is properly allocated in that a slice can be safely built from it.
/// * the region is immutable.
/// # Implementation
/// This function leaks iff `owner` does not deallocate the region when dropped.
#[inline]
pub unsafe fn from_ffi(
pub unsafe fn from_owned(
ptr: std::ptr::NonNull<T>,
len: usize,
deallocation: Deallocation,
owner: Arc<dyn RefUnwindSafe + Send + Sync>,
) -> Self {
assert!(matches!(deallocation, Deallocation::Foreign(_)));
// This line is technically outside the assumptions of `Vec::from_raw_parts`, since
// `ptr` was not allocated by `Vec`. However, one of the invariants of this struct
// is that we do not expose this region as a `Vec`; we only use `Vec` on it to provide
// immutable access to the region (via `Vec::deref` to `&[T]`).
// MIRI does not complain, which seems to agree with the line of thought.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could also use Box<[T]> and call Vec::from_raw_parts only on get_vec on native allocated data.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried this but in get_vec we would be creating a temporary Vec and returning &mut Vec, which does not compile :(

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah.. :(

let data = Vec::from_raw_parts(ptr.as_ptr(), len, len);
let data = MaybeForeign::new(data);

Self { data, deallocation }
}
let data = ManuallyDrop::new(data);

#[inline]
fn as_slice(&self) -> &[T] {
self
Self {
data,
allocation: Allocation::Foreign(owner),
}
}

/// The length of the region
#[inline]
pub fn len(&self) -> usize {
self.data.len()
}

/// The pointer to the region
#[inline]
pub fn ptr(&self) -> NonNull<T> {
debug_assert!(!self.data.as_ptr().is_null());
unsafe { NonNull::new_unchecked(self.data.as_ptr() as *mut T) }
}

/// Returns a mutable reference to the internal [`Vec<T>`] if it is natively allocated.
/// Returns `None` if allocated by a foreign interface.
/// Returns a `Some` mutable reference of [`Vec<T>`] iff this was initialized
/// from a [`Vec<T>`] and `None` otherwise.
#[inline]
pub fn get_vec(&mut self) -> Option<&mut Vec<T>> {
match &self.deallocation {
Deallocation::Foreign(_) => None,
// Safety:
// The allocation is native so we can share the vec
Deallocation::Native => Some(unsafe { self.data.mut_vec() }),
match &self.allocation {
Allocation::Foreign(_) => None,
Allocation::Native => Some(self.data.deref_mut()),
}
}
}

impl<T: NativeType> Drop for Bytes<T> {
impl<T> Drop for Bytes<T> {
#[inline]
fn drop(&mut self) {
match self.deallocation {
// a foreign interface knows how to deallocate itself
Deallocation::Foreign(_) => {}
Deallocation::Native => {
// Safety:
// the allocation is native, so we can safely drop
unsafe { self.data.drop_local() }
match self.allocation {
Allocation::Foreign(_) => {
// The ref count of the foreign is reduced by one
// we can't deallocate `Vec` here since the region was allocated by
// a foreign allocator
}
Allocation::Native => {
let data = std::mem::take(&mut self.data);
let _ = ManuallyDrop::into_inner(data);
}
}
}
}

impl<T: NativeType> std::ops::Deref for Bytes<T> {
impl<T> std::ops::Deref for Bytes<T> {
type Target = [T];

#[inline]
fn deref(&self) -> &[T] {
&self.data
}
}

impl<T: NativeType> PartialEq for Bytes<T> {
impl<T: PartialEq> PartialEq for Bytes<T> {
#[inline]
fn eq(&self, other: &Bytes<T>) -> bool {
self.as_slice() == other.as_slice()
self.deref() == other.deref()
}
}

impl<T: NativeType> Debug for Bytes<T> {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"Bytes {{ ptr: {:?}, len: {}, data: ",
self.data.as_ptr(),
self.len(),
)?;

f.debug_list().entries(self.iter()).finish()?;

write!(f, " }}")
}
}

impl<T: NativeType> From<Vec<T>> for Bytes<T> {
impl<T> From<Vec<T>> for Bytes<T> {
#[inline]
fn from(data: Vec<T>) -> Self {
let data = MaybeForeign::new(data);
Self {
data,
deallocation: Deallocation::Native,
data: ManuallyDrop::new(data),
allocation: Allocation::Native,
}
}
}

// This is sound because `Bytes` is an immutable container
unsafe impl<T: NativeType> Send for Bytes<T> {}
unsafe impl<T: NativeType> Sync for Bytes<T> {}
55 changes: 0 additions & 55 deletions src/buffer/foreign.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,5 @@
mod immutable;

pub(crate) mod bytes;
mod foreign;

pub use immutable::Buffer;
24 changes: 10 additions & 14 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@ use std::{ptr::NonNull, sync::Arc};
use crate::{
array::*,
bitmap::{utils::bytes_for, Bitmap},
buffer::{
bytes::{Bytes, Deallocation},
Buffer,
},
buffer::{bytes::Bytes, Buffer},
datatypes::{DataType, PhysicalType},
error::{Error, Result},
ffi::schema::get_child,
Expand Down Expand Up @@ -181,7 +178,7 @@ impl ArrowArray {
unsafe fn create_buffer<T: NativeType>(
array: &ArrowArray,
data_type: &DataType,
deallocation: Deallocation,
owner: Arc<InternalArrowArray>,
index: usize,
) -> Result<Buffer<T>> {
if array.buffers.is_null() {
Expand All @@ -197,7 +194,7 @@ unsafe fn create_buffer<T: NativeType>(
let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = ptr
.map(|ptr| Bytes::from_ffi(ptr, len, deallocation))
.map(|ptr| Bytes::from_owned(ptr, len, owner))
.ok_or_else(|| Error::OutOfSpec(format!("The buffer at position {} is null", index)))?;

Ok(Buffer::from_bytes(bytes).slice(offset, len - offset))
Expand All @@ -212,7 +209,7 @@ unsafe fn create_buffer<T: NativeType>(
/// This function assumes that `ceil(self.length * bits, 8)` is the size of the buffer
unsafe fn create_bitmap(
array: &ArrowArray,
deallocation: Deallocation,
owner: Arc<InternalArrowArray>,
index: usize,
) -> Result<Bitmap> {
if array.buffers.is_null() {
Expand All @@ -228,7 +225,7 @@ unsafe fn create_bitmap(
let bytes_len = bytes_for(offset + len);
let ptr = NonNull::new(ptr as *mut u8);
let bytes = ptr
.map(|ptr| Bytes::from_ffi(ptr, bytes_len, deallocation))
.map(|ptr| Bytes::from_owned(ptr, bytes_len, owner))
.ok_or_else(|| {
Error::OutOfSpec(format!(
"The buffer {} is a null pointer and cannot be interpreted as a bitmap",
Expand Down Expand Up @@ -344,8 +341,8 @@ fn create_dictionary(
}

pub trait ArrowArrayRef: std::fmt::Debug {
fn deallocation(&self) -> Deallocation {
Deallocation::Foreign(self.parent().clone())
fn owner(&self) -> Arc<InternalArrowArray> {
self.parent().clone()
}

/// returns the null bit buffer.
Expand All @@ -358,23 +355,22 @@ pub trait ArrowArrayRef: std::fmt::Debug {
if self.array().null_count() == 0 {
Ok(None)
} else {
create_bitmap(self.array(), self.deallocation(), 0).map(Some)
create_bitmap(self.array(), self.owner(), 0).map(Some)
}
}

/// # Safety
/// The caller must guarantee that the buffer `index` corresponds to a bitmap.
/// This function assumes that the bitmap created from FFI is valid; this is impossible to prove.
unsafe fn buffer<T: NativeType>(&self, index: usize) -> Result<Buffer<T>> {
create_buffer::<T>(self.array(), self.data_type(), self.deallocation(), index)
create_buffer::<T>(self.array(), self.data_type(), self.owner(), index)
}

/// # Safety
/// The caller must guarantee that the buffer `index` corresponds to a bitmap.
/// This function assumes that the bitmap created from FFI is valid; this is impossible to prove.
unsafe fn bitmap(&self, index: usize) -> Result<Bitmap> {
// +1 to ignore null bitmap
create_bitmap(self.array(), self.deallocation(), index)
create_bitmap(self.array(), self.owner(), index)
}

/// # Safety
Expand Down