Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read from C data stream
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 21, 2022
1 parent 5a251aa commit 22f0c3e
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 30 deletions.
20 changes: 1 addition & 19 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,25 +105,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = []
full = [
"io_csv",
"io_csv_async",
"io_json",
"io_ipc",
"io_flight",
"io_ipc_write_async",
"io_ipc_read_async",
"io_ipc_compression",
"io_json_integration",
"io_print",
"io_parquet",
"io_parquet_compression",
"io_avro",
"io_avro_compression",
"io_avro_async",
"regex",
"compute",
# parses timezones used in timestamp conversions
"chrono-tz",

]
io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
Expand Down
28 changes: 28 additions & 0 deletions arrow-pyarrow-integration-testing/src/c_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.
use pyo3::ffi::Py_uintptr_t;
use pyo3::prelude::*;

use arrow2::ffi;

use super::*;

pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
let stream = Box::new(ffi::ArrowArrayStream::empty());

let stream_ptr = &*stream as *const ffi::ArrowArrayStream;

// make the conversion through PyArrow's private API
// this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds
ob.call_method1(py, "_export_to_c", (stream_ptr as Py_uintptr_t,))?;

let mut iter =
unsafe { ffi::ArrowArrayStreamReader::try_new(stream).map_err(PyO3ArrowError::from) }?;

let mut arrays = vec![];
while let Some(array) = unsafe { iter.next() } {
let py_array = to_py_array(array.unwrap().into(), py)?;
arrays.push(py_array)
}
Ok(arrays)
}
7 changes: 7 additions & 0 deletions arrow-pyarrow-integration-testing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! This library demonstrates a minimal usage of Rust's C data interface to pass
//! arrays from and to Python.
mod c_stream;

use std::error;
use std::fmt;
Expand Down Expand Up @@ -153,9 +154,15 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult<PyObject> {
to_py_field(&field, py)
}

#[pyfunction]
pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult<Vec<PyObject>> {
c_stream::to_rust_iterator(ob, py)
}

#[pymodule]
fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(round_trip_array, m)?)?;
m.add_function(wrap_pyfunction!(round_trip_field, m)?)?;
m.add_function(wrap_pyfunction!(to_rust_iterator, m)?)?;
Ok(())
}
22 changes: 22 additions & 0 deletions arrow-pyarrow-integration-testing/tests/test_c_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import unittest

import pyarrow.ipc

import arrow_pyarrow_integration_testing


class TestCase(unittest.TestCase):
def test_rust_reads(self):
"""
Python -> Rust -> Python
"""
schema = pyarrow.schema([pyarrow.field("aa", pyarrow.int32())])
a = pyarrow.array([1, None, 2], type=pyarrow.int32())

batch = pyarrow.record_batch([a], schema)
reader = pyarrow.ipc.RecordBatchStreamReader.from_batches(schema, [batch])

arrays = arrow_pyarrow_integration_testing.to_rust_iterator(reader)

array = arrays[0].field(0)
assert array == a
19 changes: 9 additions & 10 deletions src/ffi/generated.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* automatically generated by rust-bindgen 0.59.2 */

/// ABI-compatible struct for `ArrowSchema` from C Data Interface
/// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions>
/// ABI-compatible struct for [`ArrowSchema`](https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions)
#[repr(C)]
#[derive(Debug, Clone)]
pub struct ArrowSchema {
Expand All @@ -16,8 +15,7 @@ pub struct ArrowSchema {
pub(super) private_data: *mut ::std::os::raw::c_void,
}

/// ABI-compatible struct for `ArrowArray` from C Data Interface
/// See <https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions>
/// ABI-compatible struct for [`ArrowArray`](https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions)
#[repr(C)]
#[derive(Debug, Clone)]
pub struct ArrowArray {
Expand All @@ -33,24 +31,25 @@ pub struct ArrowArray {
pub(super) private_data: *mut ::std::os::raw::c_void,
}

/// ABI-compatible struct for [`ArrowArrayStream`](https://arrow.apache.org/docs/format/CStreamInterface.html).
#[repr(C)]
#[derive(Debug, Copy, Clone)]
#[derive(Debug, Clone)]
pub struct ArrowArrayStream {
get_schema: ::std::option::Option<
pub(super) get_schema: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut ArrowArrayStream,
out: *mut ArrowSchema,
) -> ::std::os::raw::c_int,
>,
get_next: ::std::option::Option<
pub(super) get_next: ::std::option::Option<
unsafe extern "C" fn(
arg1: *mut ArrowArrayStream,
out: *mut ArrowArray,
) -> ::std::os::raw::c_int,
>,
get_last_error: ::std::option::Option<
pub(super) get_last_error: ::std::option::Option<
unsafe extern "C" fn(arg1: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char,
>,
release: ::std::option::Option<unsafe extern "C" fn(arg1: *mut ArrowArrayStream)>,
private_data: *mut ::std::os::raw::c_void,
pub(super) release: ::std::option::Option<unsafe extern "C" fn(arg1: *mut ArrowArrayStream)>,
pub(super) private_data: *mut ::std::os::raw::c_void,
}
4 changes: 3 additions & 1 deletion src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod array;
mod bridge;
mod generated;
mod schema;
mod stream;

pub(crate) use array::try_from;
pub(crate) use array::{ArrowArrayRef, InternalArrowArray};
Expand All @@ -16,7 +17,8 @@ use crate::error::Result;

use self::schema::to_field;

pub use generated::{ArrowArray, ArrowSchema};
pub use generated::{ArrowArray, ArrowArrayStream, ArrowSchema};
pub use stream::ArrowArrayStreamReader;

/// Exports an [`Arc<dyn Array>`] to the C data interface.
/// # Safety
Expand Down
129 changes: 129 additions & 0 deletions src/ffi/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use std::ffi::CStr;

use crate::{array::Array, datatypes::Field, error::ArrowError};

use super::{import_array_from_c, import_field_from_c};
use super::{ArrowArray, ArrowArrayStream, ArrowSchema};

impl Drop for ArrowArrayStream {
fn drop(&mut self) {
match self.release {
None => (),
Some(release) => unsafe { release(self) },
};
}
}

impl ArrowArrayStream {
/// Creates an empty [`ArrowArrayStream`] used to import from a producer.
pub fn empty() -> Self {
Self {
get_schema: None,
get_next: None,
get_last_error: None,
release: None,
private_data: std::ptr::null_mut(),
}
}
}

unsafe fn handle_error(iter: &mut ArrowArrayStream) -> ArrowError {
let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };

if error.is_null() {
return ArrowError::External(
"C stream".to_string(),
Box::new(ArrowError::ExternalFormat(
"an unspecified error".to_string(),
)),
);
}

let error = unsafe { CStr::from_ptr(error) };
ArrowError::External(
"C stream".to_string(),
Box::new(ArrowError::ExternalFormat(
error.to_str().unwrap().to_string(),
)),
)
}

/// Interface for the Arrow C stream interface. Implements an iterator of [`Array`].
///
pub struct ArrowArrayStreamReader {
iter: Box<ArrowArrayStream>,
field: Field,
}

impl ArrowArrayStreamReader {
/// Returns a new [`ArrowArrayStreamReader`]
/// # Error
/// Errors iff the [`ArrowArrayStream`] is out of specification
/// # Safety
/// This method is intrinsically `unsafe` since it assumes that the `ArrowArrayStream`
/// contains a valid Arrow C stream interface.
/// In particular:
/// * The `ArrowArrayStream` fulfills the invariants of the C stream interface
/// * The schema `get_schema` produces fulfills the C data interface
pub unsafe fn try_new(mut iter: Box<ArrowArrayStream>) -> Result<Self, ArrowError> {
let mut field = Box::new(ArrowSchema::empty());

if iter.get_next.is_none() {
return Err(ArrowError::OutOfSpec(
"The C stream MUST contain a non-null get_next".to_string(),
));
};

if iter.get_last_error.is_none() {
return Err(ArrowError::OutOfSpec(
"The C stream MUST contain a non-null get_last_error".to_string(),
));
};

let status = if let Some(f) = iter.get_schema {
unsafe { (f)(&mut *iter, &mut *field) }
} else {
return Err(ArrowError::OutOfSpec(
"The C stream MUST contain a non-null get_schema".to_string(),
));
};

if status != 0 {
return Err(unsafe { handle_error(&mut iter) });
}

let field = unsafe { import_field_from_c(&field)? };

Ok(Self { iter, field })
}

/// Returns the field provided by the stream
pub fn field(&self) -> &Field {
&self.field
}

/// Advances this iterator by one array
/// # Error
/// Errors iff:
/// * The C stream interface returns an error
/// * The C stream interface returns an invalid array (that we can identify, see Safety below)
/// # Safety
/// Calling this iterator's `next` assumes that the [`ArrowArrayStream`] produces arrow arrays
/// that fulfill the C data interface
pub unsafe fn next(&mut self) -> Option<Result<Box<dyn Array>, ArrowError>> {
let mut array = Box::new(ArrowArray::empty());
let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut *array) };

if status != 0 {
return Some(Err(unsafe { handle_error(&mut self.iter) }));
}

// last paragraph of https://arrow.apache.org/docs/format/CStreamInterface.html#c.ArrowArrayStream.get_next
array.release?;

// Safety: assumed from the C stream interface
unsafe { import_array_from_c(array, self.field.data_type.clone()) }
.map(Some)
.transpose()
}
}

0 comments on commit 22f0c3e

Please sign in to comment.