diff --git a/arrow-pyarrow-integration-testing/src/c_stream.rs b/arrow-pyarrow-integration-testing/src/c_stream.rs index 8998f4812ea..4f10f5cb4c5 100644 --- a/arrow-pyarrow-integration-testing/src/c_stream.rs +++ b/arrow-pyarrow-integration-testing/src/c_stream.rs @@ -23,7 +23,7 @@ pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult> { let mut arrays = vec![]; while let Some(array) = unsafe { iter.next() } { - let py_array = to_py_array(array.unwrap().into(), py)?; + let py_array = to_py_array(array.map_err(PyO3Error::from)?, py)?; arrays.push(py_array) } Ok(arrays) @@ -38,16 +38,16 @@ pub fn from_rust_iterator(py: Python) -> PyResult { None, ) .boxed(); + // and a field with its datatype let field = Field::new("a", array.data_type().clone(), true); // create an iterator of arrays let arrays = vec![array.clone(), array.clone(), array]; - let iter = Box::new(arrays.clone().into_iter().map(Ok)) as _; + let iter = Box::new(arrays.into_iter().map(Ok)) as _; // create an [`ArrowArrayStream`] based on this iterator and field - let mut stream = Box::new(ffi::ArrowArrayStream::empty()); - unsafe { ffi::export_iterator(iter, field, &mut *stream) }; + let stream = Box::new(ffi::export_iterator(iter, field)); // call pyarrow's interface to read this stream let pa = py.import("pyarrow.ipc")?; @@ -55,7 +55,6 @@ pub fn from_rust_iterator(py: Python) -> PyResult { "_import_from_c", ((&*stream as *const ffi::ArrowArrayStream) as Py_uintptr_t,), )?; - Box::leak(stream); // this is not ideal => the struct should be allocated by pyarrow Ok(py_stream.to_object(py)) } diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index ad6a16bbadb..ddfe8005999 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -67,22 +67,21 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult> { let field = unsafe { ffi::import_field_from_c(schema.as_ref()).map_err(PyO3Error::from)? }; let array = - unsafe { ffi::import_array_from_c(array, field.data_type).map_err(PyO3Error::from)? }; + unsafe { ffi::import_array_from_c(*array, field.data_type).map_err(PyO3Error::from)? }; - Ok(array.into()) + Ok(array) } fn to_py_array(array: Box, py: Python) -> PyResult { - let array_ptr = Box::new(ffi::ArrowArray::empty()); - let schema_ptr = Box::new(ffi::ArrowSchema::empty()); + let schema = Box::new(ffi::export_field_to_c(&Field::new( + "", + array.data_type().clone(), + true, + ))); + let array = Box::new(ffi::export_array_to_c(array)); - let array_ptr = Box::into_raw(array_ptr); - let schema_ptr = Box::into_raw(schema_ptr); - - unsafe { - ffi::export_field_to_c(&Field::new("", array.data_type().clone(), true), schema_ptr); - ffi::export_array_to_c(array, array_ptr); - }; + let schema_ptr: *const arrow2::ffi::ArrowSchema = &*schema; + let array_ptr: *const arrow2::ffi::ArrowArray = &*array; let pa = py.import("pyarrow")?; @@ -91,11 +90,6 @@ fn to_py_array(array: Box, py: Python) -> PyResult { (array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t), )?; - unsafe { - Box::from_raw(array_ptr); - Box::from_raw(schema_ptr); - }; - Ok(array.to_object(py)) } @@ -115,12 +109,8 @@ fn to_rust_field(ob: PyObject, py: Python) -> PyResult { } fn to_py_field(field: &Field, py: Python) -> PyResult { - let schema_ptr = Box::new(ffi::ArrowSchema::empty()); - let schema_ptr = Box::into_raw(schema_ptr); - - unsafe { - ffi::export_field_to_c(field, schema_ptr); - }; + let schema = Box::new(ffi::export_field_to_c(field)); + let schema_ptr: *const arrow2::ffi::ArrowSchema = &*schema; let pa = py.import("pyarrow")?; @@ -128,8 +118,6 @@ fn to_py_field(field: &Field, py: Python) -> PyResult { .getattr("Field")? .call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?; - unsafe { Box::from_raw(schema_ptr) }; - Ok(array.to_object(py)) } diff --git a/examples/ffi.rs b/examples/ffi.rs index 84a65599e4a..ed5b9cd87e7 100644 --- a/examples/ffi.rs +++ b/examples/ffi.rs @@ -3,18 +3,19 @@ use arrow2::datatypes::Field; use arrow2::error::Result; use arrow2::ffi; -unsafe fn export( - array: Box, - array_ptr: *mut ffi::ArrowArray, - schema_ptr: *mut ffi::ArrowSchema, -) { - // exporting an array requires an associated field so that the consumer knows its datatype +fn export(array: Box) -> (ffi::ArrowArray, ffi::ArrowSchema) { + // importing an array requires an associated field so that the consumer knows its datatype. + // Thus, we need to export both let field = Field::new("a", array.data_type().clone(), true); - ffi::export_array_to_c(array, array_ptr); - ffi::export_field_to_c(&field, schema_ptr); + ( + ffi::export_array_to_c(array), + ffi::export_field_to_c(&field), + ) } -unsafe fn import(array: Box, schema: &ffi::ArrowSchema) -> Result> { +/// # Safety +/// `ArrowArray` and `ArrowSchema` must be valid +unsafe fn import(array: ffi::ArrowArray, schema: &ffi::ArrowSchema) -> Result> { let field = ffi::import_field_from_c(schema)?; ffi::import_array_from_c(array, field.data_type) } @@ -23,19 +24,14 @@ fn main() -> Result<()> { // let's assume that we have an array: let array = PrimitiveArray::::from([Some(1), None, Some(123)]).boxed(); - // the goal is to export this array and import it back via FFI. - // to import, we initialize the structs that will receive the data - let mut array_ptr = Box::new(ffi::ArrowArray::empty()); - let mut schema_ptr = Box::new(ffi::ArrowSchema::empty()); + // here we export - `array_ffi` and `schema_ffi` are the structs of the C data interface + let (array_ffi, schema_ffi) = export(array.clone()); - // this is where a producer (in this case also us ^_^) writes to the pointers' location. - // `array` here could be anything or not even be available, if this was e.g. from Python. - // Safety: we just allocated the pointers - unsafe { export(array.clone(), &mut *array_ptr, &mut *schema_ptr) }; + // here we import them. Often the structs are wrapped in a pointer. In that case you + // need to read the pointer to the stack. - // and finally interpret the written memory into a new array. // Safety: we used `export`, which is a valid exporter to the C data interface - let new_array = unsafe { import(array_ptr, schema_ptr.as_ref())? }; + let new_array = unsafe { import(array_ffi, &schema_ffi)? }; // which is equal to the exported array assert_eq!(array.as_ref(), new_array.as_ref()); diff --git a/src/buffer/bytes.rs b/src/buffer/bytes.rs index 3d573da5fe5..67ad7bed132 100644 --- a/src/buffer/bytes.rs +++ b/src/buffer/bytes.rs @@ -1,14 +1,14 @@ use std::mem::ManuallyDrop; use std::ops::{Deref, DerefMut}; use std::panic::RefUnwindSafe; -use std::{ptr::NonNull, sync::Arc}; +use std::ptr::NonNull; /// Mode of deallocating memory regions enum Allocation { /// Native allocation Native, // A foreign allocator and its ref count - Foreign(Arc), + Foreign(Box), } /// A continuous memory region that may be allocated externally. @@ -35,7 +35,7 @@ impl Bytes { pub unsafe fn from_owned( ptr: std::ptr::NonNull, len: usize, - owner: Arc, + owner: Box, ) -> Self { // This line is technically outside the assumptions of `Vec::from_raw_parts`, since // `ptr` was not allocated by `Vec`. However, one of the invariants of this struct diff --git a/src/ffi/array.rs b/src/ffi/array.rs index 700e83e2e05..6bce8162557 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -1,5 +1,6 @@ //! Contains functionality to load an ArrayData from the C Data Interface -use std::{ptr::NonNull, sync::Arc}; +use std::ptr::NonNull; +use std::sync::Arc; use crate::{ array::*, @@ -178,7 +179,7 @@ impl ArrowArray { unsafe fn create_buffer( array: &ArrowArray, data_type: &DataType, - owner: Arc, + owner: Box, index: usize, ) -> Result> { if array.buffers.is_null() { @@ -209,7 +210,7 @@ unsafe fn create_buffer( /// This function assumes that `ceil(self.length * bits, 8)` is the size of the buffer unsafe fn create_bitmap( array: &ArrowArray, - owner: Arc, + owner: Box, index: usize, ) -> Result { if array.buffers.is_null() { @@ -310,7 +311,7 @@ fn buffer_len(array: &ArrowArray, data_type: &DataType, i: usize) -> Result, + parent: Box, index: usize, ) -> Result> { let data_type = get_child(field, index)?; @@ -328,7 +329,7 @@ fn create_child( fn create_dictionary( array: &ArrowArray, data_type: &DataType, - parent: Arc, + parent: Box, ) -> Result>> { if let DataType::Dictionary(_, values, _) = data_type { let data_type = values.as_ref().clone(); @@ -341,8 +342,8 @@ fn create_dictionary( } pub trait ArrowArrayRef: std::fmt::Debug { - fn owner(&self) -> Arc { - self.parent().clone() + fn owner(&self) -> Box { + (*self.parent()).clone() } /// returns the null bit buffer. @@ -385,7 +386,7 @@ pub trait ArrowArrayRef: std::fmt::Debug { fn n_buffers(&self) -> usize; - fn parent(&self) -> &Arc; + fn parent(&self) -> &Box; fn array(&self) -> &ArrowArray; fn data_type(&self) -> &DataType; } @@ -409,25 +410,30 @@ pub trait ArrowArrayRef: std::fmt::Debug { /// calling [ArrowArray::release] and [ArrowSchema::release] accordingly. /// /// Furthermore, this struct assumes that the incoming data agrees with the C data interface. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct InternalArrowArray { - array: Box, - data_type: DataType, + // Arc is used for sharability since this is immutable + array: Arc, + // Arced to reduce cost of cloning + data_type: Arc, } impl InternalArrowArray { - pub fn new(array: Box, data_type: DataType) -> Self { - Self { array, data_type } + pub fn new(array: ArrowArray, data_type: DataType) -> Self { + Self { + array: Arc::new(array), + data_type: Arc::new(data_type), + } } } -impl ArrowArrayRef for Arc { +impl ArrowArrayRef for Box { /// the data_type as declared in the schema fn data_type(&self) -> &DataType { &self.data_type } - fn parent(&self) -> &Arc { + fn parent(&self) -> &Box { self } @@ -444,7 +450,7 @@ impl ArrowArrayRef for Arc { pub struct ArrowArrayChild<'a> { array: &'a ArrowArray, data_type: DataType, - parent: Arc, + parent: Box, } impl<'a> ArrowArrayRef for ArrowArrayChild<'a> { @@ -453,7 +459,7 @@ impl<'a> ArrowArrayRef for ArrowArrayChild<'a> { &self.data_type } - fn parent(&self) -> &Arc { + fn parent(&self) -> &Box { &self.parent } @@ -470,7 +476,7 @@ impl<'a> ArrowArrayChild<'a> { fn from_raw( array: &'a ArrowArray, data_type: DataType, - parent: Arc, + parent: Box, ) -> Self { Self { array, diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 4a0c82a0557..10fc2fb994a 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -9,8 +9,6 @@ mod stream; pub(crate) use array::try_from; pub(crate) use array::{ArrowArrayRef, InternalArrowArray}; -use std::sync::Arc; - use crate::array::Array; use crate::datatypes::{DataType, Field}; use crate::error::Result; @@ -21,25 +19,19 @@ pub use generated::{ArrowArray, ArrowArrayStream, ArrowSchema}; pub use stream::{export_iterator, ArrowArrayStreamReader}; /// Exports an [`Box`] to the C data interface. -/// # Safety -/// The pointer `ptr` must be allocated and valid -pub unsafe fn export_array_to_c(array: Box, ptr: *mut ArrowArray) { - let array = bridge::align_to_c_data_interface(array); - - std::ptr::write_unaligned(ptr, ArrowArray::new(array)); +pub fn export_array_to_c(array: Box) -> ArrowArray { + ArrowArray::new(bridge::align_to_c_data_interface(array)) } /// Exports a [`Field`] to the C data interface. -/// # Safety -/// The pointer `ptr` must be allocated and valid -pub unsafe fn export_field_to_c(field: &Field, ptr: *mut ArrowSchema) { - std::ptr::write_unaligned(ptr, ArrowSchema::new(field)); +pub fn export_field_to_c(field: &Field) -> ArrowSchema { + ArrowSchema::new(field) } /// Imports a [`Field`] from the C data interface. /// # Safety /// This function is intrinsically `unsafe` and relies on a [`ArrowSchema`] -/// valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI). +/// being valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI). pub unsafe fn import_field_from_c(field: &ArrowSchema) -> Result { to_field(field) } @@ -47,10 +39,10 @@ pub unsafe fn import_field_from_c(field: &ArrowSchema) -> Result { /// Imports an [`Array`] from the C data interface. /// # Safety /// This function is intrinsically `unsafe` and relies on a [`ArrowArray`] -/// valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI). +/// being valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI). pub unsafe fn import_array_from_c( - array: Box, + array: ArrowArray, data_type: DataType, ) -> Result> { - try_from(Arc::new(InternalArrowArray::new(array, data_type))) + try_from(Box::new(InternalArrowArray::new(array, data_type))) } diff --git a/src/ffi/stream.rs b/src/ffi/stream.rs index ab128e4626f..5e0dabe012b 100644 --- a/src/ffi/stream.rs +++ b/src/ffi/stream.rs @@ -61,8 +61,6 @@ impl ArrowArrayStreamReader { /// * The `ArrowArrayStream` fulfills the invariants of the C stream interface /// * The schema `get_schema` produces fulfills the C data interface pub unsafe fn try_new(mut iter: Box) -> Result { - let mut field = Box::new(ArrowSchema::empty()); - if iter.get_next.is_none() { return Err(Error::OutOfSpec( "The C stream MUST contain a non-null get_next".to_string(), @@ -75,8 +73,9 @@ impl ArrowArrayStreamReader { )); }; + let mut field = ArrowSchema::empty(); let status = if let Some(f) = iter.get_schema { - unsafe { (f)(&mut *iter, &mut *field) } + unsafe { (f)(&mut *iter, &mut field) } } else { return Err(Error::OutOfSpec( "The C stream MUST contain a non-null get_schema".to_string(), @@ -106,8 +105,8 @@ impl ArrowArrayStreamReader { /// Calling this iterator's `next` assumes that the [`ArrowArrayStream`] produces arrow arrays /// that fulfill the C data interface pub unsafe fn next(&mut self) -> Option, Error>> { - let mut array = Box::new(ArrowArray::empty()); - let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut *array) }; + let mut array = ArrowArray::empty(); + let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) }; if status != 0 { return Some(Err(unsafe { handle_error(&mut self.iter) })); @@ -145,7 +144,8 @@ unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArra return 2001; // custom application specific error (since this is never a result of this interface) } - export_array_to_c(item, array); + std::ptr::write(array, export_array_to_c(item)); + private.error = None; 0 } @@ -168,7 +168,7 @@ unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowS } let private = &mut *((*iter).private_data as *mut PrivateData); - export_field_to_c(&private.field, schema); + std::ptr::write(schema, export_field_to_c(&private.field)); 0 } @@ -195,20 +195,17 @@ unsafe extern "C" fn release(iter: *mut ArrowArrayStream) { } /// Exports an iterator to the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html) -/// # Safety -/// The pointer `consumer` must be allocated -pub unsafe fn export_iterator( +pub fn export_iterator( iter: Box, Error>>>, field: Field, - consumer: *mut ArrowArrayStream, -) { +) -> ArrowArrayStream { let private_data = Box::new(PrivateData { iter, field, error: None, }); - *consumer = ArrowArrayStream { + ArrowArrayStream { get_schema: Some(get_schema), get_next: Some(get_next), get_last_error: Some(get_last_error), diff --git a/tests/it/ffi/data.rs b/tests/it/ffi/data.rs index 29a0122384d..337326b6319 100644 --- a/tests/it/ffi/data.rs +++ b/tests/it/ffi/data.rs @@ -7,24 +7,14 @@ use std::collections::BTreeMap; fn _test_round_trip(array: Box, expected: Box) -> Result<()> { let field = Field::new("a", array.data_type().clone(), true); - let array_ptr = Box::new(ffi::ArrowArray::empty()); - let schema_ptr = Box::new(ffi::ArrowSchema::empty()); - - let array_ptr = Box::into_raw(array_ptr); - let schema_ptr = Box::into_raw(schema_ptr); - - unsafe { - ffi::export_array_to_c(array, array_ptr); - ffi::export_field_to_c(&field, schema_ptr); - } - - let array_ptr = unsafe { Box::from_raw(array_ptr) }; - let schema_ptr = unsafe { Box::from_raw(schema_ptr) }; + // export array and corresponding data_type + let array_ffi = ffi::export_array_to_c(array); + let schema_ffi = ffi::export_field_to_c(&field); // import references - let result_field = unsafe { ffi::import_field_from_c(schema_ptr.as_ref())? }; + let result_field = unsafe { ffi::import_field_from_c(&schema_ffi)? }; let result_array = - unsafe { ffi::import_array_from_c(array_ptr, result_field.data_type.clone())? }; + unsafe { ffi::import_array_from_c(array_ffi, result_field.data_type.clone())? }; assert_eq!(&result_array, &expected); assert_eq!(result_field, field); @@ -41,16 +31,9 @@ fn test_round_trip(expected: impl Array + Clone + 'static) -> Result<()> { } fn test_round_trip_schema(field: Field) -> Result<()> { - // create a `InternalArrowArray` from the data. - let schema_ptr = Box::new(ffi::ArrowSchema::empty()); - - let schema_ptr = Box::into_raw(schema_ptr); - - unsafe { ffi::export_field_to_c(&field, schema_ptr) }; - - let schema_ptr = unsafe { Box::from_raw(schema_ptr) }; + let schema_ffi = ffi::export_field_to_c(&field); - let result = unsafe { ffi::import_field_from_c(schema_ptr.as_ref())? }; + let result = unsafe { ffi::import_field_from_c(&schema_ffi)? }; assert_eq!(result, field); Ok(()) diff --git a/tests/it/ffi/stream.rs b/tests/it/ffi/stream.rs index 944da6e9b49..44d0e1e7cc1 100644 --- a/tests/it/ffi/stream.rs +++ b/tests/it/ffi/stream.rs @@ -8,8 +8,9 @@ fn _test_round_trip(arrays: Vec>) -> Result<()> { let mut stream = Box::new(ffi::ArrowArrayStream::empty()); - unsafe { ffi::export_iterator(iter, field.clone(), &mut *stream) } + *stream = ffi::export_iterator(iter, field.clone()); + // import let mut stream = unsafe { ffi::ArrowArrayStreamReader::try_new(stream)? }; let mut produced_arrays: Vec> = vec![];