diff --git a/README.md b/README.md index 8da9509ce39..e54883f6eed 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,8 @@ documentation of each of its APIs. * Most feature-complete implementation of Apache Arrow after the reference implementation (C++) * Float 16 unsupported (not a Rust native type) * Decimal 256 unsupported (not a Rust native type) -* FFI supported for all Arrow types +* C data interface supported for all Arrow types (read and write) +* C stream interface supported for all Arrow types (read and write) * Full interoperability with Rust's `Vec` * MutableArray API to work with bitmaps and arrays in-place * Full support for timestamps with timezones, including arithmetics that take diff --git a/arrow-pyarrow-integration-testing/README.md b/arrow-pyarrow-integration-testing/README.md index 2d1bdca4b94..beb18618e5d 100644 --- a/arrow-pyarrow-integration-testing/README.md +++ b/arrow-pyarrow-integration-testing/README.md @@ -31,7 +31,7 @@ Note that this crate uses two languages and an external ABI: Pyarrow exposes a C ABI to convert arrow arrays from and to its C implementation, see [here](https://arrow.apache.org/docs/format/CDataInterface.html). -This package uses the equivalent struct in Rust (`arrow::array::ArrowArray`), and verifies that +This package uses the equivalent struct in Rust (`arrow::array::InternalArrowArray`), and verifies that we can use pyarrow's interface to move pointers from and to Rust. ## Relevant literature diff --git a/arrow-pyarrow-integration-testing/pyproject.toml b/arrow-pyarrow-integration-testing/pyproject.toml index 27480690e06..5c143694e18 100644 --- a/arrow-pyarrow-integration-testing/pyproject.toml +++ b/arrow-pyarrow-integration-testing/pyproject.toml @@ -16,5 +16,5 @@ # under the License. 
[build-system] -requires = ["maturin"] +requires = ["maturin>=0.12,<0.13"] build-backend = "maturin" diff --git a/arrow-pyarrow-integration-testing/src/c_stream.rs b/arrow-pyarrow-integration-testing/src/c_stream.rs new file mode 100644 index 00000000000..efdf224c1c6 --- /dev/null +++ b/arrow-pyarrow-integration-testing/src/c_stream.rs @@ -0,0 +1,63 @@ +//! This library demonstrates a minimal usage of Rust's C stream interface to pass +//! arrays from and to Python. +use pyo3::ffi::Py_uintptr_t; +use pyo3::prelude::*; + +use arrow2::array::{Int32Array, StructArray}; +use arrow2::datatypes::DataType; +use arrow2::ffi; + +use super::*; + +pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult> { + let stream = Box::new(ffi::ArrowArrayStream::empty()); + + let stream_ptr = &*stream as *const ffi::ArrowArrayStream; + + // make the conversion through PyArrow's private API + // this changes the pointer's memory and is thus unsafe. In particular, `_export_to_c` can go out of bounds + ob.call_method1(py, "_export_to_c", (stream_ptr as Py_uintptr_t,))?; + + let mut iter = + unsafe { ffi::ArrowArrayStreamReader::try_new(stream).map_err(PyO3ArrowError::from) }?; + + let mut arrays = vec![]; + while let Some(array) = unsafe { iter.next() } { + let py_array = to_py_array(array.unwrap().into(), py)?; + arrays.push(py_array) + } + Ok(arrays) +} + +pub fn from_rust_iterator(py: Python) -> PyResult { + // initialize an array + let array = Int32Array::from(&[Some(2), None, Some(1), None]); + let array = StructArray::from_data( + DataType::Struct(vec![Field::new("a", array.data_type().clone(), true)]), + vec![Arc::new(array)], + None, + ); + // and a field with its datatype + let field = Field::new("a", array.data_type().clone(), true); + + // Arc it, since it will be shared with an external program + let array: Arc = Arc::new(array.clone()); + + // create an iterator of arrays + let arrays = vec![array.clone(), array.clone(), array]; + let iter = 
Box::new(arrays.clone().into_iter().map(Ok)) as _; + + // create an [`ArrowArrayStream`] based on this iterator and field + let mut stream = Box::new(ffi::ArrowArrayStream::empty()); + unsafe { ffi::export_iterator(iter, field, &mut *stream) }; + + // call pyarrow's interface to read this stream + let pa = py.import("pyarrow.ipc")?; + let py_stream = pa.getattr("RecordBatchReader")?.call_method1( + "_import_from_c", + ((&*stream as *const ffi::ArrowArrayStream) as Py_uintptr_t,), + )?; + Box::leak(stream); // this is not ideal => the struct should be allocated by pyarrow + + Ok(py_stream.to_object(py)) +} diff --git a/arrow-pyarrow-integration-testing/src/lib.rs b/arrow-pyarrow-integration-testing/src/lib.rs index 3463cab1bee..9b18dab716c 100644 --- a/arrow-pyarrow-integration-testing/src/lib.rs +++ b/arrow-pyarrow-integration-testing/src/lib.rs @@ -1,5 +1,6 @@ //! This library demonstrates a minimal usage of Rust's C data interface to pass //! arrays from and to Python. +mod c_stream; use std::error; use std::fmt; @@ -51,11 +52,11 @@ impl From for PyErr { fn to_rust_array(ob: PyObject, py: Python) -> PyResult> { // prepare a pointer to receive the Array struct - let array = Box::new(ffi::Ffi_ArrowArray::empty()); - let schema = Box::new(ffi::Ffi_ArrowSchema::empty()); + let array = Box::new(ffi::ArrowArray::empty()); + let schema = Box::new(ffi::ArrowSchema::empty()); - let array_ptr = &*array as *const ffi::Ffi_ArrowArray; - let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema; + let array_ptr = &*array as *const ffi::ArrowArray; + let schema_ptr = &*schema as *const ffi::ArrowSchema; // make the conversion through PyArrow's private API // this changes the pointer's memory and is thus unsafe. 
In particular, `_export_to_c` can go out of bounds @@ -73,8 +74,8 @@ fn to_rust_array(ob: PyObject, py: Python) -> PyResult> { } fn to_py_array(array: Arc, py: Python) -> PyResult { - let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty()); - let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + let array_ptr = Box::new(ffi::ArrowArray::empty()); + let schema_ptr = Box::new(ffi::ArrowSchema::empty()); let array_ptr = Box::into_raw(array_ptr); let schema_ptr = Box::into_raw(schema_ptr); @@ -101,9 +102,9 @@ fn to_py_array(array: Arc, py: Python) -> PyResult { fn to_rust_field(ob: PyObject, py: Python) -> PyResult { // prepare a pointer to receive the Array struct - let schema = Box::new(ffi::Ffi_ArrowSchema::empty()); + let schema = Box::new(ffi::ArrowSchema::empty()); - let schema_ptr = &*schema as *const ffi::Ffi_ArrowSchema; + let schema_ptr = &*schema as *const ffi::ArrowSchema; // make the conversion through PyArrow's private API // this changes the pointer's memory and is thus unsafe. 
In particular, `_export_to_c` can go out of bounds @@ -115,7 +116,7 @@ fn to_rust_field(ob: PyObject, py: Python) -> PyResult { } fn to_py_field(field: &Field, py: Python) -> PyResult { - let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + let schema_ptr = Box::new(ffi::ArrowSchema::empty()); let schema_ptr = Box::into_raw(schema_ptr); unsafe { @@ -153,9 +154,21 @@ fn round_trip_field(array: PyObject, py: Python) -> PyResult { to_py_field(&field, py) } +#[pyfunction] +pub fn to_rust_iterator(ob: PyObject, py: Python) -> PyResult> { + c_stream::to_rust_iterator(ob, py) +} + +#[pyfunction] +pub fn from_rust_iterator(py: Python) -> PyResult { + c_stream::from_rust_iterator(py) +} + #[pymodule] fn arrow_pyarrow_integration_testing(_py: Python, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(round_trip_array, m)?)?; m.add_function(wrap_pyfunction!(round_trip_field, m)?)?; + m.add_function(wrap_pyfunction!(to_rust_iterator, m)?)?; + m.add_function(wrap_pyfunction!(from_rust_iterator, m)?)?; Ok(()) } diff --git a/arrow-pyarrow-integration-testing/tests/test_c_stream.py b/arrow-pyarrow-integration-testing/tests/test_c_stream.py new file mode 100644 index 00000000000..74f3c6f7209 --- /dev/null +++ b/arrow-pyarrow-integration-testing/tests/test_c_stream.py @@ -0,0 +1,29 @@ +import unittest + +import pyarrow.ipc + +import arrow_pyarrow_integration_testing + + +class TestCase(unittest.TestCase): + def test_rust_reads(self): + schema = pyarrow.schema([pyarrow.field("aa", pyarrow.int32())]) + a = pyarrow.array([1, None, 2], type=pyarrow.int32()) + + batch = pyarrow.record_batch([a], schema) + reader = pyarrow.ipc.RecordBatchStreamReader.from_batches(schema, [batch]) + + arrays = arrow_pyarrow_integration_testing.to_rust_iterator(reader) + + array = arrays[0].field(0) + assert array == a + + def test_pyarrow_reads(self): + stream = arrow_pyarrow_integration_testing.from_rust_iterator() + + arrays = [a for a in stream] + + expected = 
pyarrow.RecordBatch.from_arrays([pyarrow.array([2, None, 1, None], pyarrow.int32())], names=["a"]) + expected = [expected, expected, expected] + + self.assertEqual(arrays, expected) diff --git a/examples/ffi.rs b/examples/ffi.rs index b69689fb867..842161eb96e 100644 --- a/examples/ffi.rs +++ b/examples/ffi.rs @@ -6,18 +6,15 @@ use std::sync::Arc; unsafe fn export( array: Arc, - array_ptr: *mut ffi::Ffi_ArrowArray, - schema_ptr: *mut ffi::Ffi_ArrowSchema, + array_ptr: *mut ffi::ArrowArray, + schema_ptr: *mut ffi::ArrowSchema, ) { let field = Field::new("a", array.data_type().clone(), true); ffi::export_array_to_c(array, array_ptr); ffi::export_field_to_c(&field, schema_ptr); } -unsafe fn import( - array: Box, - schema: &ffi::Ffi_ArrowSchema, -) -> Result> { +unsafe fn import(array: Box, schema: &ffi::ArrowSchema) -> Result> { let field = ffi::import_field_from_c(schema)?; ffi::import_array_from_c(array, field.data_type) } @@ -28,8 +25,8 @@ fn main() -> Result<()> { // the goal is to export this array and import it back via FFI. 
// to import, we initialize the structs that will receive the data - let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty()); - let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + let array_ptr = Box::new(ffi::ArrowArray::empty()); + let schema_ptr = Box::new(ffi::ArrowSchema::empty()); // since FFIs work in raw pointers, let's temporarily relinquish ownership so that producers // can write into it in a thread-safe manner diff --git a/src/buffer/bytes.rs b/src/buffer/bytes.rs index 109302a4816..3e33fc7ce45 100644 --- a/src/buffer/bytes.rs +++ b/src/buffer/bytes.rs @@ -13,7 +13,7 @@ pub enum Deallocation { /// Native deallocation, using Rust deallocator with Arrow-specific memory aligment Native, // Foreign interface, via a callback - Foreign(Arc), + Foreign(Arc), } impl Debug for Deallocation { diff --git a/src/ffi/array.rs b/src/ffi/array.rs index 38894cd5970..abaa0363597 100644 --- a/src/ffi/array.rs +++ b/src/ffi/array.rs @@ -1,9 +1,20 @@ //! Contains functionality to load an ArrayData from the C Data Interface +use std::{ptr::NonNull, sync::Arc}; -use super::ffi::ArrowArrayRef; -use crate::array::{BooleanArray, FromFfi}; -use crate::error::Result; -use crate::{array::*, datatypes::PhysicalType}; +use crate::{ + array::*, + bitmap::{utils::bytes_for, Bitmap}, + buffer::{ + bytes::{Bytes, Deallocation}, + Buffer, + }, + datatypes::{DataType, PhysicalType}, + error::{ArrowError, Result}, + ffi::schema::get_child, + types::NativeType, +}; + +use super::ArrowArray; /// Reads a valid `ffi` interface into a `Box` /// # Errors @@ -35,3 +46,439 @@ pub unsafe fn try_from(array: A) -> Result> { Map => Box::new(MapArray::try_from_ffi(array)?), }) } + +// Sound because the arrow specification does not allow multiple implementations +// to change this struct +// This is intrinsically impossible to prove because the implementations agree +// on this as part of the Arrow specification +unsafe impl Send for ArrowArray {} +unsafe impl Sync for ArrowArray {} + +impl Drop 
for ArrowArray { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +// callback used to drop [ArrowArray] when it is exported +unsafe extern "C" fn c_release_array(array: *mut ArrowArray) { + if array.is_null() { + return; + } + let array = &mut *array; + + // take ownership of `private_data`, therefore dropping it + let private = Box::from_raw(array.private_data as *mut PrivateData); + for child in private.children_ptr.iter() { + let _ = Box::from_raw(*child); + } + + if let Some(ptr) = private.dictionary_ptr { + let _ = Box::from_raw(ptr); + } + + array.release = None; +} + +#[allow(dead_code)] +struct PrivateData { + array: Arc, + buffers_ptr: Box<[*const std::os::raw::c_void]>, + children_ptr: Box<[*mut ArrowArray]>, + dictionary_ptr: Option<*mut ArrowArray>, +} + +impl ArrowArray { + /// creates a new `ArrowArray` from existing data. + /// # Safety + /// This method releases `buffers`. Consumers of this struct *must* call `release` before + /// releasing this struct, or contents in `buffers` leak. 
+ pub(crate) fn new(array: Arc) -> Self { + let (offset, buffers, children, dictionary) = + offset_buffers_children_dictionary(array.as_ref()); + + let buffers_ptr = buffers + .iter() + .map(|maybe_buffer| match maybe_buffer { + Some(b) => b.as_ptr() as *const std::os::raw::c_void, + None => std::ptr::null(), + }) + .collect::>(); + let n_buffers = buffers.len() as i64; + + let children_ptr = children + .into_iter() + .map(|child| Box::into_raw(Box::new(ArrowArray::new(child)))) + .collect::>(); + let n_children = children_ptr.len() as i64; + + let dictionary_ptr = + dictionary.map(|array| Box::into_raw(Box::new(ArrowArray::new(array)))); + + let length = array.len() as i64; + let null_count = array.null_count() as i64; + + let mut private_data = Box::new(PrivateData { + array, + buffers_ptr, + children_ptr, + dictionary_ptr, + }); + + Self { + length, + null_count, + offset: offset as i64, + n_buffers, + n_children, + buffers: private_data.buffers_ptr.as_mut_ptr(), + children: private_data.children_ptr.as_mut_ptr(), + dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), + release: Some(c_release_array), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, + } + } + + /// creates an empty [`ArrowArray`], which can be used to import data into + pub fn empty() -> Self { + Self { + length: 0, + null_count: 0, + offset: 0, + n_buffers: 0, + n_children: 0, + buffers: std::ptr::null_mut(), + children: std::ptr::null_mut(), + dictionary: std::ptr::null_mut(), + release: None, + private_data: std::ptr::null_mut(), + } + } + + /// the length of the array + pub(crate) fn len(&self) -> usize { + self.length as usize + } + + /// the offset of the array + pub(crate) fn offset(&self) -> usize { + self.offset as usize + } + + /// the null count of the array + pub(crate) fn null_count(&self) -> usize { + self.null_count as usize + } +} + +/// interprets the buffer `index` as a [`Buffer`]. 
+/// # Safety +/// The caller must guarantee that the buffer `index` corresponds to a buffer of type `T`. +/// This function assumes that the buffer created from FFI is valid; this is impossible to prove. +unsafe fn create_buffer( + array: &ArrowArray, + data_type: &DataType, + deallocation: Deallocation, + index: usize, +) -> Result> { + if array.buffers.is_null() { + return Err(ArrowError::OutOfSpec( + "The array buffers are null".to_string(), + )); + } + + let buffers = array.buffers as *mut *const u8; + + assert!(index < array.n_buffers as usize); + let ptr = *buffers.add(index); + let ptr = NonNull::new(ptr as *mut T); + + let len = buffer_len(array, data_type, index)?; + let offset = buffer_offset(array, data_type, index); + let bytes = ptr + .map(|ptr| Bytes::from_ffi(ptr, len, deallocation)) + .ok_or_else(|| { + ArrowError::OutOfSpec(format!("The buffer at position {} is null", index)) + })?; + + Ok(Buffer::from_bytes(bytes).slice(offset, len - offset)) +} + +/// interprets the buffer `index` of the FFI array as a [`Bitmap`]. Errors if the buffers pointer or the buffer itself is null. +/// The bitmap is built from `offset + length` bits and then sliced to `length` bits starting at `offset`. +/// The size of the buffer is taken to be `bytes_for(offset + length)` bytes. +/// # Panic +/// This function panics if `index` is larger or equal to `n_buffers`.
+/// # Safety +/// This function assumes that the buffer holds at least `bytes_for(offset + length)` bytes +unsafe fn create_bitmap( + array: &ArrowArray, + deallocation: Deallocation, + index: usize, +) -> Result { + if array.buffers.is_null() { + return Err(ArrowError::OutOfSpec( + "The array buffers are null".to_string(), + )); + } + let len = array.length as usize; + let offset = array.offset as usize; + let buffers = array.buffers as *mut *const u8; + + assert!(index < array.n_buffers as usize); + let ptr = *buffers.add(index); + + let bytes_len = bytes_for(offset + len); + let ptr = NonNull::new(ptr as *mut u8); + let bytes = ptr + .map(|ptr| Bytes::from_ffi(ptr, bytes_len, deallocation)) + .ok_or_else(|| { + ArrowError::OutOfSpec(format!( + "The buffer {} is a null pointer and cannot be interpreted as a bitmap", + index + )) + })?; + + Ok(Bitmap::from_bytes(bytes, offset + len).slice(offset, len)) +} + +fn buffer_offset(array: &ArrowArray, data_type: &DataType, i: usize) -> usize { + use PhysicalType::*; + match (data_type.to_physical_type(), i) { + (LargeUtf8, 2) | (LargeBinary, 2) | (Utf8, 2) | (Binary, 2) => 0, + _ => array.offset as usize, + } +} + +/// Returns the length, in slots, of the buffer `i` (indexed according to the C data interface) +// Rust implementation uses fixed-sized buffers, which require knowledge of their `len`. +// for variable-sized buffers, such as the second buffer of a stringArray, we need +// to fetch offset buffer's len to build the second buffer.
+fn buffer_len(array: &ArrowArray, data_type: &DataType, i: usize) -> Result { + Ok(match (data_type.to_physical_type(), i) { + (PhysicalType::FixedSizeBinary, 1) => { + if let DataType::FixedSizeBinary(size) = data_type.to_logical_type() { + *size * (array.offset as usize + array.length as usize) + } else { + unreachable!() + } + } + (PhysicalType::FixedSizeList, 1) => { + if let DataType::FixedSizeList(_, size) = data_type.to_logical_type() { + *size * (array.offset as usize + array.length as usize) + } else { + unreachable!() + } + } + (PhysicalType::Utf8, 1) + | (PhysicalType::LargeUtf8, 1) + | (PhysicalType::Binary, 1) + | (PhysicalType::LargeBinary, 1) + | (PhysicalType::List, 1) + | (PhysicalType::LargeList, 1) + | (PhysicalType::Map, 1) => { + // the len of the offset buffer (buffer 1) equals length + 1 + array.offset as usize + array.length as usize + 1 + } + (PhysicalType::Utf8, 2) | (PhysicalType::Binary, 2) => { + // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) + let len = buffer_len(array, data_type, 1)?; + // first buffer is the null buffer => add(1) + let offset_buffer = unsafe { *(array.buffers as *mut *const u8).add(1) }; + // interpret as i32 + let offset_buffer = offset_buffer as *const i32; + // get last offset + + (unsafe { *offset_buffer.add(len - 1) }) as usize + } + (PhysicalType::LargeUtf8, 2) | (PhysicalType::LargeBinary, 2) => { + // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) + let len = buffer_len(array, data_type, 1)?; + // first buffer is the null buffer => add(1) + let offset_buffer = unsafe { *(array.buffers as *mut *const u8).add(1) }; + // interpret as i64 + let offset_buffer = offset_buffer as *const i64; + // get last offset + (unsafe { *offset_buffer.add(len - 1) }) as usize + } + // buffer len of primitive types + _ => array.offset as usize + array.length as usize, + }) +} + +fn create_child( + array: &ArrowArray, + field: &DataType, 
+ parent: Arc, + index: usize, +) -> Result> { + let data_type = get_child(field, index)?; + assert!(index < array.n_children as usize); + assert!(!array.children.is_null()); + unsafe { + let arr_ptr = *array.children.add(index); + assert!(!arr_ptr.is_null()); + let arr_ptr = &*arr_ptr; + + Ok(ArrowArrayChild::from_raw(arr_ptr, data_type, parent)) + } +} + +fn create_dictionary( + array: &ArrowArray, + data_type: &DataType, + parent: Arc, +) -> Result>> { + if let DataType::Dictionary(_, values, _) = data_type { + let data_type = values.as_ref().clone(); + assert!(!array.dictionary.is_null()); + let array = unsafe { &*array.dictionary }; + Ok(Some(ArrowArrayChild::from_raw(array, data_type, parent))) + } else { + Ok(None) + } +} + +pub trait ArrowArrayRef: std::fmt::Debug { + fn deallocation(&self) -> Deallocation { + Deallocation::Foreign(self.parent().clone()) + } + + /// returns the null bit buffer. + /// Rust implementation uses a buffer that is not part of the array of buffers. + /// The C Data interface's null buffer is part of the array of buffers. + /// # Safety + /// The caller must guarantee that buffer 0 of the array is a bitmap whenever its null count is non-zero. + /// This function assumes that the bitmap created from FFI is valid; this is impossible to prove. + unsafe fn validity(&self) -> Result> { + if self.array().null_count() == 0 { + Ok(None) + } else { + create_bitmap(self.array(), self.deallocation(), 0).map(Some) + } + } + + /// # Safety + /// The caller must guarantee that the buffer `index` corresponds to a buffer of type `T`. + /// This function assumes that the buffer created from FFI is valid; this is impossible to prove. + unsafe fn buffer(&self, index: usize) -> Result> { + create_buffer::(self.array(), self.data_type(), self.deallocation(), index) + } + + /// # Safety + /// The caller must guarantee that the buffer `index` corresponds to a bitmap. + /// This function assumes that the bitmap created from FFI is valid; this is impossible to prove.
+ unsafe fn bitmap(&self, index: usize) -> Result { + // `index` is forwarded unchanged (no `+1` is applied), so it must already count the null bitmap at position 0 + create_bitmap(self.array(), self.deallocation(), index) + } + + /// # Safety + /// The caller must guarantee that the child `index` is valid per the C data interface. + unsafe fn child(&self, index: usize) -> Result { + create_child(self.array(), self.data_type(), self.parent().clone(), index) + } + + fn dictionary(&self) -> Result> { + create_dictionary(self.array(), self.data_type(), self.parent().clone()) + } + + fn n_buffers(&self) -> usize; + + fn parent(&self) -> &Arc; + fn array(&self) -> &ArrowArray; + fn data_type(&self) -> &DataType; +} + +/// Struct used to move an Array from and to the C Data Interface. +/// Its main responsibility is to expose functionality that requires +/// both [ArrowArray] and [ArrowSchema]. +/// +/// This struct has two main paths: +/// +/// ## Import from the C Data Interface +/// * [InternalArrowArray::empty] to allocate memory to be filled by an external call +/// * [InternalArrowArray::try_from_raw] to consume two non-null allocated pointers +/// ## Export to the C Data Interface +/// * [InternalArrowArray::try_new] to create a new [InternalArrowArray] from Rust-specific information +/// * [InternalArrowArray::into_raw] to expose two pointers for [ArrowArray] and [ArrowSchema]. +/// +/// # Safety +/// Whoever creates this struct is responsible for releasing their resources. Specifically, +/// consumers *must* call [InternalArrowArray::into_raw] and take ownership of the individual pointers, +/// calling [ArrowArray::release] and [ArrowSchema::release] accordingly. +/// +/// Furthermore, this struct assumes that the incoming data agrees with the C data interface.
+#[derive(Debug)] +pub struct InternalArrowArray { + array: Box, + data_type: DataType, +} + +impl InternalArrowArray { + pub fn new(array: Box, data_type: DataType) -> Self { + Self { array, data_type } + } +} + +impl ArrowArrayRef for Arc { + /// the data_type as declared in the schema + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn parent(&self) -> &Arc { + self + } + + fn array(&self) -> &ArrowArray { + self.array.as_ref() + } + + fn n_buffers(&self) -> usize { + self.array.n_buffers as usize + } +} + +#[derive(Debug)] +pub struct ArrowArrayChild<'a> { + array: &'a ArrowArray, + data_type: DataType, + parent: Arc, +} + +impl<'a> ArrowArrayRef for ArrowArrayChild<'a> { + /// the data_type as declared in the schema + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn parent(&self) -> &Arc { + &self.parent + } + + fn array(&self) -> &ArrowArray { + self.array + } + + fn n_buffers(&self) -> usize { + self.array.n_buffers as usize + } +} + +impl<'a> ArrowArrayChild<'a> { + fn from_raw( + array: &'a ArrowArray, + data_type: DataType, + parent: Arc, + ) -> Self { + Self { + array, + data_type, + parent, + } + } +} diff --git a/src/ffi/ffi.rs b/src/ffi/ffi.rs deleted file mode 100644 index e6de2d594a1..00000000000 --- a/src/ffi/ffi.rs +++ /dev/null @@ -1,486 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::{ptr::NonNull, sync::Arc}; - -use crate::{ - array::{offset_buffers_children_dictionary, Array}, - bitmap::{utils::bytes_for, Bitmap}, - buffer::{ - bytes::{Bytes, Deallocation}, - Buffer, - }, - datatypes::{DataType, PhysicalType}, - error::{ArrowError, Result}, - ffi::schema::get_child, - types::NativeType, -}; - -/// ABI-compatible struct for ArrowArray from C Data Interface -/// See -/// This was created by bindgen -#[repr(C)] -#[derive(Debug, Clone)] -pub struct Ffi_ArrowArray { - pub(crate) length: i64, - pub(crate) null_count: i64, - pub(crate) offset: i64, - pub(crate) n_buffers: i64, - pub(crate) n_children: i64, - pub(crate) buffers: *mut *const ::std::os::raw::c_void, - children: *mut *mut Ffi_ArrowArray, - dictionary: *mut Ffi_ArrowArray, - release: ::std::option::Option, - // When exported, this MUST contain everything that is owned by this array. - // for example, any buffer pointed to in `buffers` must be here, as well as the `buffers` pointer - // itself. - // In other words, everything in [Ffi_ArrowArray] must be owned by `private_data` and can assume - // that they do not outlive `private_data`. 
- private_data: *mut ::std::os::raw::c_void, -} - -// Sound because the arrow specification does not allow multiple implementations -// to change this struct -// This is intrinsically impossible to prove because the implementations agree -// on this as part of the Arrow specification -unsafe impl Send for Ffi_ArrowArray {} -unsafe impl Sync for Ffi_ArrowArray {} - -impl Drop for Ffi_ArrowArray { - fn drop(&mut self) { - match self.release { - None => (), - Some(release) => unsafe { release(self) }, - }; - } -} - -// callback used to drop [Ffi_ArrowArray] when it is exported -unsafe extern "C" fn c_release_array(array: *mut Ffi_ArrowArray) { - if array.is_null() { - return; - } - let array = &mut *array; - - // take ownership of `private_data`, therefore dropping it - let private = Box::from_raw(array.private_data as *mut PrivateData); - for child in private.children_ptr.iter() { - let _ = Box::from_raw(*child); - } - - if let Some(ptr) = private.dictionary_ptr { - let _ = Box::from_raw(ptr); - } - - array.release = None; -} - -#[allow(dead_code)] -struct PrivateData { - array: Arc, - buffers_ptr: Box<[*const std::os::raw::c_void]>, - children_ptr: Box<[*mut Ffi_ArrowArray]>, - dictionary_ptr: Option<*mut Ffi_ArrowArray>, -} - -impl Ffi_ArrowArray { - /// creates a new `Ffi_ArrowArray` from existing data. - /// # Safety - /// This method releases `buffers`. Consumers of this struct *must* call `release` before - /// releasing this struct, or contents in `buffers` leak. 
- pub(crate) fn new(array: Arc) -> Self { - let (offset, buffers, children, dictionary) = - offset_buffers_children_dictionary(array.as_ref()); - - let buffers_ptr = buffers - .iter() - .map(|maybe_buffer| match maybe_buffer { - Some(b) => b.as_ptr() as *const std::os::raw::c_void, - None => std::ptr::null(), - }) - .collect::>(); - let n_buffers = buffers.len() as i64; - - let children_ptr = children - .into_iter() - .map(|child| Box::into_raw(Box::new(Ffi_ArrowArray::new(child)))) - .collect::>(); - let n_children = children_ptr.len() as i64; - - let dictionary_ptr = - dictionary.map(|array| Box::into_raw(Box::new(Ffi_ArrowArray::new(array)))); - - let length = array.len() as i64; - let null_count = array.null_count() as i64; - - let mut private_data = Box::new(PrivateData { - array, - buffers_ptr, - children_ptr, - dictionary_ptr, - }); - - Self { - length, - null_count, - offset: offset as i64, - n_buffers, - n_children, - buffers: private_data.buffers_ptr.as_mut_ptr(), - children: private_data.children_ptr.as_mut_ptr(), - dictionary: private_data.dictionary_ptr.unwrap_or(std::ptr::null_mut()), - release: Some(c_release_array), - private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, - } - } - - /// creates an empty [`Ffi_ArrowArray`], which can be used to import data into - pub fn empty() -> Self { - Self { - length: 0, - null_count: 0, - offset: 0, - n_buffers: 0, - n_children: 0, - buffers: std::ptr::null_mut(), - children: std::ptr::null_mut(), - dictionary: std::ptr::null_mut(), - release: None, - private_data: std::ptr::null_mut(), - } - } - - /// the length of the array - pub(crate) fn len(&self) -> usize { - self.length as usize - } - - /// the offset of the array - pub(crate) fn offset(&self) -> usize { - self.offset as usize - } - - /// the null count of the array - pub(crate) fn null_count(&self) -> usize { - self.null_count as usize - } -} - -/// interprets the buffer `index` as a [`Buffer`]. 
-/// # Safety -/// The caller must guarantee that the buffer `index` corresponds to a buffer of type `T`. -/// This function assumes that the buffer created from FFI is valid; this is impossible to prove. -unsafe fn create_buffer( - array: &Ffi_ArrowArray, - data_type: &DataType, - deallocation: Deallocation, - index: usize, -) -> Result> { - if array.buffers.is_null() { - return Err(ArrowError::OutOfSpec( - "The array buffers are null".to_string(), - )); - } - - let buffers = array.buffers as *mut *const u8; - - assert!(index < array.n_buffers as usize); - let ptr = *buffers.add(index); - let ptr = NonNull::new(ptr as *mut T); - - let len = buffer_len(array, data_type, index)?; - let offset = buffer_offset(array, data_type, index); - let bytes = ptr - .map(|ptr| Bytes::from_ffi(ptr, len, deallocation)) - .ok_or_else(|| { - ArrowError::OutOfSpec(format!("The buffer at position {} is null", index)) - })?; - - Ok(Buffer::from_bytes(bytes).slice(offset, len - offset)) -} - -/// returns a new buffer corresponding to the index `i` of the FFI array. It may not exist (null pointer). -/// `bits` is the number of bits that the native type of this buffer has. -/// The size of the buffer will be `ceil(self.length * bits, 8)`. -/// # Panic -/// This function panics if `i` is larger or equal to `n_buffers`. 
-/// # Safety -/// This function assumes that `ceil(self.length * bits, 8)` is the size of the buffer -unsafe fn create_bitmap( - array: &Ffi_ArrowArray, - deallocation: Deallocation, - index: usize, -) -> Result { - if array.buffers.is_null() { - return Err(ArrowError::OutOfSpec( - "The array buffers are null".to_string(), - )); - } - let len = array.length as usize; - let offset = array.offset as usize; - let buffers = array.buffers as *mut *const u8; - - assert!(index < array.n_buffers as usize); - let ptr = *buffers.add(index); - - let bytes_len = bytes_for(offset + len); - let ptr = NonNull::new(ptr as *mut u8); - let bytes = ptr - .map(|ptr| Bytes::from_ffi(ptr, bytes_len, deallocation)) - .ok_or_else(|| { - ArrowError::OutOfSpec(format!( - "The buffer {} is a null pointer and cannot be interpreted as a bitmap", - index - )) - })?; - - Ok(Bitmap::from_bytes(bytes, offset + len).slice(offset, len)) -} - -fn buffer_offset(array: &Ffi_ArrowArray, data_type: &DataType, i: usize) -> usize { - use PhysicalType::*; - match (data_type.to_physical_type(), i) { - (LargeUtf8, 2) | (LargeBinary, 2) | (Utf8, 2) | (Binary, 2) => 0, - _ => array.offset as usize, - } -} - -/// Returns the length, in slots, of the buffer `i` (indexed according to the C data interface) -// Rust implementation uses fixed-sized buffers, which require knowledge of their `len`. -// for variable-sized buffers, such as the second buffer of a stringArray, we need -// to fetch offset buffer's len to build the second buffer. 
-fn buffer_len(array: &Ffi_ArrowArray, data_type: &DataType, i: usize) -> Result { - Ok(match (data_type.to_physical_type(), i) { - (PhysicalType::FixedSizeBinary, 1) => { - if let DataType::FixedSizeBinary(size) = data_type.to_logical_type() { - *size * (array.offset as usize + array.length as usize) - } else { - unreachable!() - } - } - (PhysicalType::FixedSizeList, 1) => { - if let DataType::FixedSizeList(_, size) = data_type.to_logical_type() { - *size * (array.offset as usize + array.length as usize) - } else { - unreachable!() - } - } - (PhysicalType::Utf8, 1) - | (PhysicalType::LargeUtf8, 1) - | (PhysicalType::Binary, 1) - | (PhysicalType::LargeBinary, 1) - | (PhysicalType::List, 1) - | (PhysicalType::LargeList, 1) - | (PhysicalType::Map, 1) => { - // the len of the offset buffer (buffer 1) equals length + 1 - array.offset as usize + array.length as usize + 1 - } - (PhysicalType::Utf8, 2) | (PhysicalType::Binary, 2) => { - // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) - let len = buffer_len(array, data_type, 1)?; - // first buffer is the null buffer => add(1) - let offset_buffer = unsafe { *(array.buffers as *mut *const u8).add(1) }; - // interpret as i32 - let offset_buffer = offset_buffer as *const i32; - // get last offset - - (unsafe { *offset_buffer.add(len - 1) }) as usize - } - (PhysicalType::LargeUtf8, 2) | (PhysicalType::LargeBinary, 2) => { - // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1) - let len = buffer_len(array, data_type, 1)?; - // first buffer is the null buffer => add(1) - let offset_buffer = unsafe { *(array.buffers as *mut *const u8).add(1) }; - // interpret as i64 - let offset_buffer = offset_buffer as *const i64; - // get last offset - (unsafe { *offset_buffer.add(len - 1) }) as usize - } - // buffer len of primitive types - _ => array.offset as usize + array.length as usize, - }) -} - -fn create_child( - array: &Ffi_ArrowArray, - field: 
&DataType, - parent: Arc, - index: usize, -) -> Result> { - let data_type = get_child(field, index)?; - assert!(index < array.n_children as usize); - assert!(!array.children.is_null()); - unsafe { - let arr_ptr = *array.children.add(index); - assert!(!arr_ptr.is_null()); - let arr_ptr = &*arr_ptr; - - Ok(ArrowArrayChild::from_raw(arr_ptr, data_type, parent)) - } -} - -fn create_dictionary( - array: &Ffi_ArrowArray, - data_type: &DataType, - parent: Arc, -) -> Result>> { - if let DataType::Dictionary(_, values, _) = data_type { - let data_type = values.as_ref().clone(); - assert!(!array.dictionary.is_null()); - let array = unsafe { &*array.dictionary }; - Ok(Some(ArrowArrayChild::from_raw(array, data_type, parent))) - } else { - Ok(None) - } -} - -pub trait ArrowArrayRef: std::fmt::Debug { - fn deallocation(&self) -> Deallocation { - Deallocation::Foreign(self.parent().clone()) - } - - /// returns the null bit buffer. - /// Rust implementation uses a buffer that is not part of the array of buffers. - /// The C Data interface's null buffer is part of the array of buffers. - /// # Safety - /// The caller must guarantee that the buffer `index` corresponds to a bitmap. - /// This function assumes that the bitmap created from FFI is valid; this is impossible to prove. - unsafe fn validity(&self) -> Result> { - if self.array().null_count() == 0 { - Ok(None) - } else { - create_bitmap(self.array(), self.deallocation(), 0).map(Some) - } - } - - /// # Safety - /// The caller must guarantee that the buffer `index` corresponds to a bitmap. - /// This function assumes that the bitmap created from FFI is valid; this is impossible to prove. - unsafe fn buffer(&self, index: usize) -> Result> { - create_buffer::(self.array(), self.data_type(), self.deallocation(), index) - } - - /// # Safety - /// The caller must guarantee that the buffer `index` corresponds to a bitmap. - /// This function assumes that the bitmap created from FFI is valid; this is impossible to prove. 
- unsafe fn bitmap(&self, index: usize) -> Result { - // +1 to ignore null bitmap - create_bitmap(self.array(), self.deallocation(), index) - } - - /// # Safety - /// The caller must guarantee that the child `index` is valid per c data interface. - unsafe fn child(&self, index: usize) -> Result { - create_child(self.array(), self.data_type(), self.parent().clone(), index) - } - - fn dictionary(&self) -> Result> { - create_dictionary(self.array(), self.data_type(), self.parent().clone()) - } - - fn n_buffers(&self) -> usize; - - fn parent(&self) -> &Arc; - fn array(&self) -> &Ffi_ArrowArray; - fn data_type(&self) -> &DataType; -} - -/// Struct used to move an Array from and to the C Data Interface. -/// Its main responsibility is to expose functionality that requires -/// both [Ffi_ArrowArray] and [Ffi_ArrowSchema]. -/// -/// This struct has two main paths: -/// -/// ## Import from the C Data Interface -/// * [ArrowArray::empty] to allocate memory to be filled by an external call -/// * [ArrowArray::try_from_raw] to consume two non-null allocated pointers -/// ## Export to the C Data Interface -/// * [ArrowArray::try_new] to create a new [ArrowArray] from Rust-specific information -/// * [ArrowArray::into_raw] to expose two pointers for [Ffi_ArrowArray] and [Ffi_ArrowSchema]. -/// -/// # Safety -/// Whoever creates this struct is responsible for releasing their resources. Specifically, -/// consumers *must* call [ArrowArray::into_raw] and take ownership of the individual pointers, -/// calling [Ffi_ArrowArray::release] and [Ffi_ArrowSchema::release] accordingly. -/// -/// Furthermore, this struct assumes that the incoming data agrees with the C data interface. 
-#[derive(Debug)] -pub struct ArrowArray { - array: Box, - data_type: DataType, -} - -impl ArrowArray { - pub fn new(array: Box, data_type: DataType) -> Self { - Self { array, data_type } - } -} - -impl ArrowArrayRef for Arc { - /// the data_type as declared in the schema - fn data_type(&self) -> &DataType { - &self.data_type - } - - fn parent(&self) -> &Arc { - self - } - - fn array(&self) -> &Ffi_ArrowArray { - self.array.as_ref() - } - - fn n_buffers(&self) -> usize { - self.array.n_buffers as usize - } -} - -#[derive(Debug)] -pub struct ArrowArrayChild<'a> { - array: &'a Ffi_ArrowArray, - data_type: DataType, - parent: Arc, -} - -impl<'a> ArrowArrayRef for ArrowArrayChild<'a> { - /// the data_type as declared in the schema - fn data_type(&self) -> &DataType { - &self.data_type - } - - fn parent(&self) -> &Arc { - &self.parent - } - - fn array(&self) -> &Ffi_ArrowArray { - self.array - } - - fn n_buffers(&self) -> usize { - self.array.n_buffers as usize - } -} - -impl<'a> ArrowArrayChild<'a> { - fn from_raw(array: &'a Ffi_ArrowArray, data_type: DataType, parent: Arc) -> Self { - Self { - array, - data_type, - parent, - } - } -} diff --git a/src/ffi/generated.rs b/src/ffi/generated.rs new file mode 100644 index 00000000000..0fc00e5300e --- /dev/null +++ b/src/ffi/generated.rs @@ -0,0 +1,55 @@ +/* automatically generated by rust-bindgen 0.59.2 */ + +/// ABI-compatible struct for [`ArrowSchema`](https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions) +#[repr(C)] +#[derive(Debug, Clone)] +pub struct ArrowSchema { + pub(super) format: *const ::std::os::raw::c_char, + pub(super) name: *const ::std::os::raw::c_char, + pub(super) metadata: *const ::std::os::raw::c_char, + pub(super) flags: i64, + pub(super) n_children: i64, + pub(super) children: *mut *mut ArrowSchema, + pub(super) dictionary: *mut ArrowSchema, + pub(super) release: ::std::option::Option, + pub(super) private_data: *mut ::std::os::raw::c_void, +} + +/// ABI-compatible struct for 
[`ArrowArray`](https://arrow.apache.org/docs/format/CDataInterface.html#structure-definitions) +#[repr(C)] +#[derive(Debug, Clone)] +pub struct ArrowArray { + pub(super) length: i64, + pub(super) null_count: i64, + pub(super) offset: i64, + pub(super) n_buffers: i64, + pub(super) n_children: i64, + pub(super) buffers: *mut *const ::std::os::raw::c_void, + pub(super) children: *mut *mut ArrowArray, + pub(super) dictionary: *mut ArrowArray, + pub(super) release: ::std::option::Option, + pub(super) private_data: *mut ::std::os::raw::c_void, +} + +/// ABI-compatible struct for [`ArrowArrayStream`](https://arrow.apache.org/docs/format/CStreamInterface.html). +#[repr(C)] +#[derive(Debug, Clone)] +pub struct ArrowArrayStream { + pub(super) get_schema: ::std::option::Option< + unsafe extern "C" fn( + arg1: *mut ArrowArrayStream, + out: *mut ArrowSchema, + ) -> ::std::os::raw::c_int, + >, + pub(super) get_next: ::std::option::Option< + unsafe extern "C" fn( + arg1: *mut ArrowArrayStream, + out: *mut ArrowArray, + ) -> ::std::os::raw::c_int, + >, + pub(super) get_last_error: ::std::option::Option< + unsafe extern "C" fn(arg1: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char, + >, + pub(super) release: ::std::option::Option, + pub(super) private_data: *mut ::std::os::raw::c_void, +} diff --git a/src/ffi/mod.rs b/src/ffi/mod.rs index 1b32c0cfa52..f3d689b7f8c 100644 --- a/src/ffi/mod.rs +++ b/src/ffi/mod.rs @@ -2,12 +2,12 @@ //! 
Arrow's [C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) mod array; mod bridge; -#[allow(clippy::module_inception)] -mod ffi; +mod generated; mod schema; +mod stream; pub(crate) use array::try_from; -pub(crate) use ffi::{ArrowArray, ArrowArrayRef}; +pub(crate) use array::{ArrowArrayRef, InternalArrowArray}; use std::sync::Arc; @@ -15,42 +15,42 @@ use crate::array::Array; use crate::datatypes::{DataType, Field}; use crate::error::Result; -pub use ffi::Ffi_ArrowArray; -pub use schema::Ffi_ArrowSchema; - use self::schema::to_field; +pub use generated::{ArrowArray, ArrowArrayStream, ArrowSchema}; +pub use stream::{export_iterator, ArrowArrayStreamReader}; + /// Exports an [`Arc`] to the C data interface. /// # Safety /// The pointer `ptr` must be allocated and valid -pub unsafe fn export_array_to_c(array: Arc, ptr: *mut Ffi_ArrowArray) { +pub unsafe fn export_array_to_c(array: Arc, ptr: *mut ArrowArray) { let array = bridge::align_to_c_data_interface(array); - *ptr = Ffi_ArrowArray::new(array); + std::ptr::write_unaligned(ptr, ArrowArray::new(array)); } /// Exports a [`Field`] to the C data interface. /// # Safety /// The pointer `ptr` must be allocated and valid -pub unsafe fn export_field_to_c(field: &Field, ptr: *mut Ffi_ArrowSchema) { - *ptr = Ffi_ArrowSchema::new(field) +pub unsafe fn export_field_to_c(field: &Field, ptr: *mut ArrowSchema) { + std::ptr::write_unaligned(ptr, ArrowSchema::new(field)); } /// Imports a [`Field`] from the C data interface. /// # Safety -/// This function is intrinsically `unsafe` and relies on a [`Ffi_ArrowSchema`] +/// This function is intrinsically `unsafe` and relies on a [`ArrowSchema`] /// valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI). -pub unsafe fn import_field_from_c(field: &Ffi_ArrowSchema) -> Result { +pub unsafe fn import_field_from_c(field: &ArrowSchema) -> Result { to_field(field) } /// Imports an [`Array`] from the C data interface. 
/// # Safety -/// This function is intrinsically `unsafe` and relies on a [`Ffi_ArrowArray`] +/// This function is intrinsically `unsafe` and relies on a [`ArrowArray`] /// valid according to the [C data interface](https://arrow.apache.org/docs/format/CDataInterface.html) (FFI). pub unsafe fn import_array_from_c( - array: Box, + array: Box, data_type: DataType, ) -> Result> { - try_from(Arc::new(ArrowArray::new(array, data_type))) + try_from(Arc::new(InternalArrowArray::new(array, data_type))) } diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs index c00a765672a..a65d5568b7a 100644 --- a/src/ffi/schema.rs +++ b/src/ffi/schema.rs @@ -7,34 +7,19 @@ use crate::{ error::{ArrowError, Result}, }; +use super::ArrowSchema; + #[allow(dead_code)] struct SchemaPrivateData { name: CString, format: CString, metadata: Option>, - children_ptr: Box<[*mut Ffi_ArrowSchema]>, - dictionary: Option<*mut Ffi_ArrowSchema>, -} - -/// ABI-compatible struct for `ArrowSchema` from C Data Interface -/// See -// This was created by bindgen -#[repr(C)] -#[derive(Debug)] -pub struct Ffi_ArrowSchema { - format: *const ::std::os::raw::c_char, - name: *const ::std::os::raw::c_char, - metadata: *const ::std::os::raw::c_char, - flags: i64, - n_children: i64, - children: *mut *mut Ffi_ArrowSchema, - dictionary: *mut Ffi_ArrowSchema, - release: ::std::option::Option, - private_data: *mut ::std::os::raw::c_void, + children_ptr: Box<[*mut ArrowSchema]>, + dictionary: Option<*mut ArrowSchema>, } -// callback used to drop [Ffi_ArrowSchema] when it is exported. -unsafe extern "C" fn c_release_schema(schema: *mut Ffi_ArrowSchema) { +// callback used to drop [ArrowSchema] when it is exported. 
+unsafe extern "C" fn c_release_schema(schema: *mut ArrowSchema) { if schema.is_null() { return; } @@ -52,8 +37,8 @@ unsafe extern "C" fn c_release_schema(schema: *mut Ffi_ArrowSchema) { schema.release = None; } -impl Ffi_ArrowSchema { - /// creates a new [Ffi_ArrowSchema] +impl ArrowSchema { + /// creates a new [ArrowSchema] pub(crate) fn new(field: &Field) -> Self { let format = to_format(field.data_type()); let name = field.name.clone(); @@ -63,25 +48,25 @@ impl Ffi_ArrowSchema { // allocate (and hold) the children let children_vec = match field.data_type() { DataType::List(field) => { - vec![Box::new(Ffi_ArrowSchema::new(field.as_ref()))] + vec![Box::new(ArrowSchema::new(field.as_ref()))] } DataType::FixedSizeList(field, _) => { - vec![Box::new(Ffi_ArrowSchema::new(field.as_ref()))] + vec![Box::new(ArrowSchema::new(field.as_ref()))] } DataType::LargeList(field) => { - vec![Box::new(Ffi_ArrowSchema::new(field.as_ref()))] + vec![Box::new(ArrowSchema::new(field.as_ref()))] } DataType::Map(field, is_sorted) => { flags += (*is_sorted as i64) * 4; - vec![Box::new(Ffi_ArrowSchema::new(field.as_ref()))] + vec![Box::new(ArrowSchema::new(field.as_ref()))] } DataType::Struct(fields) => fields .iter() - .map(|field| Box::new(Ffi_ArrowSchema::new(field))) + .map(|field| Box::new(ArrowSchema::new(field))) .collect::>(), DataType::Union(fields, _, _) => fields .iter() - .map(|field| Box::new(Ffi_ArrowSchema::new(field))) + .map(|field| Box::new(ArrowSchema::new(field))) .collect::>(), _ => vec![], }; @@ -96,7 +81,7 @@ impl Ffi_ArrowSchema { flags += *is_ordered as i64; // we do not store field info in the dict values, so can't recover it all :( let field = Field::new("", values.as_ref().clone(), true); - Some(Box::new(Ffi_ArrowSchema::new(&field))) + Some(Box::new(ArrowSchema::new(&field))) } else { None }; @@ -153,7 +138,7 @@ impl Ffi_ArrowSchema { } } - /// create an empty [Ffi_ArrowSchema] + /// create an empty [ArrowSchema] pub fn empty() -> Self { Self { format: 
std::ptr::null_mut(), @@ -202,7 +187,7 @@ impl Ffi_ArrowSchema { } } -impl Drop for Ffi_ArrowSchema { +impl Drop for ArrowSchema { fn drop(&mut self) { match self.release { None => (), @@ -211,7 +196,7 @@ impl Drop for Ffi_ArrowSchema { } } -pub(crate) unsafe fn to_field(schema: &Ffi_ArrowSchema) -> Result { +pub(crate) unsafe fn to_field(schema: &ArrowSchema) -> Result { let dictionary = schema.dictionary(); let data_type = if let Some(dictionary) = dictionary { let indices = to_integer_type(schema.format())?; @@ -251,7 +236,7 @@ fn to_integer_type(format: &str) -> Result { }) } -unsafe fn to_data_type(schema: &Ffi_ArrowSchema) -> Result { +unsafe fn to_data_type(schema: &ArrowSchema) -> Result { Ok(match schema.format() { "n" => DataType::Null, "b" => DataType::Boolean, diff --git a/src/ffi/stream.rs b/src/ffi/stream.rs new file mode 100644 index 00000000000..6a316a6847c --- /dev/null +++ b/src/ffi/stream.rs @@ -0,0 +1,223 @@ +use std::ffi::{CStr, CString}; +use std::sync::Arc; + +use crate::{array::Array, datatypes::Field, error::ArrowError}; + +use super::{export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c}; +use super::{ArrowArray, ArrowArrayStream, ArrowSchema}; + +impl Drop for ArrowArrayStream { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +impl ArrowArrayStream { + /// Creates an empty [`ArrowArrayStream`] used to import from a producer. 
+ pub fn empty() -> Self { + Self { + get_schema: None, + get_next: None, + get_last_error: None, + release: None, + private_data: std::ptr::null_mut(), + } + } +} + +unsafe fn handle_error(iter: &mut ArrowArrayStream) -> ArrowError { + let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) }; + + if error.is_null() { + return ArrowError::External( + "C stream".to_string(), + Box::new(ArrowError::ExternalFormat( + "an unspecified error".to_string(), + )), + ); + } + + let error = unsafe { CStr::from_ptr(error) }; + ArrowError::External( + "C stream".to_string(), + Box::new(ArrowError::ExternalFormat( + error.to_str().unwrap().to_string(), + )), + ) +} + +/// Implements an iterator of [`Array`] consumed from the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html). +pub struct ArrowArrayStreamReader { + iter: Box, + field: Field, +} + +impl ArrowArrayStreamReader { + /// Returns a new [`ArrowArrayStreamReader`] + /// # Error + /// Errors iff the [`ArrowArrayStream`] is out of specification + /// # Safety + /// This method is intrinsically `unsafe` since it assumes that the `ArrowArrayStream` + /// contains a valid Arrow C stream interface. 
+ /// In particular: + /// * The `ArrowArrayStream` fulfills the invariants of the C stream interface + /// * The schema `get_schema` produces fulfills the C data interface + pub unsafe fn try_new(mut iter: Box) -> Result { + let mut field = Box::new(ArrowSchema::empty()); + + if iter.get_next.is_none() { + return Err(ArrowError::OutOfSpec( + "The C stream MUST contain a non-null get_next".to_string(), + )); + }; + + if iter.get_last_error.is_none() { + return Err(ArrowError::OutOfSpec( + "The C stream MUST contain a non-null get_last_error".to_string(), + )); + }; + + let status = if let Some(f) = iter.get_schema { + unsafe { (f)(&mut *iter, &mut *field) } + } else { + return Err(ArrowError::OutOfSpec( + "The C stream MUST contain a non-null get_schema".to_string(), + )); + }; + + if status != 0 { + return Err(unsafe { handle_error(&mut iter) }); + } + + let field = unsafe { import_field_from_c(&field)? }; + + Ok(Self { iter, field }) + } + + /// Returns the field provided by the stream + pub fn field(&self) -> &Field { + &self.field + } + + /// Advances this iterator by one array + /// # Error + /// Errors iff: + /// * The C stream interface returns an error + /// * The C stream interface returns an invalid array (that we can identify, see Safety below) + /// # Safety + /// Calling this iterator's `next` assumes that the [`ArrowArrayStream`] produces arrow arrays + /// that fulfill the C data interface + pub unsafe fn next(&mut self) -> Option, ArrowError>> { + let mut array = Box::new(ArrowArray::empty()); + let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut *array) }; + + if status != 0 { + return Some(Err(unsafe { handle_error(&mut self.iter) })); + } + + // last paragraph of https://arrow.apache.org/docs/format/CStreamInterface.html#c.ArrowArrayStream.get_next + array.release?; + + // Safety: assumed from the C stream interface + unsafe { import_array_from_c(array, self.field.data_type.clone()) } + .map(Some) + .transpose() + } +} + 
+struct PrivateData { + iter: Box, ArrowError>>>, + field: Field, + error: Option, +} + +unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 { + if iter.is_null() { + return 2001; + } + let mut private = &mut *((*iter).private_data as *mut PrivateData); + + match private.iter.next() { + Some(Ok(item)) => { + // check that the array has the same data_type as field + let item_dt = item.data_type(); + let expected_dt = private.field.data_type(); + if item_dt != expected_dt { + private.error = Some(CString::new(format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}").as_bytes().to_vec()).unwrap()); + return 2001; // custom application specific error (since this is never a result of this interface) + } + + export_array_to_c(item, array); + private.error = None; + 0 + } + Some(Err(err)) => { + private.error = Some(CString::new(err.to_string().as_bytes().to_vec()).unwrap()); + 2001 // custom application specific error (since this is never a result of this interface) + } + None => { + let a = ArrowArray::empty(); + std::ptr::write_unaligned(array, a); + private.error = None; + 0 + } + } +} + +unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 { + if iter.is_null() { + return 2001; + } + let private = &mut *((*iter).private_data as *mut PrivateData); + + export_field_to_c(&private.field, schema); + 0 +} + +unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char { + if iter.is_null() { + return std::ptr::null(); + } + let private = &mut *((*iter).private_data as *mut PrivateData); + + private + .error + .as_ref() + .map(|x| x.as_ptr()) + .unwrap_or(std::ptr::null()) +} + +unsafe extern "C" fn release(iter: *mut ArrowArrayStream) { + if iter.is_null() { + return; + } + let _ = Box::from_raw((*iter).private_data as *mut PrivateData); + (*iter).release = None; + // private drops automatically +} + 
+/// Exports an iterator to the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html) +/// # Safety +/// The pointer `consumer` must be allocated +pub unsafe fn export_iterator( + iter: Box, ArrowError>>>, + field: Field, + consumer: *mut ArrowArrayStream, +) { + let private_data = Box::new(PrivateData { + iter, + field, + error: None, + }); + + *consumer = ArrowArrayStream { + get_schema: Some(get_schema), + get_next: Some(get_next), + get_last_error: Some(get_last_error), + release: Some(release), + private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void, + } +} diff --git a/tests/it/ffi.rs b/tests/it/ffi/data.rs similarity index 96% rename from tests/it/ffi.rs rename to tests/it/ffi/data.rs index dd4f53b912e..45c2f66cc1a 100644 --- a/tests/it/ffi.rs +++ b/tests/it/ffi/data.rs @@ -8,8 +8,8 @@ use std::sync::Arc; fn _test_round_trip(array: Arc, expected: Box) -> Result<()> { let field = Field::new("a", array.data_type().clone(), true); - let array_ptr = Box::new(ffi::Ffi_ArrowArray::empty()); - let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + let array_ptr = Box::new(ffi::ArrowArray::empty()); + let schema_ptr = Box::new(ffi::ArrowSchema::empty()); let array_ptr = Box::into_raw(array_ptr); let schema_ptr = Box::into_raw(schema_ptr); @@ -42,8 +42,8 @@ fn test_round_trip(expected: impl Array + Clone + 'static) -> Result<()> { } fn test_round_trip_schema(field: Field) -> Result<()> { - // create a `ArrowArray` from the data. - let schema_ptr = Box::new(ffi::Ffi_ArrowSchema::empty()); + // create a `InternalArrowArray` from the data. 
+ let schema_ptr = Box::new(ffi::ArrowSchema::empty()); let schema_ptr = Box::into_raw(schema_ptr); diff --git a/tests/it/ffi/mod.rs b/tests/it/ffi/mod.rs new file mode 100644 index 00000000000..af49381f138 --- /dev/null +++ b/tests/it/ffi/mod.rs @@ -0,0 +1,2 @@ +mod data; +mod stream; diff --git a/tests/it/ffi/stream.rs b/tests/it/ffi/stream.rs new file mode 100644 index 00000000000..9d99c77e08b --- /dev/null +++ b/tests/it/ffi/stream.rs @@ -0,0 +1,33 @@ +use std::sync::Arc; + +use arrow2::array::*; +use arrow2::datatypes::Field; +use arrow2::{error::Result, ffi}; + +fn _test_round_trip(arrays: Vec>) -> Result<()> { + let field = Field::new("a", arrays[0].data_type().clone(), true); + let iter = Box::new(arrays.clone().into_iter().map(Ok)) as _; + + let mut stream = Box::new(ffi::ArrowArrayStream::empty()); + + unsafe { ffi::export_iterator(iter, field.clone(), &mut *stream) } + + let mut stream = unsafe { ffi::ArrowArrayStreamReader::try_new(stream)? }; + + let mut produced_arrays: Vec> = vec![]; + while let Some(array) = unsafe { stream.next() } { + produced_arrays.push(array?.into()); + } + + assert_eq!(produced_arrays, arrays); + assert_eq!(stream.field(), &field); + Ok(()) +} + +#[test] +fn round_trip() -> Result<()> { + let array = Int32Array::from(&[Some(2), None, Some(1), None]); + let array: Arc = Arc::new(array); + + _test_round_trip(vec![array.clone(), array.clone(), array]) +}