diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0751eccd8c8..a4205d05341 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -73,6 +73,25 @@ jobs: # --skip io: miri can't handle opening of files, so we skip those run: cargo miri test --features full -- --skip io::parquet --skip io::ipc + miri-checks-custom-allocator: + name: MIRI with custom allocator + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions-rs/toolchain@v1 + with: + toolchain: nightly-2021-07-09 + override: true + - uses: Swatinem/rust-cache@v1 + - name: Install Miri + run: | + rustup component add miri + cargo miri setup + + - name: Run + # --skip io: miri can't handle opening of files, so we skip those + run: cargo miri test --features full,cache_aligned -- --skip io::parquet --skip io::ipc + coverage: name: Coverage runs-on: ubuntu-latest @@ -91,7 +110,9 @@ jobs: - name: Install tarpaulin run: cargo install cargo-tarpaulin - name: Run coverage - run: cargo tarpaulin --features full --out Xml + run: | + cargo tarpaulin --features cache_aligned --out Xml + cargo tarpaulin --features full --out Xml - name: Report coverage continue-on-error: true run: bash <(curl -s https://codecov.io/bash) diff --git a/Cargo.toml b/Cargo.toml index 6b7d9562c6c..9a9a531db29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,7 @@ full = [ "merge_sort", "compute", # parses timezones used in timestamp conversions - "chrono-tz", + "chrono-tz" ] merge_sort = ["itertools"] io_csv = ["io_csv_read", "io_csv_write"] @@ -125,6 +125,9 @@ compute = ["strength_reduce", "multiversion", "lexical-core", "ahash"] io_parquet = ["parquet2", "io_ipc", "base64", "futures"] benchmarks = ["rand"] simd = ["packed_simd"] +# uses a custom allocator whose pointers are aligned along cache lines. +# Using this feature makes `Buffer` and `MutableBuffer` incompatible with `Vec`. 
+cache_aligned = [] [package.metadata.cargo-all-features] skip_feature_sets = [ diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index 0f6bdf8e2f7..a493c909142 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -152,14 +152,8 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult { to_py_field(&field, py) } -#[pyfunction] -fn total_allocated_bytes() -> PyResult { - Ok(arrow2::total_allocated_bytes()) -} - #[pymodule] fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { - m.add_function(wrap_pyfunction!(total_allocated_bytes, m)?)?; m.add_function(wrap_pyfunction!(round_trip_array, m)?)?; m.add_function(wrap_pyfunction!(round_trip_field, m)?)?; Ok(()) diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index 5a6da2bdf65..2998e27a0b0 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -33,18 +33,9 @@ def __reduce__(self): class TestCase(unittest.TestCase): def setUp(self): - self.old_allocated_rust = ( - arrow_pyarrow_integration_testing.total_allocated_bytes() - ) self.old_allocated_cpp = pyarrow.total_allocated_bytes() def tearDown(self): - # No leak of Rust - self.assertEqual( - self.old_allocated_rust, - arrow_pyarrow_integration_testing.total_allocated_bytes(), - ) - # No leak of C++ memory self.assertEqual(self.old_allocated_cpp, pyarrow.total_allocated_bytes()) diff --git a/benches/arithmetic_kernels.rs b/benches/arithmetic_kernels.rs index 0faa06ff389..855850b6f98 100644 --- a/benches/arithmetic_kernels.rs +++ b/benches/arithmetic_kernels.rs @@ -1,20 +1,3 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - #[macro_use] extern crate criterion; use criterion::Criterion; @@ -22,10 +5,11 @@ use criterion::Criterion; use arrow2::array::*; use arrow2::util::bench_util::*; use arrow2::{ - compute::arithmetics::basic::div::div_scalar, datatypes::DataType, types::NativeType, + compute::arithmetics::basic::add::add, compute::arithmetics::basic::div::div_scalar, + datatypes::DataType, types::NativeType, }; use num_traits::NumCast; -use std::ops::Div; +use std::ops::{Add, Div}; fn bench_div_scalar(lhs: &PrimitiveArray, rhs: &T) where @@ -34,17 +18,31 @@ where criterion::black_box(div_scalar(lhs, rhs)); } +fn bench_add(lhs: &PrimitiveArray, rhs: &PrimitiveArray) +where + T: NativeType + Add + NumCast, +{ + criterion::black_box(add(lhs, rhs)).unwrap(); +} + fn add_benchmark(c: &mut Criterion) { - let size = 65536; - let arr = create_primitive_array_with_seed::(size, DataType::UInt64, 0.0, 43); + (10..=20).step_by(2).for_each(|log2_size| { + let size = 2usize.pow(log2_size); + let arr_a = create_primitive_array_with_seed::(size, DataType::UInt64, 0.0, 43); + let arr_b = create_primitive_array_with_seed::(size, DataType::UInt64, 0.0, 42); - c.bench_function("divide_scalar 4", |b| { - // 4 is a very fast optimizable divisor - b.iter(|| bench_div_scalar(&arr, &4)) - }); - 
c.bench_function("divide_scalar prime", |b| { - // large prime number that is probably harder to simplify - b.iter(|| bench_div_scalar(&arr, &524287)) + c.bench_function(&format!("divide_scalar 2^{}", log2_size), |b| { + // 4 is a very fast optimizable divisor + b.iter(|| bench_div_scalar(&arr_a, &4)) + }); + c.bench_function(&format!("divide_scalar prime 2^{}", log2_size), |b| { + // large prime number that is probably harder to simplify + b.iter(|| bench_div_scalar(&arr_a, &524287)) + }); + + c.bench_function(&format!("add 2^{}", log2_size), |b| { + b.iter(|| bench_add(&arr_a, &arr_b)) + }); }); } diff --git a/src/alloc/mod.rs b/src/alloc/mod.rs index deda8a83124..36b9f721583 100644 --- a/src/alloc/mod.rs +++ b/src/alloc/mod.rs @@ -34,11 +34,6 @@ pub use alignment::ALIGNMENT; // If this number is not zero after all objects have been `drop`, there is a memory leak static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0); -/// Returns the total number of bytes allocated to buffers by the allocator. -pub fn total_allocated_bytes() -> isize { - unsafe { ALLOCATIONS.load(std::sync::atomic::Ordering::SeqCst) } -} - /// # Safety /// This pointer may only be used to check if memory is allocated. #[inline] diff --git a/src/bitmap/bitmap_ops.rs b/src/bitmap/bitmap_ops.rs index b402b717bf4..8e4fc88344b 100644 --- a/src/bitmap/bitmap_ops.rs +++ b/src/bitmap/bitmap_ops.rs @@ -112,9 +112,9 @@ where let iterator = iter.map(|left| op(left)).chain(std::iter::once(rem)); - let buffer = MutableBuffer::from_trusted_len_iter(iterator); + let buffer = MutableBuffer::from_chunk_iter(iterator); - Bitmap::from_u8_buffer(buffer.into(), length) + Bitmap::from_u8_buffer(buffer, length) } /// Apply a bitwise operation `op` to one input and return the result as a [`Bitmap`]. 
diff --git a/src/buffer/bytes.rs b/src/buffer/bytes.rs index eeb28ae1374..e057f0fd942 100644 --- a/src/buffer/bytes.rs +++ b/src/buffer/bytes.rs @@ -5,9 +5,10 @@ use std::slice; use std::{fmt::Debug, fmt::Formatter}; use std::{ptr::NonNull, sync::Arc}; -use crate::alloc; use crate::ffi; use crate::types::NativeType; +#[cfg(feature = "cache_aligned")] +use crate::vec::AlignedVec as Vec; /// Mode of deallocating memory regions pub enum Deallocation { @@ -79,11 +80,6 @@ impl Bytes { self.len } - #[inline] - pub fn is_empty(&self) -> bool { - self.len == 0 - } - #[inline] pub fn ptr(&self) -> NonNull { self.ptr @@ -94,9 +90,12 @@ impl Drop for Bytes { #[inline] fn drop(&mut self) { match &self.deallocation { - Deallocation::Native(capacity) => { - unsafe { alloc::free_aligned(self.ptr, *capacity) }; - } + Deallocation::Native(capacity) => unsafe { + #[cfg(feature = "cache_aligned")] + let _ = Vec::from_raw_parts(self.ptr, self.len, *capacity); + #[cfg(not(feature = "cache_aligned"))] + let _ = Vec::from_raw_parts(self.ptr.as_ptr(), self.len, *capacity); + }, // foreign interface knows how to deallocate itself. Deallocation::Foreign(_) => (), } @@ -127,6 +126,6 @@ impl Debug for Bytes { } } -// This is sound because `Bytes` is an imutable container +// This is sound because `Bytes` is an immutable container unsafe impl Send for Bytes {} unsafe impl Sync for Bytes {} diff --git a/src/buffer/immutable.rs b/src/buffer/immutable.rs index 63fe9260f89..eab542e67e3 100644 --- a/src/buffer/immutable.rs +++ b/src/buffer/immutable.rs @@ -52,9 +52,18 @@ impl Buffer { MutableBuffer::from_len_zeroed(length).into() } - /// Auxiliary method to create a new Buffer + /// Takes ownership of [`Vec`]. 
+ /// # Implementation + /// This function is `O(1)` + #[cfg(not(feature = "cache_aligned"))] + #[cfg_attr(docsrs, doc(cfg(not(feature = "cache_aligned"))))] #[inline] - pub fn from_bytes(bytes: Bytes) -> Self { + pub fn from_vec(data: Vec) -> Self { + MutableBuffer::from_vec(data).into() + } + + /// Auxiliary method to create a new Buffer + pub(crate) fn from_bytes(bytes: Bytes) -> Self { let length = bytes.len(); Buffer { data: Arc::new(bytes), diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index bdb13195a3a..6a75438a4d1 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -6,7 +6,6 @@ mod immutable; mod mutable; pub(crate) mod bytes; -pub(crate) mod util; pub use immutable::Buffer; pub use mutable::MutableBuffer; diff --git a/src/buffer/mutable.rs b/src/buffer/mutable.rs index a004181faf8..70c4e4415d0 100644 --- a/src/buffer/mutable.rs +++ b/src/buffer/mutable.rs @@ -1,27 +1,20 @@ use std::iter::FromIterator; -use std::mem::size_of; use std::ptr::NonNull; use std::usize; -use crate::types::NativeType; -use crate::{alloc, trusted_len::TrustedLen}; +use crate::trusted_len::TrustedLen; +use crate::types::{BitChunk, NativeType}; -use super::{ - bytes::{Bytes, Deallocation}, - util, -}; +use super::bytes::{Bytes, Deallocation}; +#[cfg(feature = "cache_aligned")] +use crate::vec::AlignedVec as Vec; use super::immutable::Buffer; -#[inline] -fn capacity_multiple_of_64(capacity: usize) -> usize { - util::round_upto_multiple_of_64(capacity * size_of::()) / size_of::() -} - /// A [`MutableBuffer`] is this crates' interface to store types that are byte-like such as `i32`. -/// It behaves like a [`Vec`], with the following differences: -/// * memory is allocated along cache lines and in multiple of 64 bytes. 
-/// * it can only hold types supported by the arrow format (`u8-u64`, `i8-i128`, `f32,f64` and [`crate::types::days_ms`]) +/// It behaves like a [`Vec`] but can only hold types supported by the arrow format +/// (`u8-u64`, `i8-i128`, `f32,f64`, [`crate::types::days_ms`] and [`crate::types::months_days_ns`]). +/// When the feature `cache_aligned` is active, memory is allocated along cache lines and in multiple of 64 bytes. /// A [`MutableBuffer`] can be converted to a [`Buffer`] via `.into`. /// # Example /// ``` @@ -34,11 +27,15 @@ fn capacity_multiple_of_64(capacity: usize) -> usize { /// assert_eq!(buffer.as_slice(), &[256, 1]) /// ``` pub struct MutableBuffer { - // dangling iff capacity = 0 - ptr: NonNull, - // invariant: len <= capacity - len: usize, - capacity: usize, + data: Vec, +} + +#[cfg(not(feature = "cache_aligned"))] +#[cfg_attr(docsrs, doc(cfg(not(feature = "cache_aligned"))))] +impl From> for Vec { + fn from(data: MutableBuffer) -> Self { + data.data + } } impl std::fmt::Debug for MutableBuffer { @@ -57,26 +54,25 @@ impl MutableBuffer { /// Creates an empty [`MutableBuffer`]. This does not allocate in the heap. #[inline] pub fn new() -> Self { - let ptr = alloc::allocate_aligned(0); - Self { - ptr, - len: 0, - capacity: 0, - } + Self { data: Vec::new() } } /// Allocate a new [`MutableBuffer`] with initial capacity to be at least `capacity`. #[inline] pub fn with_capacity(capacity: usize) -> Self { - let capacity = capacity_multiple_of_64::(capacity); - let ptr = alloc::allocate_aligned(capacity); Self { - ptr, - len: 0, - capacity, + data: Vec::with_capacity(capacity), } } + /// Takes ownership of [`Vec`]. + #[cfg(not(feature = "cache_aligned"))] + #[cfg_attr(docsrs, doc(cfg(not(feature = "cache_aligned"))))] + #[inline] + pub fn from_vec(data: Vec) -> Self { + Self { data } + } + /// Allocates a new [MutableBuffer] with `len` and capacity to be at least `len` /// where data is zeroed. 
/// # Example @@ -90,13 +86,11 @@ impl MutableBuffer { /// ``` #[inline] pub fn from_len_zeroed(len: usize) -> Self { - let new_capacity = capacity_multiple_of_64::(len); - let ptr = alloc::allocate_aligned_zeroed(new_capacity); - Self { - ptr, - len, - capacity: new_capacity, - } + #[cfg(not(feature = "cache_aligned"))] + let data = vec![T::default(); len]; + #[cfg(feature = "cache_aligned")] + let data = Vec::from_len_zeroed(len); + Self { data } } /// Ensures that this buffer has at least `self.len + additional` bytes. This re-allocates iff @@ -114,17 +108,7 @@ impl MutableBuffer { // exits. #[inline(always)] pub fn reserve(&mut self, additional: usize) { - let required_cap = self.len + additional; - if required_cap > self.capacity { - // JUSTIFICATION - // Benefit - // necessity - // Soundness - // `self.data` is valid for `self.capacity`. - let (ptr, new_capacity) = unsafe { reallocate(self.ptr, self.capacity, required_cap) }; - self.ptr = ptr; - self.capacity = new_capacity; - } + self.data.reserve(additional) } /// Resizes the buffer, either truncating its contents (with no change in capacity), or @@ -140,91 +124,66 @@ impl MutableBuffer { // exits. #[inline(always)] pub fn resize(&mut self, new_len: usize, value: T) { - if new_len > self.len { - if self.capacity == 0 && value == T::default() { - // edge case where the allocate - let required_cap = capacity_multiple_of_64::(new_len); - let ptr = alloc::allocate_aligned_zeroed(required_cap); - self.ptr = ptr; - self.capacity = required_cap; - self.len = new_len; - return; - } - - let diff = new_len - self.len; - self.reserve(diff); - unsafe { - // write the value - let mut ptr = self.ptr.as_ptr().add(self.len); - (0..diff).for_each(|_| { - std::ptr::write(ptr, value); - ptr = ptr.add(1); - }) - } - } - // this truncates the buffer when new_len < self.len - self.len = new_len; + self.data.resize(new_len, value) } /// Returns whether this buffer is empty. 
#[inline] pub fn is_empty(&self) -> bool { - self.len == 0 + self.data.is_empty() } /// Returns the length (the number of items) in this buffer. /// The invariant `buffer.len() <= buffer.capacity()` is always upheld. #[inline] pub fn len(&self) -> usize { - self.len + self.data.len() } /// Returns the total capacity in this buffer. /// The invariant `buffer.len() <= buffer.capacity()` is always upheld. #[inline] pub fn capacity(&self) -> usize { - self.capacity + self.data.capacity() } /// Clear all existing data from this buffer. #[inline] pub fn clear(&mut self) { - self.len = 0 + self.data.clear() } /// Shortens the buffer. /// If `len` is greater or equal to the buffers' current length, this has no effect. #[inline] pub fn truncate(&mut self, len: usize) { - if len < self.len { - self.len = len; - } + self.data.truncate(len) } /// Returns the data stored in this buffer as a slice. #[inline] pub fn as_slice(&self) -> &[T] { - self + self.data.as_slice() } /// Returns the data stored in this buffer as a mutable slice. #[inline] pub fn as_mut_slice(&mut self) -> &mut [T] { - self + self.data.as_mut_slice() } /// Returns a raw pointer to this buffer's internal memory /// This pointer is guaranteed to be aligned along cache-lines. #[inline] pub fn as_ptr(&self) -> *const T { - self.ptr.as_ptr() + self.data.as_ptr() } /// Returns a mutable raw pointer to this buffer's internal memory /// This pointer is guaranteed to be aligned along cache-lines. #[inline] pub fn as_mut_ptr(&mut self) -> *mut T { - self.ptr.as_ptr() + self.data.as_mut_ptr() } /// Extends this buffer from a slice of items, increasing its capacity if needed. 
@@ -237,14 +196,7 @@ impl MutableBuffer { /// ``` #[inline] pub fn extend_from_slice(&mut self, items: &[T]) { - let additional = items.len(); - self.reserve(additional); - unsafe { - let dst = self.ptr.as_ptr().add(self.len); - let src = items.as_ptr(); - std::ptr::copy_nonoverlapping(src, dst, additional) - } - self.len += additional; + self.data.extend_from_slice(items) } /// Pushes a new item to the buffer, increasing its capacity if needed. @@ -257,12 +209,7 @@ impl MutableBuffer { /// ``` #[inline] pub fn push(&mut self, item: T) { - self.reserve(1); - unsafe { - let dst = self.ptr.as_ptr().add(self.len) as *mut T; - std::ptr::write(dst, item); - } - self.len += 1; + self.data.push(item) } /// Extends the buffer with a new item without checking for sufficient capacity @@ -270,13 +217,13 @@ impl MutableBuffer { /// Caller must ensure that `self.capacity() - self.len() >= 1` #[inline] pub(crate) unsafe fn push_unchecked(&mut self, item: T) { - let dst = self.ptr.as_ptr().add(self.len); + let dst = self.as_mut_ptr().add(self.len()); std::ptr::write(dst, item); - self.len += 1; + self.data.set_len(self.data.len() + 1); } /// Sets the length of this buffer. - /// # Safety: + /// # Safety /// The caller must uphold the following invariants: /// * ensure no reads are performed on any /// item within `[len, capacity - len]` @@ -284,7 +231,7 @@ impl MutableBuffer { #[inline] pub unsafe fn set_len(&mut self, len: usize) { debug_assert!(len <= self.capacity()); - self.len = len; + self.data.set_len(len); } /// Extends this buffer by `additional` items of value `value`. @@ -294,7 +241,7 @@ impl MutableBuffer { } /// Shrinks the capacity of the [`MutableBuffer`] to fit its current length. - /// The new capacity will be a multiple of 64 bytes. + /// When the feature `cache_aligned` is active, the new capacity will be a multiple of 64 bytes. 
/// /// # Example /// ``` @@ -306,64 +253,16 @@ impl MutableBuffer { /// buffer.push(2); /// /// buffer.shrink_to_fit(); - /// assert!(buffer.capacity() == 8); + /// assert!(buffer.capacity() < 16); // 2 or 8 depending on feature `cache_aligned` /// ``` pub fn shrink_to_fit(&mut self) { - let new_capacity = capacity_multiple_of_64::(self.len); - if new_capacity < self.capacity { - // JUSTIFICATION - // Benefit - // necessity - // Soundness - // `self.ptr` is valid for `self.capacity`. - let ptr = unsafe { alloc::reallocate(self.ptr, self.capacity, new_capacity) }; - - self.ptr = ptr; - self.capacity = new_capacity; - } + self.data.shrink_to_fit(); } } -/// # Safety -/// `ptr` must be allocated for `old_capacity`. -#[inline] -unsafe fn reallocate( - ptr: NonNull, - old_capacity: usize, - new_capacity: usize, -) -> (NonNull, usize) { - let new_capacity = capacity_multiple_of_64::(new_capacity); - let new_capacity = std::cmp::max(new_capacity, old_capacity * 2); - let ptr = alloc::reallocate(ptr, old_capacity, new_capacity); - (ptr, new_capacity) -} - impl Extend for MutableBuffer { fn extend>(&mut self, iter: T) { - let mut iterator = iter.into_iter(); - let (lower, _) = iterator.size_hint(); - let additional = lower; - self.reserve(additional); - - // this is necessary because of https://github.com/rust-lang/rust/issues/32155 - let mut len = SetLenOnDrop::new(&mut self.len); - let mut dst = unsafe { self.ptr.as_ptr().add(len.local_len) as *mut A }; - let capacity = self.capacity; - - while len.local_len < capacity { - if let Some(item) = iterator.next() { - unsafe { - std::ptr::write(dst, item); - dst = dst.add(1); - } - len.local_len += 1; - } else { - break; - } - } - drop(len); - - iterator.for_each(|item| self.push(item)); + self.data.extend(iter) } } @@ -388,19 +287,21 @@ impl MutableBuffer { let upper = upper.expect("trusted_len_iter requires an upper limit"); let len = upper; + let self_len = self.len(); + self.reserve(len); - let mut dst = 
self.ptr.as_ptr().add(self.len); + let mut dst = self.as_mut_ptr().add(self_len); for item in iterator { // note how there is no reserve here (compared with `extend_from_iter`) std::ptr::write(dst, item); dst = dst.add(1); } assert_eq!( - dst.offset_from(self.ptr.as_ptr().add(self.len)) as usize, + dst.offset_from(self.as_ptr().add(self_len)) as usize, upper, "Trusted iterator length was not accurately reported" ); - self.len += len; + self.set_len(self_len + len); } /// Creates a [`MutableBuffer`] from an [`Iterator`] with a trusted (upper) length. @@ -474,41 +375,25 @@ impl MutableBuffer { let mut buffer = MutableBuffer::with_capacity(len); - let mut dst = buffer.ptr.as_ptr(); + let mut dst = buffer.as_mut_ptr(); for item in iterator { std::ptr::write(dst, item?); dst = dst.add(1); } assert_eq!( - dst.offset_from(buffer.ptr.as_ptr()) as usize, + dst.offset_from(buffer.as_ptr()) as usize, upper, "Trusted iterator length was not accurately reported" ); - buffer.len = len; + buffer.set_len(len); Ok(buffer) } } impl FromIterator for MutableBuffer { fn from_iter>(iter: I) -> Self { - let mut iterator = iter.into_iter(); - - // first iteration, which will likely reserve sufficient space for the buffer. 
- let mut buffer = match iterator.next() { - None => MutableBuffer::new(), - Some(element) => { - let (lower, _) = iterator.size_hint(); - let mut buffer = MutableBuffer::with_capacity(lower.saturating_add(1)); - unsafe { - std::ptr::write(buffer.as_mut_ptr(), element); - buffer.len = 1; - } - buffer - } - }; - - buffer.extend(iterator); - buffer + let data = Vec::from_iter(iter); + Self { data } } } @@ -523,42 +408,14 @@ impl std::ops::Deref for MutableBuffer { #[inline] fn deref(&self) -> &[T] { - unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) } + &self.data } } impl std::ops::DerefMut for MutableBuffer { #[inline] fn deref_mut(&mut self) -> &mut [T] { - unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) } - } -} - -impl Drop for MutableBuffer { - fn drop(&mut self) { - unsafe { alloc::free_aligned(self.ptr, self.capacity) }; - } -} - -struct SetLenOnDrop<'a> { - len: &'a mut usize, - local_len: usize, -} - -impl<'a> SetLenOnDrop<'a> { - #[inline] - fn new(len: &'a mut usize) -> Self { - SetLenOnDrop { - local_len: *len, - len, - } - } -} - -impl Drop for SetLenOnDrop<'_> { - #[inline] - fn drop(&mut self) { - *self.len = self.local_len; + &mut self.data } } @@ -574,53 +431,61 @@ impl> From

for MutableBuffer { impl From> for Buffer { #[inline] fn from(buffer: MutableBuffer) -> Self { - Buffer::from_bytes(buffer.into()) + Self::from_bytes(buffer.into()) } } impl From> for Bytes { #[inline] fn from(buffer: MutableBuffer) -> Self { - let result = unsafe { - Bytes::new( - buffer.ptr, - buffer.len, - Deallocation::Native(buffer.capacity), - ) - }; + let mut data = buffer.data; + let ptr = NonNull::new(data.as_mut_ptr()).unwrap(); + let len = data.len(); + let capacity = data.capacity(); + + let result = unsafe { Bytes::new(ptr, len, Deallocation::Native(capacity)) }; // so that the memory region is not deallocated. - std::mem::forget(buffer); + std::mem::forget(data); result } } -impl From> for MutableBuffer { - #[inline] - fn from(buffer: MutableBuffer) -> Self { - let ratio = std::mem::size_of::() / std::mem::size_of::(); - - let capacity = buffer.capacity * ratio; - let len = buffer.len * ratio; - let ptr = unsafe { NonNull::new_unchecked(buffer.ptr.as_ptr() as *mut u8) }; - // so that the memory region is not deallocated; ownership was transfered - std::mem::forget(buffer); - Self { ptr, len, capacity } - } -} - impl MutableBuffer { /// Creates a [`MutableBuffer`] from an iterator of `u64`. #[inline] - pub fn from_chunk_iter>(iter: I) -> Self { - MutableBuffer::from_trusted_len_iter(iter).into() + pub fn from_chunk_iter>(iter: I) -> Self { + // TrustedLen + unsafe { Self::from_chunk_iter_unchecked(iter) } } /// # Safety /// This method assumes that the iterator's size is correct and is undefined behavior /// to use it on an iterator that reports an incorrect length. 
#[inline] - pub unsafe fn from_chunk_iter_unchecked>(iter: I) -> Self { - MutableBuffer::from_trusted_len_iter_unchecked(iter).into() + pub unsafe fn from_chunk_iter_unchecked>( + iterator: I, + ) -> Self { + let (_, upper) = iterator.size_hint(); + let upper = upper.expect("try_from_trusted_len_iter requires an upper limit"); + let len = upper * std::mem::size_of::(); + + let mut buffer = MutableBuffer::with_capacity(len); + + let mut dst = buffer.as_mut_ptr(); + for item in iterator { + let bytes = item.to_ne_bytes(); + for i in 0..std::mem::size_of::() { + std::ptr::write(dst, bytes[i]); + dst = dst.add(1); + } + } + assert_eq!( + dst.offset_from(buffer.as_ptr()) as usize, + len, + "Trusted iterator length was not accurately reported" + ); + buffer.set_len(len); + buffer } } diff --git a/src/buffer/util.rs b/src/buffer/util.rs deleted file mode 100644 index 7cd1324f196..00000000000 --- a/src/buffer/util.rs +++ /dev/null @@ -1,12 +0,0 @@ -/// Returns the nearest number that is `>=` than `num` and is a multiple of 64 -#[inline] -pub fn round_upto_multiple_of_64(num: usize) -> usize { - round_upto_power_of_2(num, 64) -} - -/// Returns the nearest multiple of `factor` that is `>=` than `num`. Here `factor` must -/// be a power of 2. 
-pub fn round_upto_power_of_2(num: usize, factor: usize) -> usize { - debug_assert!(factor > 0 && (factor & (factor - 1)) == 0); - (num + (factor - 1)) & !(factor - 1) -} diff --git a/src/lib.rs b/src/lib.rs index 272f17afaae..a6ca6598496 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,10 @@ #![cfg_attr(docsrs, feature(doc_cfg))] -pub mod alloc; #[macro_use] pub mod array; +#[cfg(feature = "cache_aligned")] +mod alloc; pub mod bitmap; pub mod buffer; mod endianess; @@ -12,6 +13,8 @@ pub mod error; pub mod scalar; pub mod trusted_len; pub mod types; +#[cfg(feature = "cache_aligned")] +mod vec; #[cfg(feature = "compute")] #[cfg_attr(docsrs, doc(cfg(feature = "compute")))] @@ -19,7 +22,6 @@ pub mod compute; pub mod io; pub mod record_batch; pub mod temporal_conversions; -pub use alloc::total_allocated_bytes; pub mod datatypes; diff --git a/src/types/bit_chunk.rs b/src/types/bit_chunk.rs index c3b1768454e..76cb1e71b7a 100644 --- a/src/types/bit_chunk.rs +++ b/src/types/bit_chunk.rs @@ -3,12 +3,15 @@ use std::{ ops::{BitAnd, BitAndAssign, BitOr, Not, Shl, ShlAssign, ShrAssign}, }; -/// Something that can be use as a chunk of bits. This is used to create masks of a given number -/// of lengths, whose width is `1`. In `simd_packed` notation, this corresponds to `m1xY`. +use super::NativeType; + +/// Something that can be used as a chunk of bits. This is used to create masks of a given number +/// of length, whose width is `1`. In `simd_packed` notation, this corresponds to `m1xY`. /// # Safety /// Do not implement. pub unsafe trait BitChunk: Sized + + NativeType + Copy + std::fmt::Debug + Binary @@ -22,12 +25,6 @@ pub unsafe trait BitChunk: + BitAndAssign + BitOr { - /// The representation of this type in the stack. - type Bytes: std::ops::Index + std::ops::IndexMut - + for<'a> std::convert::TryFrom<&'a [u8]> - + std::fmt::Debug; - /// A value with a single bit set at the most right position. fn one() -> Self; /// A value with no bits set. 
@@ -39,8 +36,6 @@ pub unsafe trait BitChunk: } unsafe impl BitChunk for u8 { - type Bytes = [u8; 1]; - #[inline(always)] fn zero() -> Self { 0 @@ -63,8 +58,6 @@ unsafe impl BitChunk for u8 { } unsafe impl BitChunk for u16 { - type Bytes = [u8; 2]; - #[inline(always)] fn zero() -> Self { 0 @@ -87,8 +80,6 @@ unsafe impl BitChunk for u16 { } unsafe impl BitChunk for u32 { - type Bytes = [u8; 4]; - #[inline(always)] fn zero() -> Self { 0 @@ -111,8 +102,6 @@ unsafe impl BitChunk for u32 { } unsafe impl BitChunk for u64 { - type Bytes = [u8; 8]; - #[inline(always)] fn zero() -> Self { 0 diff --git a/src/types/mod.rs b/src/types/mod.rs index 27dc0183996..b990b56ede7 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -66,12 +66,20 @@ pub unsafe trait NativeType: + Sized + 'static { - /// Type denoting its representation as bytes - type Bytes: AsRef<[u8]> + for<'a> TryFrom<&'a [u8]>; + /// Type denoting its representation as bytes. + /// This must be `[u8; N]` where `N = size_of::`. + type Bytes: AsRef<[u8]> + + std::ops::Index + + std::ops::IndexMut + + for<'a> TryFrom<&'a [u8]> + + std::fmt::Debug; /// To bytes in little endian fn to_le_bytes(&self) -> Self::Bytes; + /// To bytes in native endian + fn to_ne_bytes(&self) -> Self::Bytes; + /// To bytes in big endian fn to_be_bytes(&self) -> Self::Bytes; @@ -93,6 +101,11 @@ macro_rules! 
native { Self::to_be_bytes(*self) } + #[inline] + fn to_ne_bytes(&self) -> Self::Bytes { + Self::to_ne_bytes(*self) + } + #[inline] fn from_be_bytes(bytes: Self::Bytes) -> Self { Self::from_be_bytes(bytes) @@ -171,6 +184,22 @@ unsafe impl NativeType for days_ms { result } + #[inline] + fn to_ne_bytes(&self) -> Self::Bytes { + let days = self.0[0].to_ne_bytes(); + let ms = self.0[1].to_ne_bytes(); + let mut result = [0; 8]; + result[0] = days[0]; + result[1] = days[1]; + result[2] = days[2]; + result[3] = days[3]; + result[4] = ms[0]; + result[5] = ms[1]; + result[6] = ms[2]; + result[7] = ms[3]; + result + } + #[inline] fn to_be_bytes(&self) -> Self::Bytes { let days = self.0[0].to_be_bytes(); @@ -259,6 +288,26 @@ unsafe impl NativeType for months_days_ns { result } + #[inline] + fn to_ne_bytes(&self) -> Self::Bytes { + let months = self.months().to_ne_bytes(); + let days = self.days().to_ne_bytes(); + let ns = self.ns().to_ne_bytes(); + let mut result = [0; 16]; + result[0] = months[0]; + result[1] = months[1]; + result[2] = months[2]; + result[3] = months[3]; + result[4] = days[0]; + result[5] = days[1]; + result[6] = days[2]; + result[7] = days[3]; + (0..8).for_each(|i| { + result[8 + i] = ns[i]; + }); + result + } + #[inline] fn to_be_bytes(&self) -> Self::Bytes { let months = self.months().to_be_bytes(); diff --git a/src/vec.rs b/src/vec.rs new file mode 100644 index 00000000000..48966c0c616 --- /dev/null +++ b/src/vec.rs @@ -0,0 +1,336 @@ +use std::iter::FromIterator; +use std::ptr::NonNull; + +use crate::alloc; +use crate::types::NativeType; + +/// Returns the nearest number that is `>=` than `num` and is a multiple of 64 +#[inline] +fn round_upto_multiple_of_64(num: usize) -> usize { + round_upto_power_of_2(num, 64) +} + +/// Returns the nearest multiple of `factor` that is `>=` than `num`. Here `factor` must +/// be a power of 2. 
+fn round_upto_power_of_2(num: usize, factor: usize) -> usize { + debug_assert!(factor > 0 && (factor & (factor - 1)) == 0); + (num + (factor - 1)) & !(factor - 1) +} + +#[inline] +fn capacity_multiple_of_64(capacity: usize) -> usize { + round_upto_multiple_of_64(capacity * std::mem::size_of::()) / std::mem::size_of::() +} + +/// # Safety +/// `ptr` must be allocated for `old_capacity`. +#[inline] +unsafe fn reallocate( + ptr: NonNull, + old_capacity: usize, + new_capacity: usize, +) -> (NonNull, usize) { + let new_capacity = capacity_multiple_of_64::(new_capacity); + let new_capacity = std::cmp::max(new_capacity, old_capacity * 2); + let ptr = alloc::reallocate(ptr, old_capacity, new_capacity); + (ptr, new_capacity) +} + +/// An interface equivalent to `std::vec::Vec` with an allocator aligned along cache lines. +pub(crate) struct AlignedVec { + // dangling iff capacity = 0 + ptr: NonNull, + // invariant: len <= capacity + len: usize, + capacity: usize, +} + +impl Drop for AlignedVec { + fn drop(&mut self) { + unsafe { alloc::free_aligned(self.ptr, self.capacity) } + } +} + +impl AlignedVec { + #[inline] + pub fn new() -> Self { + let ptr = alloc::allocate_aligned(0); + Self { + ptr, + len: 0, + capacity: 0, + } + } + + #[inline] + pub fn clear(&mut self) { + self.len = 0 + } + + #[inline] + pub fn capacity(&self) -> usize { + self.capacity + } + + #[inline] + pub fn truncate(&mut self, len: usize) { + if len < self.len { + self.len = len; + } + } + + /// Sets the length of this buffer. + /// # Safety: + /// The caller must uphold the following invariants: + /// * ensure no reads are performed on any + /// item within `[len, capacity - len]` + /// * ensure `len <= self.capacity()` + #[inline] + pub unsafe fn set_len(&mut self, len: usize) { + debug_assert!(len <= self.capacity()); + self.len = len; + } + + /// Returns the data stored in this buffer as a slice. 
+ #[inline] + pub fn as_slice(&self) -> &[T] { + self + } + + /// Returns the data stored in this buffer as a mutable slice. + #[inline] + pub fn as_mut_slice(&mut self) -> &mut [T] { + self + } + + /// Returns a raw pointer to this buffer's internal memory + /// This pointer is guaranteed to be aligned along cache-lines. + #[inline] + pub fn as_ptr(&self) -> *const T { + self.ptr.as_ptr() + } + + /// Returns a mutable raw pointer to this buffer's internal memory + /// This pointer is guaranteed to be aligned along cache-lines. + #[inline] + pub fn as_mut_ptr(&mut self) -> *mut T { + self.ptr.as_ptr() + } + + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + let capacity = capacity_multiple_of_64::(capacity); + let ptr = alloc::allocate_aligned(capacity); + Self { + ptr, + len: 0, + capacity, + } + } + + #[inline] + pub fn len(&self) -> usize { + self.len + } + + #[inline] + pub fn is_empty(&self) -> bool { + self.len == 0 + } + + #[inline(always)] + pub fn reserve(&mut self, additional: usize) { + let required_cap = self.len + additional; + if required_cap > self.capacity { + // JUSTIFICATION + // Benefit + // necessity + // Soundness + // `self.data` is valid for `self.capacity`. 
+ let (ptr, new_capacity) = unsafe { reallocate(self.ptr, self.capacity, required_cap) }; + self.ptr = ptr; + self.capacity = new_capacity; + } + } + + #[inline(always)] + pub fn resize(&mut self, new_len: usize, value: T) { + if new_len > self.len { + if self.capacity == 0 && value == T::default() { + // edge case where the allocate + let required_cap = capacity_multiple_of_64::(new_len); + let ptr = alloc::allocate_aligned_zeroed(required_cap); + self.ptr = ptr; + self.capacity = required_cap; + self.len = new_len; + return; + } + + let diff = new_len - self.len; + self.reserve(diff); + unsafe { + // write the value + let mut ptr = self.ptr.as_ptr().add(self.len); + (0..diff).for_each(|_| { + std::ptr::write(ptr, value); + ptr = ptr.add(1); + }) + } + } + // this truncates the buffer when new_len < self.len + self.len = new_len; + } + + #[inline] + pub fn extend_from_slice(&mut self, items: &[T]) { + let additional = items.len(); + self.reserve(additional); + unsafe { + let dst = self.ptr.as_ptr().add(self.len); + let src = items.as_ptr(); + std::ptr::copy_nonoverlapping(src, dst, additional) + } + self.len += additional; + } + + #[inline] + pub fn push(&mut self, item: T) { + self.reserve(1); + unsafe { + let dst = self.ptr.as_ptr().add(self.len) as *mut T; + std::ptr::write(dst, item); + } + self.len += 1; + } + + pub fn shrink_to_fit(&mut self) { + let new_capacity = capacity_multiple_of_64::(self.len); + if new_capacity < self.capacity { + // JUSTIFICATION + // Benefit + // necessity + // Soundness + // `self.ptr` is valid for `self.capacity`. 
+ let ptr = unsafe { alloc::reallocate(self.ptr, self.capacity, new_capacity) }; + + self.ptr = ptr; + self.capacity = new_capacity; + } + } + + #[inline] + pub fn from_len_zeroed(len: usize) -> Self { + let new_capacity = capacity_multiple_of_64::(len); + let ptr = alloc::allocate_aligned_zeroed(new_capacity); + Self { + ptr, + len, + capacity: new_capacity, + } + } + + #[inline] + pub unsafe fn from_raw_parts(ptr: NonNull, length: usize, capacity: usize) -> Self { + Self { + ptr, + capacity, + len: length, + } + } +} + +impl Default for AlignedVec { + fn default() -> Self { + Self::new() + } +} + +impl std::ops::Deref for AlignedVec { + type Target = [T]; + + #[inline] + fn deref(&self) -> &[T] { + unsafe { std::slice::from_raw_parts(self.as_ptr(), self.len) } + } +} + +impl std::ops::DerefMut for AlignedVec { + #[inline] + fn deref_mut(&mut self) -> &mut [T] { + unsafe { std::slice::from_raw_parts_mut(self.as_mut_ptr(), self.len) } + } +} + +impl Extend for AlignedVec { + fn extend>(&mut self, iter: T) { + let mut iterator = iter.into_iter(); + let (lower, _) = iterator.size_hint(); + let additional = lower; + self.reserve(additional); + + // this is necessary because of https://github.com/rust-lang/rust/issues/32155 + let mut len = SetLenOnDrop::new(&mut self.len); + let mut dst = unsafe { self.ptr.as_ptr().add(len.local_len) as *mut A }; + let capacity = self.capacity; + + while len.local_len < capacity { + if let Some(item) = iterator.next() { + unsafe { + std::ptr::write(dst, item); + dst = dst.add(1); + } + len.local_len += 1; + } else { + break; + } + } + drop(len); + + iterator.for_each(|item| self.push(item)); + } +} + +struct SetLenOnDrop<'a> { + len: &'a mut usize, + local_len: usize, +} + +impl<'a> SetLenOnDrop<'a> { + #[inline] + fn new(len: &'a mut usize) -> Self { + SetLenOnDrop { + local_len: *len, + len, + } + } +} + +impl Drop for SetLenOnDrop<'_> { + #[inline] + fn drop(&mut self) { + *self.len = self.local_len; + } +} + +impl FromIterator for 
AlignedVec { + fn from_iter>(iter: I) -> Self { + let mut iterator = iter.into_iter(); + + // first iteration, which will likely reserve sufficient space for the buffer. + let mut buffer = match iterator.next() { + None => AlignedVec::new(), + Some(element) => { + let (lower, _) = iterator.size_hint(); + let mut buffer = AlignedVec::with_capacity(lower.saturating_add(1)); + unsafe { + std::ptr::write(buffer.as_mut_ptr(), element); + buffer.len = 1; + } + buffer + } + }; + + buffer.extend(iterator); + buffer + } +} diff --git a/tests/it/alloc.rs b/tests/it/alloc.rs deleted file mode 100644 index bfd35bcae1c..00000000000 --- a/tests/it/alloc.rs +++ /dev/null @@ -1,49 +0,0 @@ -use arrow2::alloc::*; - -#[test] -fn allocate_dangling() { - let p = allocate_aligned::(0); - assert_eq!(0, (p.as_ptr() as usize) % ALIGNMENT); -} - -#[test] -fn allocate() { - let p = allocate_aligned::(1024); - assert_eq!(0, (p.as_ptr() as usize) % ALIGNMENT); - unsafe { free_aligned(p, 1024) }; -} - -#[test] -fn allocate_zeroed() { - let p = allocate_aligned_zeroed::(1024); - assert_eq!(0, (p.as_ptr() as usize) % ALIGNMENT); - unsafe { free_aligned(p, 1024) }; -} - -#[test] -fn reallocate_from_zero() { - let ptr = allocate_aligned::(0); - let ptr = unsafe { reallocate(ptr, 0, 512) }; - unsafe { free_aligned(ptr, 512) }; -} - -#[test] -fn reallocate_from_alloc() { - let ptr = allocate_aligned::(32); - let ptr = unsafe { reallocate(ptr, 32, 64) }; - unsafe { free_aligned(ptr, 64) }; -} - -#[test] -fn reallocate_smaller() { - let ptr = allocate_aligned::(32); - let ptr = unsafe { reallocate(ptr, 32, 16) }; - unsafe { free_aligned(ptr, 16) }; -} - -#[test] -fn reallocate_to_zero() { - let ptr = allocate_aligned::(32); - let ptr = unsafe { reallocate(ptr, 32, 0) }; - assert_eq!(ptr, unsafe { dangling() }); -} diff --git a/tests/it/array/utf8/mutable.rs b/tests/it/array/utf8/mutable.rs index 71187849c47..20087010743 100644 --- a/tests/it/array/utf8/mutable.rs +++ b/tests/it/array/utf8/mutable.rs @@ 
-7,8 +7,8 @@ use arrow2::datatypes::DataType; fn capacities() { let b = MutableUtf8Array::::with_capacities(1, 10); - assert_eq!(b.values().capacity(), 64); - assert_eq!(b.offsets().capacity(), 16); // 64 bytes + assert!(b.values().capacity() >= 10); + assert!(b.offsets().capacity() >= 2); } #[test] diff --git a/tests/it/bitmap/mutable.rs b/tests/it/bitmap/mutable.rs index fa5e57c2276..ab6a7d15789 100644 --- a/tests/it/bitmap/mutable.rs +++ b/tests/it/bitmap/mutable.rs @@ -75,14 +75,7 @@ fn push_exact_ones() { #[test] fn capacity() { let b = MutableBitmap::with_capacity(10); - assert_eq!(b.capacity(), 512); - - let b = MutableBitmap::with_capacity(512); - assert_eq!(b.capacity(), 512); - - let mut b = MutableBitmap::with_capacity(512); - b.reserve(8); - assert_eq!(b.capacity(), 512); + assert!(b.capacity() >= 10); } #[test] diff --git a/tests/it/buffer/immutable.rs b/tests/it/buffer/immutable.rs index db969849053..5fbf711ec5f 100644 --- a/tests/it/buffer/immutable.rs +++ b/tests/it/buffer/immutable.rs @@ -67,3 +67,11 @@ fn debug() { let a = format!("{:?}", buffer); assert_eq!(a, "[1, 2]") } + +#[cfg(not(feature = "cache_aligned"))] +#[test] +fn from_vec() { + let buffer = Buffer::::from_vec(vec![0, 1, 2]); + assert_eq!(buffer.len(), 3); + assert_eq!(buffer.as_slice(), &[0, 1, 2]); +} diff --git a/tests/it/buffer/mutable.rs b/tests/it/buffer/mutable.rs index cb6134b0807..ba26f4a6ebc 100644 --- a/tests/it/buffer/mutable.rs +++ b/tests/it/buffer/mutable.rs @@ -71,12 +71,7 @@ fn push() { #[test] fn capacity() { let b = MutableBuffer::::with_capacity(10); - assert_eq!(b.capacity(), 64 / std::mem::size_of::()); - let b = MutableBuffer::::with_capacity(16); - assert_eq!(b.capacity(), 16); - - let b = MutableBuffer::::with_capacity(64); - assert!(b.capacity() >= 64); + assert!(b.capacity() >= 10); let mut b = MutableBuffer::::with_capacity(16); b.reserve(4); diff --git a/tests/it/main.rs b/tests/it/main.rs index 9f9208e7f23..e83b6e88ede 100644 --- a/tests/it/main.rs +++ 
b/tests/it/main.rs @@ -1,4 +1,3 @@ -mod alloc; mod array; mod bitmap; mod buffer;