diff --git a/Cargo.toml b/Cargo.toml index 90a15f8c2ad..0d42f61cbab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -105,25 +105,7 @@ rustdoc-args = ["--cfg", "docsrs"] [features] default = [] full = [ - "io_csv", - "io_csv_async", - "io_json", - "io_ipc", - "io_flight", - "io_ipc_write_async", - "io_ipc_read_async", - "io_ipc_compression", - "io_json_integration", - "io_print", - "io_parquet", - "io_parquet_compression", - "io_avro", - "io_avro_compression", - "io_avro_async", - "regex", - "compute", - # parses timezones used in timestamp conversions - "chrono-tz", + ] io_csv = ["io_csv_read", "io_csv_write"] io_csv_async = ["io_csv_read_async"] diff --git a/arrow-pyarrow-integration-testing/src/c_stream.rs b/arrow-pyarrow-integration-testing/src/c_stream.rs new file mode 100644 index 00000000000..9a4e4fabe8a --- /dev/null +++ b/arrow-pyarrow-integration-testing/src/c_stream.rs @@ -0,0 +1,28 @@ +//! This library demonstrates a minimal usage of Rust's C data interface to pass +//! arrays from and to Python. +use pyo3::ffi::Py_uintptr_t; +use pyo3::prelude::*; + +use arrow2::ffi; + +use super::*; + +pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult> { + let stream = Box::new(ffi::ArrowArrayStream::empty()); + + let stream_ptr = &*stream as *const ffi::ArrowArrayStream; + + // make the conversion through PyArrow's private API + // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds + ob.call_method1(py, "_export_to_c", (stream_ptr as Py_uintptr_t,))?; + + let mut iter = + unsafe { ffi::ArrowArrayStreamReader::try_new(stream).map_err(PyO3ArrowError::from) }?; + + let mut arrays = vec![]; + while let Some(array) = unsafe { iter.next() } { + let py_array = to_py_array(array.unwrap().into(), py)?; + arrays.push(py_array) + } + Ok(arrays) +} diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index eb5af0e2df3..9c4e852d30a 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -1,5 +1,6 @@ //! This library demonstrates a minimal usage of Rust's C data interface to pass //! arrays from and to Python. +mod c_stream; use std::error; use std::fmt; @@ -153,9 +154,15 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult { to_py_field(&field, py) } +#[pyfunction] +pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult> { + c_stream::to_rust_iterator(ob, py) +} + #[pymodule] fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(round_trip_array, m)?)?; m.add_function(wrap_pyfunction!(round_trip_field, m)?)?; + m.add_function(wrap_pyfunction!(to_rust_iterator, m)?)?; Ok(()) } diff --git a/arrow-pyarrow-integration-testing/tests/test_c_stream.py b/arrow-pyarrow-integration-testing/tests/test_c_stream.py new file mode 100644 index 00000000000..168f72eadc1 --- /dev/null +++ b/arrow-pyarrow-integration-testing/tests/test_c_stream.py @@ -0,0 +1,22 @@ +import unittest + +import pyarrow.ipc + +import arrow_pyarrow_integration_testing + + +class TestCase(unittest.TestCase): + def test_rust_reads(self): + """ + Python -> Rust -> Python + """ + schema = pyarrow.schema([pyarrow.field("aa", pyarrow.int32())]) + a = pyarrow.array([1, None, 2], type=pyarrow.int32()) + + batch = pyarrow.record_batch([a], schema) + reader = pyarrow.ipc.RecordBatchStreamReader.from_batches(schema, [batch]) + + arrays = arrow_pyarrow_integration_testing.to_rust_iterator(reader) + + array = arrays[0].field(0) + assert array == a diff --git a/src/ffi/generated.rs b/src/ffi/generated.rs index a72f6eea566..0fc00e5300e 100644 --- a/src/ffi/generated.rs +++ b/src/ffi/generated.rs @@ -1,7 +1,6 @@ /* automatically generated by rust-bindgen 0.59.2 */ -/// ABI-compatible struct for `ArrowSchema` from C Data Interface -/// See +/// ABI-compatible struct for [`ArrowSchema`](https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions) #[repr(C)] #[derive(Debug, Clone)] pub struct ArrowSchema { @@ -16,8 +15,7 @@ pub struct ArrowSchema { pub(super) private_data: *mut ::std::os::raw::c_void, } -/// ABI-compatible struct for `ArrowArray` from C Data Interface -/// See +/// ABI-compatible struct for [`ArrowArray`](https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions) #[repr(C)] #[derive(Debug, Clone)] pub struct ArrowArray { @@ -33,24 +31,25 @@ pub struct ArrowArray { pub(super) private_data: *mut ::std::os::raw::c_void, } +/// ABI-compatible struct for [`ArrowArrayStream`](https://arrow.apache.org/docs/format/CStreamInterface.html). #[repr(C)] -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Clone)] pub struct ArrowArrayStream { - get_schema: ::std::option::Option< + pub(super) get_schema: ::std::option::Option< unsafe extern "C" fn( arg1: *mut ArrowArrayStream, out: *mut ArrowSchema, ) -> ::std::os::raw::c_int, >, - get_next: ::std::option::Option< + pub(super) get_next: ::std::option::Option< unsafe extern "C" fn( arg1: *mut ArrowArrayStream, out: *mut ArrowArray, ) -> ::std::os::raw::c_int, >, - get_last_error: ::std::option::Option< + pub(super) get_last_error: ::std::option::Option< unsafe extern "C" fn(arg1: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char, >, - release: ::std::option::Option, - private_data: *mut ::std::os::raw::c_void, + pub(super) release: ::std::option::Option, + pub(super) private_data: *mut ::std::os::raw::c_void, } diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 379b7d52835..ca561e2ee86 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -4,6 +4,7 @@ mod array; mod bridge; mod generated; mod schema; +mod stream; pub(crate) use array::try_from; pub(crate) use array::{ArrowArrayRef, InternalArrowArray}; @@ -16,7 +17,8 @@ use crate::error::Result; use self::schema::to_field; -pub use generated::{ArrowArray, ArrowSchema}; +pub use generated::{ArrowArray, ArrowArrayStream, ArrowSchema}; +pub use stream::ArrowArrayStreamReader; /// Exports an [`Arc`] to the C data interface. /// # Safety diff --git a/src/ffi/stream.rs b/src/ffi/stream.rs new file mode 100644 index 00000000000..d020db0b5ff --- /dev/null +++ b/src/ffi/stream.rs @@ -0,0 +1,129 @@ +use std::ffi::CStr; + +use crate::{array::Array, datatypes::Field, error::ArrowError}; + +use super::{import_array_from_c, import_field_from_c}; +use super::{ArrowArray, ArrowArrayStream, ArrowSchema}; + +impl Drop for ArrowArrayStream { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +impl ArrowArrayStream { + /// Creates an empty [`ArrowArrayStream`] used to import from a producer. + pub fn empty() -> Self { + Self { + get_schema: None, + get_next: None, + get_last_error: None, + release: None, + private_data: std::ptr::null_mut(), + } + } +} + +unsafe fn handle_error(iter: &mut ArrowArrayStream) -> ArrowError { + let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) }; + + if error.is_null() { + return ArrowError::External( + "C stream".to_string(), + Box::new(ArrowError::ExternalFormat( + "an unspecified error".to_string(), + )), + ); + } + + let error = unsafe { CStr::from_ptr(error) }; + ArrowError::External( + "C stream".to_string(), + Box::new(ArrowError::ExternalFormat( + error.to_str().unwrap().to_string(), + )), + ) +} + +/// Interface for the Arrow C stream interface. Implements an iterator of [`Array`]. +/// +pub struct ArrowArrayStreamReader { + iter: Box, + field: Field, +} + +impl ArrowArrayStreamReader { + /// Returns a new [`ArrowArrayStreamReader`] + /// # Error + /// Errors iff the [`ArrowArrayStream`] is out of specification + /// # Safety + /// This method is intrinsically `unsafe` since it assumes that the `ArrowArrayStream` + /// contains a valid Arrow C stream interface. + /// In particular: + /// * The `ArrowArrayStream` fulfills the invariants of the C stream interface + /// * The schema `get_schema` produces fulfills the C data interface + pub unsafe fn try_new(mut iter: Box) -> Result { + let mut field = Box::new(ArrowSchema::empty()); + + if iter.get_next.is_none() { + return Err(ArrowError::OutOfSpec( + "The C stream MUST contain a non-null get_next".to_string(), + )); + }; + + if iter.get_last_error.is_none() { + return Err(ArrowError::OutOfSpec( + "The C stream MUST contain a non-null get_last_error".to_string(), + )); + }; + + let status = if let Some(f) = iter.get_schema { + unsafe { (f)(&mut *iter, &mut *field) } + } else { + return Err(ArrowError::OutOfSpec( + "The C stream MUST contain a non-null get_schema".to_string(), + )); + }; + + if status != 0 { + return Err(unsafe { handle_error(&mut iter) }); + } + + let field = unsafe { import_field_from_c(&field)? }; + + Ok(Self { iter, field }) + } + + /// Returns the field provided by the stream + pub fn field(&self) -> &Field { + &self.field + } + + /// Advances this iterator by one array + /// # Error + /// Errors iff: + /// * The C stream interface returns an error + /// * The C stream interface returns an invalid array (that we can identify, see Safety below) + /// # Safety + /// Calling this iterator's `next` assumes that the [`ArrowArrayStream`] produces arrow arrays + /// that fulfill the C data interface + pub unsafe fn next(&mut self) -> Option, ArrowError>> { + let mut array = Box::new(ArrowArray::empty()); + let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut *array) }; + + if status != 0 { + return Some(Err(unsafe { handle_error(&mut self.iter) })); + } + + // last paragraph of https://arrow.apache.org/docs/format/CStreamInterface.html#c.ArrowArrayStream.get_next + array.release?; + + // Safety: assumed from the C stream interface + unsafe { import_array_from_c(array, self.field.data_type.clone()) } + .map(Some) + .transpose() + } +}