Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Reduced need of unsafe in FFI #1100

Merged
merged 1 commit into from
Jun 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions arrow-pyarrow-integration-testing/src/c_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {

let mut arrays = vec![];
while let Some(array) = unsafe { iter.next() } {
let py_array = to_py_array(array.unwrap().into(), py)?;
let py_array = to_py_array(array.map_err(PyO3Error::from)?, py)?;
arrays.push(py_array)
}
Ok(arrays)
Expand All @@ -38,24 +38,23 @@ pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
None,
)
.boxed();

// and a field with its datatype
let field = Field::new("a", array.data_type().clone(), true);

// create an iterator of arrays
let arrays = vec![array.clone(), array.clone(), array];
let iter = Box::new(arrays.clone().into_iter().map(Ok)) as _;
let iter = Box::new(arrays.into_iter().map(Ok)) as _;

// create an [`ArrowArrayStream`] based on this iterator and field
let mut stream = Box::new(ffi::ArrowArrayStream::empty());
unsafe { ffi::export_iterator(iter, field, &mut *stream) };
let stream = Box::new(ffi::export_iterator(iter, field));

// call pyarrow's interface to read this stream
let pa = py.import("pyarrow.ipc")?;
let py_stream = pa.getattr("RecordBatchReader")?.call_method1(
"_import_from_c",
((&*stream as *const ffi::ArrowArrayStream) as Py_uintptr_t,),
)?;
Box::leak(stream); // this is not ideal => the struct should be allocated by pyarrow

Ok(py_stream.to_object(py))
}
36 changes: 12 additions & 24 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,22 +67,21 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Box<dyn Array>> {

let field = unsafe { ffi::import_field_from_c(schema.as_ref()).map_err(PyO3Error::from)? };
let array =
unsafe { ffi::import_array_from_c(array, field.data_type).map_err(PyO3Error::from)? };
unsafe { ffi::import_array_from_c(*array, field.data_type).map_err(PyO3Error::from)? };

Ok(array.into())
Ok(array)
}

fn to_py_array(array: Box<dyn Array>, py: Python) -> PyResult<PyObject> {
let array_ptr = Box::new(ffi::ArrowArray::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());
let schema = Box::new(ffi::export_field_to_c(&Field::new(
"",
array.data_type().clone(),
true,
)));
let array = Box::new(ffi::export_array_to_c(array));

let array_ptr = Box::into_raw(array_ptr);
let schema_ptr = Box::into_raw(schema_ptr);

unsafe {
ffi::export_field_to_c(&Field::new("", array.data_type().clone(), true), schema_ptr);
ffi::export_array_to_c(array, array_ptr);
};
let schema_ptr: *const arrow2::ffi::ArrowSchema = &*schema;
let array_ptr: *const arrow2::ffi::ArrowArray = &*array;

let pa = py.import("pyarrow")?;

Expand All @@ -91,11 +90,6 @@ fn to_py_array(array: Box<dyn Array>, py: Python) -> PyResult<PyObject> {
(array_ptr as Py_uintptr_t, schema_ptr as Py_uintptr_t),
)?;

unsafe {
Box::from_raw(array_ptr);
Box::from_raw(schema_ptr);
};

Ok(array.to_object(py))
}

Expand All @@ -115,21 +109,15 @@ fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
}

fn to_py_field(field: &Field, py: Python) -> PyResult<PyObject> {
let schema_ptr = Box::new(ffi::ArrowSchema::empty());
let schema_ptr = Box::into_raw(schema_ptr);

unsafe {
ffi::export_field_to_c(field, schema_ptr);
};
let schema = Box::new(ffi::export_field_to_c(field));
let schema_ptr: *const arrow2::ffi::ArrowSchema = &*schema;

let pa = py.import("pyarrow")?;

let array = pa
.getattr("Field")?
.call_method1("_import_from_c", (schema_ptr as Py_uintptr_t,))?;

unsafe { Box::from_raw(schema_ptr) };

Ok(array.to_object(py))
}

Expand Down
34 changes: 15 additions & 19 deletions examples/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::ffi;

unsafe fn export(
array: Box<dyn Array>,
array_ptr: *mut ffi::ArrowArray,
schema_ptr: *mut ffi::ArrowSchema,
) {
// exporting an array requires an associated field so that the consumer knows its datatype
fn export(array: Box<dyn Array>) -> (ffi::ArrowArray, ffi::ArrowSchema) {
// importing an array requires an associated field so that the consumer knows its datatype.
// Thus, we need to export both
let field = Field::new("a", array.data_type().clone(), true);
ffi::export_array_to_c(array, array_ptr);
ffi::export_field_to_c(&field, schema_ptr);
(
ffi::export_array_to_c(array),
ffi::export_field_to_c(&field),
)
}

unsafe fn import(array: Box<ffi::ArrowArray>, schema: &ffi::ArrowSchema) -> Result<Box<dyn Array>> {
/// # Safety
/// `ArrowArray` and `ArrowSchema` must be valid
unsafe fn import(array: ffi::ArrowArray, schema: &ffi::ArrowSchema) -> Result<Box<dyn Array>> {
let field = ffi::import_field_from_c(schema)?;
ffi::import_array_from_c(array, field.data_type)
}
Expand All @@ -23,19 +24,14 @@ fn main() -> Result<()> {
// let's assume that we have an array:
let array = PrimitiveArray::<i32>::from([Some(1), None, Some(123)]).boxed();

// the goal is to export this array and import it back via FFI.
// to import, we initialize the structs that will receive the data
let mut array_ptr = Box::new(ffi::ArrowArray::empty());
let mut schema_ptr = Box::new(ffi::ArrowSchema::empty());
// here we export - `array_ffi` and `schema_ffi` are the structs of the C data interface
let (array_ffi, schema_ffi) = export(array.clone());

// this is where a producer (in this case also us ^_^) writes to the pointers' location.
// `array` here could be anything or not even be available, if this was e.g. from Python.
// Safety: we just allocated the pointers
unsafe { export(array.clone(), &mut *array_ptr, &mut *schema_ptr) };
// here we import them. Often the structs are wrapped in a pointer. In that case you
// need to read the pointer to the stack.

// and finally interpret the written memory into a new array.
// Safety: we used `export`, which is a valid exporter to the C data interface
let new_array = unsafe { import(array_ptr, schema_ptr.as_ref())? };
let new_array = unsafe { import(array_ffi, &schema_ffi)? };

// which is equal to the exported array
assert_eq!(array.as_ref(), new_array.as_ref());
Expand Down
6 changes: 3 additions & 3 deletions src/buffer/bytes.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::panic::RefUnwindSafe;
use std::{ptr::NonNull, sync::Arc};
use std::ptr::NonNull;

/// Mode of deallocating memory regions
enum Allocation {
/// Native allocation
Native,
// A foreign allocator and its ref count
Foreign(Arc<dyn RefUnwindSafe + Send + Sync>),
Foreign(Box<dyn RefUnwindSafe + Send + Sync>),
}

/// A continuous memory region that may be allocated externally.
Expand All @@ -35,7 +35,7 @@ impl<T> Bytes<T> {
pub unsafe fn from_owned(
ptr: std::ptr::NonNull<T>,
len: usize,
owner: Arc<dyn RefUnwindSafe + Send + Sync>,
owner: Box<dyn RefUnwindSafe + Send + Sync>,
) -> Self {
// This line is technically outside the assumptions of `Vec::from_raw_parts`, since
// `ptr` was not allocated by `Vec`. However, one of the invariants of this struct
Expand Down
42 changes: 24 additions & 18 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Contains functionality to load an ArrayData from the C Data Interface
use std::{ptr::NonNull, sync::Arc};
use std::ptr::NonNull;
use std::sync::Arc;

use crate::{
array::*,
Expand Down Expand Up @@ -178,7 +179,7 @@ impl ArrowArray {
unsafe fn create_buffer<T: NativeType>(
array: &ArrowArray,
data_type: &DataType,
owner: Arc<InternalArrowArray>,
owner: Box<InternalArrowArray>,
index: usize,
) -> Result<Buffer<T>> {
if array.buffers.is_null() {
Expand Down Expand Up @@ -209,7 +210,7 @@ unsafe fn create_buffer<T: NativeType>(
/// This function assumes that `ceil(self.length * bits, 8)` is the size of the buffer
unsafe fn create_bitmap(
array: &ArrowArray,
owner: Arc<InternalArrowArray>,
owner: Box<InternalArrowArray>,
index: usize,
) -> Result<Bitmap> {
if array.buffers.is_null() {
Expand Down Expand Up @@ -310,7 +311,7 @@ fn buffer_len(array: &ArrowArray, data_type: &DataType, i: usize) -> Result<usiz
fn create_child(
array: &ArrowArray,
field: &DataType,
parent: Arc<InternalArrowArray>,
parent: Box<InternalArrowArray>,
index: usize,
) -> Result<ArrowArrayChild<'static>> {
let data_type = get_child(field, index)?;
Expand All @@ -328,7 +329,7 @@ fn create_child(
fn create_dictionary(
array: &ArrowArray,
data_type: &DataType,
parent: Arc<InternalArrowArray>,
parent: Box<InternalArrowArray>,
) -> Result<Option<ArrowArrayChild<'static>>> {
if let DataType::Dictionary(_, values, _) = data_type {
let data_type = values.as_ref().clone();
Expand All @@ -341,8 +342,8 @@ fn create_dictionary(
}

pub trait ArrowArrayRef: std::fmt::Debug {
fn owner(&self) -> Arc<InternalArrowArray> {
self.parent().clone()
fn owner(&self) -> Box<InternalArrowArray> {
(*self.parent()).clone()
}

/// returns the null bit buffer.
Expand Down Expand Up @@ -385,7 +386,7 @@ pub trait ArrowArrayRef: std::fmt::Debug {

fn n_buffers(&self) -> usize;

fn parent(&self) -> &Arc<InternalArrowArray>;
fn parent(&self) -> &Box<InternalArrowArray>;
fn array(&self) -> &ArrowArray;
fn data_type(&self) -> &DataType;
}
Expand All @@ -409,25 +410,30 @@ pub trait ArrowArrayRef: std::fmt::Debug {
/// calling [ArrowArray::release] and [ArrowSchema::release] accordingly.
///
/// Furthermore, this struct assumes that the incoming data agrees with the C data interface.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InternalArrowArray {
array: Box<ArrowArray>,
data_type: DataType,
// Arc is used for sharability since this is immutable
array: Arc<ArrowArray>,
// Arced to reduce cost of cloning
data_type: Arc<DataType>,
}

impl InternalArrowArray {
pub fn new(array: Box<ArrowArray>, data_type: DataType) -> Self {
Self { array, data_type }
pub fn new(array: ArrowArray, data_type: DataType) -> Self {
Self {
array: Arc::new(array),
data_type: Arc::new(data_type),
}
}
}

impl ArrowArrayRef for Arc<InternalArrowArray> {
impl ArrowArrayRef for Box<InternalArrowArray> {
/// the data_type as declared in the schema
fn data_type(&self) -> &DataType {
&self.data_type
}

fn parent(&self) -> &Arc<InternalArrowArray> {
fn parent(&self) -> &Box<InternalArrowArray> {
self
}

Expand All @@ -444,7 +450,7 @@ impl ArrowArrayRef for Arc<InternalArrowArray> {
pub struct ArrowArrayChild<'a> {
array: &'a ArrowArray,
data_type: DataType,
parent: Arc<InternalArrowArray>,
parent: Box<InternalArrowArray>,
}

impl<'a> ArrowArrayRef for ArrowArrayChild<'a> {
Expand All @@ -453,7 +459,7 @@ impl<'a> ArrowArrayRef for ArrowArrayChild<'a> {
&self.data_type
}

fn parent(&self) -> &Arc<InternalArrowArray> {
fn parent(&self) -> &Box<InternalArrowArray> {
&self.parent
}

Expand All @@ -470,7 +476,7 @@ impl<'a> ArrowArrayChild<'a> {
fn from_raw(
array: &'a ArrowArray,
data_type: DataType,
parent: Arc<InternalArrowArray>,
parent: Box<InternalArrowArray>,
) -> Self {
Self {
array,
Expand Down
24 changes: 8 additions & 16 deletions src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ mod stream;
pub(crate) use array::try_from;
pub(crate) use array::{ArrowArrayRef, InternalArrowArray};

use std::sync::Arc;

use crate::array::Array;
use crate::datatypes::{DataType, Field};
use crate::error::Result;
Expand All @@ -21,36 +19,30 @@ pub use generated::{ArrowArray, ArrowArrayStream, ArrowSchema};
pub use stream::{export_iterator, ArrowArrayStreamReader};

/// Exports an [`Box<dyn Array>`] to the C data interface.
/// # Safety
/// The pointer `ptr` must be allocated and valid
pub unsafe fn export_array_to_c(array: Box<dyn Array>, ptr: *mut ArrowArray) {
let array = bridge::align_to_c_data_interface(array);

std::ptr::write_unaligned(ptr, ArrowArray::new(array));
pub fn export_array_to_c(array: Box<dyn Array>) -> ArrowArray {
ArrowArray::new(bridge::align_to_c_data_interface(array))
}

/// Exports a [`Field`] to the C data interface.
/// # Safety
/// The pointer `ptr` must be allocated and valid
pub unsafe fn export_field_to_c(field: &Field, ptr: *mut ArrowSchema) {
std::ptr::write_unaligned(ptr, ArrowSchema::new(field));
pub fn export_field_to_c(field: &Field) -> ArrowSchema {
ArrowSchema::new(field)
}

/// Imports a [`Field`] from the C data interface.
/// # Safety
/// This function is intrinsically `unsafe` and relies on a [`ArrowSchema`]
/// valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI).
/// being valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI).
pub unsafe fn import_field_from_c(field: &ArrowSchema) -> Result<Field> {
to_field(field)
}

/// Imports an [`Array`] from the C data interface.
/// # Safety
/// This function is intrinsically `unsafe` and relies on a [`ArrowArray`]
/// valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI).
/// being valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI).
pub unsafe fn import_array_from_c(
array: Box<ArrowArray>,
array: ArrowArray,
data_type: DataType,
) -> Result<Box<dyn Array>> {
try_from(Arc::new(InternalArrowArray::new(array, data_type)))
try_from(Box::new(InternalArrowArray::new(array, data_type)))
}
Loading