Skip to content

Commit

Permalink
Zero-copy Vec conversion (#3516) (#1176) (#3756)
Browse files Browse the repository at this point in the history
* Zero-copy Vec conversion (#3516) (#1176)

* Fix doc

* More tests

* Review feedback

* More tests
  • Loading branch information
tustvold authored Mar 1, 2023
1 parent 5edc954 commit d440c24
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 27 deletions.
2 changes: 2 additions & 0 deletions arrow-array/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ mod tests {

#[test]
#[should_panic(expected = "memory is not aligned")]
#[allow(deprecated)]
fn test_primitive_array_alignment() {
let ptr = arrow_buffer::alloc::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
Expand All @@ -845,6 +846,7 @@ mod tests {
// Different error messages, so skip for now
// https://github.com/apache/arrow-rs/issues/1545
#[cfg(not(feature = "force_validate"))]
#[allow(deprecated)]
fn test_list_array_alignment() {
let ptr = arrow_buffer::alloc::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
Expand Down
18 changes: 11 additions & 7 deletions arrow-buffer/src/alloc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ fn dangling_ptr() -> NonNull<u8> {
/// Allocates a cache-aligned memory region of `size` bytes with uninitialized values.
/// This is more performant than using [allocate_aligned_zeroed] when all bytes will have
/// an unknown or non-zero value and is semantically similar to `malloc`.
#[deprecated(note = "Use Vec")]
pub fn allocate_aligned(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
Expand All @@ -60,6 +61,7 @@ pub fn allocate_aligned(size: usize) -> NonNull<u8> {
/// Allocates a cache-aligned memory region of `size` bytes with `0` on all of them.
/// This is more performant than using [allocate_aligned] and setting all bytes to zero
/// and is semantically similar to `calloc`.
#[deprecated(note = "Use Vec")]
pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
unsafe {
if size == 0 {
Expand All @@ -80,6 +82,7 @@ pub fn allocate_aligned_zeroed(size: usize) -> NonNull<u8> {
/// * ptr must denote a block of memory currently allocated via this allocator,
///
/// * size must be the same size that was used to allocate that block of memory,
#[deprecated(note = "Use Vec")]
pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
if size != 0 {
std::alloc::dealloc(
Expand All @@ -100,6 +103,8 @@ pub unsafe fn free_aligned(ptr: NonNull<u8>, size: usize) {
///
/// * new_size, when rounded up to the nearest multiple of [ALIGNMENT], must not overflow (i.e.,
/// the rounded value must be less than usize::MAX).
#[deprecated(note = "Use Vec")]
#[allow(deprecated)]
pub unsafe fn reallocate(
ptr: NonNull<u8>,
old_size: usize,
Expand Down Expand Up @@ -132,19 +137,18 @@ impl<T: RefUnwindSafe + Send + Sync> Allocation for T {}

/// Mode of deallocating memory regions
pub(crate) enum Deallocation {
/// An allocation of the given capacity that needs to be deallocated using arrows's cache aligned allocator.
/// See [allocate_aligned] and [free_aligned].
Arrow(usize),
/// An allocation from an external source like the FFI interface or a Rust Vec.
/// Deallocation will happen
/// An allocation using [`std::alloc`]
Standard(Layout),
/// An allocation from an external source like the FFI interface
/// Deallocation will happen on `Allocation::drop`
Custom(Arc<dyn Allocation>),
}

impl Debug for Deallocation {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
Deallocation::Arrow(capacity) => {
write!(f, "Deallocation::Arrow {{ capacity: {capacity} }}")
Deallocation::Standard(layout) => {
write!(f, "Deallocation::Standard {layout:?}")
}
Deallocation::Custom(_) => {
write!(f, "Deallocation::Custom {{ capacity: unknown }}")
Expand Down
192 changes: 187 additions & 5 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::AsRef;
use std::alloc::Layout;
use std::fmt::Debug;
use std::iter::FromIterator;
use std::ptr::NonNull;
use std::sync::Arc;

use crate::alloc::{Allocation, Deallocation};
use crate::alloc::{Allocation, Deallocation, ALIGNMENT};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
use crate::{bytes::Bytes, native::ArrowNativeType};

Expand All @@ -42,6 +42,8 @@ pub struct Buffer {
ptr: *const u8,

/// Byte length of the buffer.
///
/// Must be less than or equal to `data.len()`
length: usize,
}

Expand Down Expand Up @@ -69,6 +71,22 @@ impl Buffer {
}
}

/// Builds a [`Buffer`] that reuses the allocation of `vec` instead of copying its contents
///
/// The buffer spans `vec.len() * size_of::<T>()` bytes, and the [`Layout`] of the
/// `Vec`'s full capacity is recorded alongside it as the deallocation mode.
#[inline]
pub fn from_vec<T: ArrowNativeType>(vec: Vec<T>) -> Self {
    let byte_len = vec.len() * std::mem::size_of::<T>();
    // Safety
    // The allocation behind a Vec always has the layout `Layout::array` yields for its
    // capacity (see `RawVec::current_memory`), so this computation cannot fail
    let vec_layout = unsafe { Layout::array::<T>(vec.capacity()).unwrap_unchecked() };
    // Safety
    // Vec::as_ptr never returns null and ArrowNativeType are trivially transmutable
    let data = unsafe { NonNull::new_unchecked(vec.as_ptr() as _) };
    // The allocation is handed to `Bytes` below, so the Vec's destructor must not run
    std::mem::forget(vec);
    let bytes = unsafe { Bytes::new(data, byte_len, Deallocation::Standard(vec_layout)) };
    Self::from_bytes(bytes)
}

/// Initializes a [Buffer] from a slice of items.
pub fn from_slice_ref<U: ArrowNativeType, T: AsRef<[U]>>(items: T) -> Self {
let slice = items.as_ref();
Expand All @@ -78,7 +96,7 @@ impl Buffer {
buffer.into()
}

/// Creates a buffer from an existing memory region (must already be byte-aligned), this
/// Creates a buffer from an existing aligned memory region (must already be byte-aligned), this
/// `Buffer` will free this piece of memory when dropped.
///
/// # Arguments
Expand All @@ -91,9 +109,11 @@ impl Buffer {
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
#[deprecated(note = "Use From<Vec<T>>")]
pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
assert!(len <= capacity);
Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
let layout = Layout::from_size_align(capacity, ALIGNMENT).unwrap();
Buffer::build_with_arguments(ptr, len, Deallocation::Standard(layout))
}

/// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
Expand Down Expand Up @@ -253,7 +273,8 @@ impl Buffer {
}

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
/// Returns `Err` if this is shared or its allocation is from an external source.
/// Returns `Err` if this is shared or its allocation is from an external source or
/// it is not allocated with alignment [`ALIGNMENT`]
pub fn into_mutable(self) -> Result<MutableBuffer, Self> {
let ptr = self.ptr;
let length = self.length;
Expand All @@ -269,6 +290,45 @@ impl Buffer {
length,
})
}

/// Returns `Vec` for mutating the buffer
///
/// Returns `Err(self)` if this buffer does not have the same [`Layout`] as
/// the destination Vec or contains a non-zero offset
pub fn into_vec<T: ArrowNativeType>(self) -> Result<Vec<T>, Self> {
let layout = match self.data.deallocation() {
Deallocation::Standard(l) => l,
_ => return Err(self), // Custom allocation
};

if self.ptr != self.data.as_ptr() {
return Err(self); // Data is offset
}

let v_capacity = layout.size() / std::mem::size_of::<T>();
match Layout::array::<T>(v_capacity) {
Ok(expected) if layout == &expected => {}
_ => return Err(self), // Incorrect layout
}

let length = self.length;
let ptr = self.ptr;
let v_len = self.length / std::mem::size_of::<T>();

Arc::try_unwrap(self.data)
.map(|bytes| unsafe {
let ptr = bytes.ptr().as_ptr() as _;
std::mem::forget(bytes);
// Safety
// Verified that bytes layout matches that of Vec
Vec::from_raw_parts(ptr, v_len, v_capacity)
})
.map_err(|bytes| Buffer {
data: bytes,
ptr,
length,
})
}
}

/// Creating a `Buffer` instance by copying the memory from a `AsRef<[u8]>` into a newly
Expand Down Expand Up @@ -378,6 +438,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer {

#[cfg(test)]
mod tests {
use crate::i256;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::thread;

Expand Down Expand Up @@ -632,4 +693,125 @@ mod tests {
let buffer = Buffer::from(MutableBuffer::from_len_zeroed(12));
buffer.slice_with_length(2, usize::MAX);
}

/// Exercises the zero-copy round trip between `Vec<T>` and `Buffer` through
/// `Buffer::from_vec` and `Buffer::into_vec`, covering both the conversions that
/// succeed and the ones that are rejected by handing the buffer back as `Err`
#[test]
fn test_vec_interop() {
// Test empty vec
let a: Vec<i128> = Vec::new();
let b = Buffer::from_vec(a);
b.into_vec::<i128>().unwrap();

// Test vec with capacity
// No elements were written, but the reserved capacity must survive the round trip
let a: Vec<i128> = Vec::with_capacity(20);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 0);
assert_eq!(back.capacity(), 20);

// Test vec with values
let mut a: Vec<i128> = Vec::with_capacity(3);
a.extend_from_slice(&[1, 2, 3]);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 3);
assert_eq!(back.capacity(), 3);

// Test vec with values and spare capacity
let mut a: Vec<i128> = Vec::with_capacity(20);
a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
let b = Buffer::from_vec(a);
let back = b.into_vec::<i128>().unwrap();
assert_eq!(back.len(), 7);
assert_eq!(back.capacity(), 20);

// Test incorrect alignment
// The recorded layout carries the alignment of i128, which differs from that of
// i32 and i8, so both conversions are refused; each failure returns the buffer,
// which is why the second attempt can reuse it
let a: Vec<i128> = Vec::new();
let b = Buffer::from_vec(a);
let b = b.into_vec::<i32>().unwrap_err();
b.into_vec::<i8>().unwrap_err();

// Test convert between types with same alignment
// This is an implementation quirk, but isn't harmful
// as ArrowNativeType are trivially transmutable
let a: Vec<i64> = vec![1, 2, 3, 4];
let b = Buffer::from_vec(a);
let back = b.into_vec::<u64>().unwrap();
assert_eq!(back.len(), 4);
assert_eq!(back.capacity(), 4);

// i256 has the same layout as i128 so this is valid
// NOTE(review): the expectations below imply i256 is twice the size of i128,
// so four i128 values come back as two i256 values
let mut b: Vec<i128> = Vec::with_capacity(4);
b.extend_from_slice(&[1, 2, 3, 4]);
let b = Buffer::from_vec(b);
let back = b.into_vec::<i256>().unwrap();
assert_eq!(back.len(), 2);
assert_eq!(back.capacity(), 2);

// Invalid layout
// Assuming `vec!` allocates exactly three i128 values, the capacity in bytes is
// not a whole number of i256 values
let b: Vec<i128> = vec![1, 2, 3];
let b = Buffer::from_vec(b);
b.into_vec::<i256>().unwrap_err();

// Invalid layout
// Same as above, but the odd capacity (five i128 values) is requested explicitly
let mut b: Vec<i128> = Vec::with_capacity(5);
b.extend_from_slice(&[1, 2, 3, 4]);
let b = Buffer::from_vec(b);
b.into_vec::<i256>().unwrap_err();

// Truncates length
// This is an implementation quirk, but isn't harmful
// The capacity (four i128 values) converts cleanly, but the three stored values
// only cover one whole i256, so the trailing i128 is dropped from the length
let mut b: Vec<i128> = Vec::with_capacity(4);
b.extend_from_slice(&[1, 2, 3]);
let b = Buffer::from_vec(b);
let back = b.into_vec::<i256>().unwrap();
assert_eq!(back.len(), 1);
assert_eq!(back.capacity(), 2);

// Cannot use aligned allocation
// NOTE(review): presumably MutableBuffer allocates with the crate's ALIGNMENT
// rather than the alignment of u8 or u64, hence the layout mismatch - confirm
let b = Buffer::from(MutableBuffer::new(10));
let b = b.into_vec::<u8>().unwrap_err();
b.into_vec::<u64>().unwrap_err();

// Test slicing
// The slice keeps offset zero and covers 64 bytes, i.e. the first four i128 values
let mut a: Vec<i128> = Vec::with_capacity(20);
a.extend_from_slice(&[1, 4, 7, 8, 9, 3, 6]);
let b = Buffer::from_vec(a);
let slice = b.slice_with_length(0, 64);

// Shared reference fails
let slice = slice.into_vec::<i128>().unwrap_err();
drop(b);

// Succeeds as no outstanding shared reference
// The length follows the slice, while the capacity is that of the original Vec
let back = slice.into_vec::<i128>().unwrap();
assert_eq!(&back, &[1, 4, 7, 8]);
assert_eq!(back.capacity(), 20);

// Slicing by non-multiple length truncates
// 34 bytes hold two whole i128 values; the remaining 2 bytes are discarded
let mut a: Vec<i128> = Vec::with_capacity(8);
a.extend_from_slice(&[1, 4, 7, 3]);

let b = Buffer::from_vec(a);
let slice = b.slice_with_length(0, 34);
drop(b);

let back = slice.into_vec::<i128>().unwrap();
assert_eq!(&back, &[1, 4]);
assert_eq!(back.capacity(), 8);

// Offset prevents conversion
let a: Vec<u32> = vec![1, 3, 4, 6];
let b = Buffer::from_vec(a).slice(2);
b.into_vec::<u32>().unwrap_err();

// A buffer produced by MutableBuffer cannot become a Vec, but it can still be
// turned back into a MutableBuffer after the failed attempts
let b = MutableBuffer::new(16).into_buffer();
let b = b.into_vec::<u8>().unwrap_err(); // Invalid layout
let b = b.into_vec::<u32>().unwrap_err(); // Invalid layout
b.into_mutable().unwrap();

// Conversely, a buffer produced from a Vec cannot become a MutableBuffer, but it
// can still be turned back into a Vec after the failed attempt
let b = Buffer::from_vec(vec![1_u32, 3, 5]);
let b = b.into_mutable().unwrap_err(); // Invalid layout
let b = b.into_vec::<u32>().unwrap();
assert_eq!(b, &[1, 3, 5]);
}
}
Loading

0 comments on commit d440c24

Please sign in to comment.