Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to the Arrow C stream interface (read and write) #857

Merged
merged 7 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ documentation of each of its APIs.
* Most feature-complete implementation of Apache Arrow after the reference implementation (C++)
* Float 16 unsupported (not a Rust native type)
* Decimal 256 unsupported (not a Rust native type)
* FFI supported for all Arrow types
* C data interface supported for all Arrow types (read and write)
* C stream interface supported for all Arrow types (read and write)
* Full interoperability with Rust's `Vec`
* MutableArray API to work with bitmaps and arrays in-place
* Full support for timestamps with timezones, including arithmetics that take
Expand Down
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Note that this crate uses two languages and an external ABI:

Pyarrow exposes a C ABI to convert arrow arrays from and to its C implementation, see [here](https://arrow.apache.org/docs/format/CDataInterface.html).

This package uses the equivalent struct in Rust (`arrow::array::ArrowArray`), and verifies that
This package uses the equivalent struct in Rust (`arrow::array::InternalArrowArray`), and verifies that
we can use pyarrow's interface to move pointers from and to Rust.

## Relevant literature
Expand Down
2 changes: 1 addition & 1 deletion arrow-pyarrow-integration-testing/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
# under the License.

[build-system]
requires = ["maturin"]
requires = ["maturin>=0.12,<0.13"]
build-backend = "maturin"
63 changes: 63 additions & 0 deletions arrow-pyarrow-integration-testing/src/c_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//! This module demonstrates a minimal usage of Rust's C stream interface to pass
//! streams of arrays from and to Python.
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;

use arrow2::array::{Int32Array, StructArray};
use arrow2::datatypes::DataType;
use arrow2::ffi;

use super::*;

/// Reads every array from a Python stream object through the Arrow C stream interface
/// and returns them converted back to Python objects.
///
/// `ob` must expose pyarrow's private `_export_to_c` method, which is called with the
/// address of an empty [`ffi::ArrowArrayStream`] for it to fill.
///
/// # Errors
/// Returns a Python exception if `_export_to_c` fails, if the stream reader cannot be
/// created, if reading any array from the stream fails, or if converting an array back
/// to Python fails.
pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
    // allocate the struct that the Python side will write the exported stream into
    let stream = Box::new(ffi::ArrowArrayStream::empty());

    let stream_ptr = &*stream as *const ffi::ArrowArrayStream;

    // make the conversion through PyArrow's private API
    // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
    ob.call_method1(py, "_export_to_c", (stream_ptr as Py_uintptr_t,))?;

    // SAFETY: relies on `_export_to_c` above having written a valid stream into `stream`
    // (NOTE(review): this is the contract assumed from pyarrow, not checked here).
    let mut iter =
        unsafe { ffi::ArrowArrayStreamReader::try_new(stream).map_err(PyO3ArrowError::from) }?;

    let mut arrays = vec![];
    // SAFETY: same assumption as above, the producer behind `iter` follows the C stream contract.
    while let Some(array) = unsafe { iter.next() } {
        // surface a failed read as a Python exception instead of panicking in the extension
        let array = array.map_err(PyO3ArrowError::from)?;
        let py_array = to_py_array(array.into(), py)?;
        arrays.push(py_array);
    }
    Ok(arrays)
}

/// Exports a Rust iterator of three identical struct arrays through the Arrow C stream
/// interface and returns the Python object that `pyarrow.ipc.RecordBatchReader._import_from_c`
/// builds from it.
///
/// Each array is a struct with a single nullable `Int32` child named `"a"` holding
/// `[2, null, 1, null]`.
///
/// # Errors
/// Returns a Python exception if `pyarrow.ipc` cannot be imported or if
/// `_import_from_c` fails.
pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
    // initialize an array
    let array = Int32Array::from(&[Some(2), None, Some(1), None]);
    let array = StructArray::from_data(
        DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)]),
        vec![Arc::new(array)],
        None,
    );
    // and a field with its datatype
    let field = Field::new("a", array.data_type().clone(), true);

    // Arc it, since it will be shared with an external program.
    // The struct array is not used again, so it is moved rather than cloned.
    let array: Arc<dyn Array> = Arc::new(array);

    // create an iterator of arrays (the vector is consumed directly; only the `Arc`s are cloned)
    let arrays = vec![array.clone(), array.clone(), array];
    let iter = Box::new(arrays.into_iter().map(Ok)) as _;

    // create an [`ArrowArrayStream`] based on this iterator and field
    let mut stream = Box::new(ffi::ArrowArrayStream::empty());
    // SAFETY: `stream` is a freshly allocated, empty struct that nothing else references yet.
    unsafe { ffi::export_iterator(iter, field, &mut *stream) };

    // call pyarrow's interface to read this stream
    let pa = py.import("pyarrow.ipc")?;
    let py_stream = pa.getattr("RecordBatchReader")?.call_method1(
        "_import_from_c",
        ((&*stream as *const ffi::ArrowArrayStream) as Py_uintptr_t,),
    )?;
    Box::leak(stream); // this is not ideal => the struct should be allocated by pyarrow

    Ok(py_stream.to_object(py))
}
31 changes: 22 additions & 9 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.
mod c_stream;

use std::error;
use std::fmt;
Expand Down Expand Up @@ -51,11 +52,11 @@ impl From<PyO3ArrowError> for PyErr {

fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
// prepare a pointer to receive the Array struct
let array = Box::new(ffi::Ffi_ArrowArray::empty());
let schema = Box::new(ffi::Ffi_ArrowSchema::empty());
let array = Box::new(ffi::ArrowArray::empty());
let schema = Box::new(ffi::ArrowSchema::empty());

let array_ptr = &*array as *const ffi::Ffi_ArrowArray;
let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;
let array_ptr = &*array as *const ffi::ArrowArray;
let schema_ptr = &*schema as *const ffi::ArrowSchema;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
Expand All @@ -73,8 +74,8 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult<Arc<dyn Array>> {
}

fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {
let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty());
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
let array_ptr = Box::new(ffi::ArrowArray::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());

let array_ptr = Box::into_raw(array_ptr);
let schema_ptr = Box::into_raw(schema_ptr);
Expand All @@ -101,9 +102,9 @@ fn to_py_array(array: Arc<dyn Array>, py: Python) -> PyResult<PyObject> {

fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
// prepare a pointer to receive the Schema struct
let schema = Box::new(ffi::Ffi_ArrowSchema::empty());
let schema = Box::new(ffi::ArrowSchema::empty());

let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema;
let schema_ptr = &*schema as *const ffi::ArrowSchema;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
Expand All @@ -115,7 +116,7 @@ fn to_rust_field(ob: PyObject, py: Python) -> PyResult<Field> {
}

fn to_py_field(field: &Field, py: Python) -> PyResult<PyObject> {
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());
let schema_ptr = Box::into_raw(schema_ptr);

unsafe {
Expand Down Expand Up @@ -153,9 +154,21 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult<PyObject> {
to_py_field(&field, py)
}

// Python-facing entry point: forwards `ob` unchanged to `c_stream::to_rust_iterator`
// and returns its result (a list of Python objects, or the raised error).
#[pyfunction]
pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
c_stream::to_rust_iterator(ob, py)
}

// Python-facing entry point: takes no Python arguments and returns whatever
// `c_stream::from_rust_iterator` produces (a Python object, or the raised error).
#[pyfunction]
pub fn from_rust_iterator(py: Python) -> PyResult<PyObject> {
c_stream::from_rust_iterator(py)
}

// Module initializer: registers the four Rust functions below on the Python module `m`.
// Any registration failure is propagated to the caller through `?`.
#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
// C data interface round trips
m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
// C stream interface (read and write)
m.add_function(wrap_pyfunction!(to_rust_iterator, m)?)?;
m.add_function(wrap_pyfunction!(from_rust_iterator, m)?)?;
Ok(())
}
29 changes: 29 additions & 0 deletions arrow-pyarrow-integration-testing/tests/test_c_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import unittest

import pyarrow.ipc

import arrow_pyarrow_integration_testing


class TestCase(unittest.TestCase):
    # Exercises the Arrow C stream interface in both directions between pyarrow and
    # the Rust extension module.

    def test_rust_reads(self):
        # pyarrow produces a one-batch stream; Rust consumes it and hands the arrays back.
        schema = pyarrow.schema([pyarrow.field("aa", pyarrow.int32())])
        a = pyarrow.array([1, None, 2], type=pyarrow.int32())

        batch = pyarrow.record_batch([a], schema)
        reader = pyarrow.ipc.RecordBatchStreamReader.from_batches(schema, [batch])

        arrays = arrow_pyarrow_integration_testing.to_rust_iterator(reader)

        # the first returned array exposes its children through `field`; child 0 is the column
        array = arrays[0].field(0)
        # use unittest's assertion (not a bare `assert`) so the check survives `python -O`
        # and matches the style of `test_pyarrow_reads`
        self.assertEqual(array, a)

    def test_pyarrow_reads(self):
        # Rust produces a stream of three identical batches; pyarrow consumes it.
        stream = arrow_pyarrow_integration_testing.from_rust_iterator()

        arrays = list(stream)

        expected = pyarrow.RecordBatch.from_arrays([pyarrow.array([2, None, 1, None], pyarrow.int32())], names=["a"])
        expected = [expected, expected, expected]

        self.assertEqual(arrays, expected)
13 changes: 5 additions & 8 deletions examples/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@ use std::sync::Arc;

unsafe fn export(
array: Arc<dyn Array>,
array_ptr: *mut ffi::Ffi_ArrowArray,
schema_ptr: *mut ffi::Ffi_ArrowSchema,
array_ptr: *mut ffi::ArrowArray,
schema_ptr: *mut ffi::ArrowSchema,
) {
let field = Field::new("a", array.data_type().clone(), true);
ffi::export_array_to_c(array, array_ptr);
ffi::export_field_to_c(&field, schema_ptr);
}

unsafe fn import(
array: Box<ffi::Ffi_ArrowArray>,
schema: &ffi::Ffi_ArrowSchema,
) -> Result<Box<dyn Array>> {
unsafe fn import(array: Box<ffi::ArrowArray>, schema: &ffi::ArrowSchema) -> Result<Box<dyn Array>> {
let field = ffi::import_field_from_c(schema)?;
ffi::import_array_from_c(array, field.data_type)
}
Expand All @@ -28,8 +25,8 @@ fn main() -> Result<()> {

// the goal is to export this array and import it back via FFI.
// to import, we initialize the structs that will receive the data
let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty());
let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty());
let array_ptr = Box::new(ffi::ArrowArray::empty());
let schema_ptr = Box::new(ffi::ArrowSchema::empty());

// since FFIs work in raw pointers, let's temporarily relinquish ownership so that producers
// can write into it in a thread-safe manner
Expand Down
2 changes: 1 addition & 1 deletion src/buffer/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub enum Deallocation {
/// Native deallocation, using Rust deallocator with Arrow-specific memory alignment
Native,
// Foreign interface, via a callback
Foreign(Arc<ffi::ArrowArray>),
Foreign(Arc<ffi::InternalArrowArray>),
}

impl Debug for Deallocation {
Expand Down
Loading