From 13dad80079261ac8c2c691ac73de7780f3828246 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 2 Mar 2022 01:24:31 -0800 Subject: [PATCH 01/22] Add FFI for Arrow C Stream Interface --- arrow/Cargo.toml | 1 + arrow/src/ffi.rs | 28 ++-- arrow/src/ffi_stream.rs | 311 ++++++++++++++++++++++++++++++++++++++++ arrow/src/lib.rs | 1 + 4 files changed, 327 insertions(+), 14 deletions(-) create mode 100644 arrow/src/ffi_stream.rs diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index f5368c1f13bc..a68524681fe4 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -57,6 +57,7 @@ pyo3 = { version = "0.15", optional = true } lexical-core = "^0.8" multiversion = "0.6.1" bitflags = "1.2.1" +libc = "0.2.119" [features] default = ["csv", "ipc", "test_utils"] diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index b7a22deab13e..e83010d2c9d0 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -109,15 +109,15 @@ bitflags! { #[repr(C)] #[derive(Debug, Clone)] pub struct FFI_ArrowSchema { - format: *const c_char, - name: *const c_char, - metadata: *const c_char, - flags: i64, - n_children: i64, - children: *mut *mut FFI_ArrowSchema, - dictionary: *mut FFI_ArrowSchema, - release: Option, - private_data: *mut c_void, + pub(crate) format: *const c_char, + pub(crate) name: *const c_char, + pub(crate) metadata: *const c_char, + pub(crate) flags: i64, + pub(crate) n_children: i64, + pub(crate) children: *mut *mut FFI_ArrowSchema, + pub(crate) dictionary: *mut FFI_ArrowSchema, + pub(crate) release: Option, + pub(crate) private_data: *mut c_void, } struct SchemaPrivateData { @@ -324,15 +324,15 @@ pub struct FFI_ArrowArray { pub(crate) n_buffers: i64, pub(crate) n_children: i64, pub(crate) buffers: *mut *const c_void, - children: *mut *mut FFI_ArrowArray, - dictionary: *mut FFI_ArrowArray, - release: Option, + pub(crate) children: *mut *mut FFI_ArrowArray, + pub(crate) dictionary: *mut FFI_ArrowArray, + pub(crate) release: Option, // When exported, this MUST contain everything that is owned by this array. // for example, any buffer pointed to in `buffers` must be here, as well // as the `buffers` pointer itself. // In other words, everything in [FFI_ArrowArray] must be owned by // `private_data` and can assume that they do not outlive `private_data`. - private_data: *mut c_void, + pub(crate) private_data: *mut c_void, } impl Drop for FFI_ArrowArray { @@ -372,7 +372,7 @@ impl FFI_ArrowArray { /// # Safety /// This method releases `buffers`. Consumers of this struct *must* call `release` before /// releasing this struct, or contents in `buffers` leak. - fn new(data: &ArrayData) -> Self { + pub fn new(data: &ArrayData) -> Self { // * insert the null buffer at the start // * make all others `Option`. let buffers = iter::once(data.null_buffer().cloned()) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs new file mode 100644 index 000000000000..620c03701f04 --- /dev/null +++ b/arrow/src/ffi_stream.rs @@ -0,0 +1,311 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Contains declarations to bind to the [C Stream Interface](https://arrow.apache.org/docs/format/CStreamInterface.html). +//! + +use std::{ + convert::TryFrom, + ffi::CString, + os::raw::{c_char, c_int, c_void}, + sync::Arc, +}; + +use libc::{EINVAL, ENOSYS, ENOMEM, EIO}; + +use crate::array::Array; +use crate::array::StructArray; +use crate::error::ArrowError; +use crate::ffi::*; +use crate::record_batch::RecordBatchReader; + +/// ABI-compatible struct for `ArrayStream` from C Stream Interface +/// This interface is experimental +/// See +#[repr(C)] +#[derive(Debug, Clone)] +pub struct FFI_ArrowArrayStream { + // Callbacks providing stream functionality + get_schema: Option c_int>, + get_next: Option c_int>, + get_last_error: Option *const c_char>, + + // Release callback + release: Option, + + // Opaque producer-specific data + private_data: *mut c_void, +} + +// callback used to drop [FFI_ArrowArrayStream] when it is exported. +unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) { + if stream.is_null() { + return; + } + let stream = &mut *stream; + + stream.get_schema = None; + stream.get_next = None; + stream.get_last_error = None; + + let private_data = Box::from_raw(stream.private_data as *mut StreamPrivateData); + drop(private_data); + + stream.release = None; +} + +struct StreamPrivateData { + batch_reader: Box, + last_error: Box, +} + +unsafe extern "C" fn get_schema(stream: *mut FFI_ArrowArrayStream, schema: *mut FFI_ArrowSchema) -> c_int { + ExportedArrayStream { stream }.get_schema(schema) +} + +unsafe extern "C" fn get_next(stream: *mut FFI_ArrowArrayStream, array: *mut FFI_ArrowArray) -> c_int { + ExportedArrayStream { stream }.get_next(array) +} + +unsafe extern "C" fn get_last_error(stream: *mut FFI_ArrowArrayStream) -> *const c_char { + let last_error = ExportedArrayStream { stream }.get_last_error(); + CString::new(last_error.as_str()).unwrap().into_raw() +} + +impl Drop for FFI_ArrowArrayStream { + fn drop(&mut self) { + match self.release { + None => (), + Some(release) => unsafe { release(self) }, + }; + } +} + +impl FFI_ArrowArrayStream { + /// create a new [`FFI_ArrowArrayStream`]. + pub fn new( + batch_reader: Box + ) -> Self { + let private_data = Box::new(StreamPrivateData { + batch_reader, + last_error: Box::new(String::new()) + }); + + Self { + get_schema: Some(get_schema), + get_next: Some(get_next), + get_last_error: Some(get_last_error), + release: Some(release_stream), + private_data: Box::into_raw(private_data) as *mut c_void, + } + } + + pub fn empty() -> Self { + Self { + get_schema: None, + get_next: None, + get_last_error: None, + release: None, + private_data: std::ptr::null_mut(), + } + } + + pub fn to_raw(this: Arc) -> *const FFI_ArrowArrayStream { + Arc::into_raw(this) + } + + pub unsafe fn from_raw(ptr: *const FFI_ArrowArrayStream) -> Arc { + let ffi_stream = (*ptr).clone(); + Arc::new(ffi_stream) + } +} + +struct ExportedArrayStream { + stream: *mut FFI_ArrowArrayStream, +} + +impl ExportedArrayStream { + fn get_private_data(&self) -> Box { + unsafe { + Box::from_raw((*self.stream).private_data as *mut StreamPrivateData) + } + } + + pub fn get_schema(&self, out: *mut FFI_ArrowSchema) -> i32 { + unsafe { + match (*out).release { + None => (), + Some(release) => release(out), + }; + }; + + let mut private_data = self.get_private_data(); + let reader = &private_data.batch_reader; + + let schema = FFI_ArrowSchema::try_from(reader.schema().as_ref()); + + let ret_code = match schema { + Ok(mut schema) => { + unsafe { + (*out).format = schema.format; + (*out).name = schema.name; + (*out).metadata = schema.metadata; + (*out).flags = schema.flags; + (*out).n_children = schema.n_children; + (*out).children = schema.children; + (*out).dictionary = schema.dictionary; + (*out).release = schema.release; + (*out).private_data = schema.private_data; + } + schema.release = None; + 0 + } + Err(ref err) => { + private_data.last_error = Box::new(err.to_string()); + get_error_code(err) + } + }; + + Box::into_raw(private_data); + ret_code + } + + pub fn get_next(&self, out: *mut FFI_ArrowArray) -> i32 { + unsafe { + match (*out).release { + None => (), + Some(release) => release(out), + }; + }; + + let mut private_data = self.get_private_data(); + let reader = &mut private_data.batch_reader; + + let ret_code = match reader.next() { + None => 0, + Some(next_batch) => { + if next_batch.is_ok() { + let struct_array = StructArray::from(next_batch.unwrap()); + let mut array = FFI_ArrowArray::new(struct_array.data()); + + unsafe { + (*out).length = array.length; + (*out).null_count = array.null_count; + (*out).offset = array.offset; + (*out).n_buffers = array.n_buffers; + (*out).n_children = array.n_children; + (*out).buffers = array.buffers; + (*out).children = array.children; + (*out).dictionary = array.dictionary; + (*out).release = array.release; + (*out).private_data = array.private_data; + } + + array.release = None; + 0 + } else { + let err = &next_batch.unwrap_err(); + private_data.last_error = Box::new(err.to_string()); + get_error_code(err) + } + } + }; + + Box::into_raw(private_data); + ret_code + } + + pub fn get_last_error(&self) -> Box { + self.get_private_data().last_error + } + + +} + +fn get_error_code(err: &ArrowError) -> i32 { + match err { + ArrowError::NotYetImplemented(_) => ENOSYS, + ArrowError::MemoryError(_) => ENOMEM, + ArrowError::IoError(_) => EIO, + _ => EINVAL, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::fs::File; + + use crate::datatypes::Schema; + use crate::ipc::reader::FileReader; + + fn get_array_testdata() -> File { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; + File::open(format!( + "{}/arrow-ipc-stream/integration/{}/generated_decimal.arrow_file", + testdata, version + )).unwrap() + } + + #[test] + fn test_export_stream() { + let file = get_array_testdata(); + let reader = Box::new(FileReader::try_new(file).unwrap()); + let expected_schema = reader.schema(); + + let stream = Box::new(FFI_ArrowArrayStream::new(reader)); + let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; + + let empty_schema = Box::new(FFI_ArrowSchema::empty()); + let schema_ptr = Box::into_raw(empty_schema) as *mut FFI_ArrowSchema; + + let ret_code = unsafe { + get_schema(stream_ptr, schema_ptr) + }; + assert_eq!(ret_code, 0); + + let ffi_schema = unsafe { Box::from_raw(schema_ptr) }; + + let schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); + assert_eq!(&schema, expected_schema.as_ref()); + + let empty_array = Box::new(FFI_ArrowArray::empty()); + let array_ptr = Box::into_raw(empty_array) as *mut FFI_ArrowArray; + + let ret_code = unsafe { + get_next(stream_ptr, array_ptr) + }; + assert_eq!(ret_code, 0); + + let array = unsafe { + ArrowArray::try_from_raw(array_ptr, Box::into_raw(ffi_schema) as *mut FFI_ArrowSchema).unwrap().to_data().unwrap() + }; + + let file = get_array_testdata(); + let mut reader = Box::new(FileReader::try_new(file).unwrap()); + let expected_batch = reader.next().unwrap().unwrap(); + assert_eq!(array.len(), expected_batch.num_rows()); + } +} diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs index 925d33cf118c..0d3ea0f0a2c3 100644 --- a/arrow/src/lib.rs +++ b/arrow/src/lib.rs @@ -142,6 +142,7 @@ pub mod csv; pub mod datatypes; pub mod error; pub mod ffi; +pub mod ffi_stream; #[cfg(feature = "ipc")] pub mod ipc; pub mod json; From 4d4cb7ae6e954f13149b66031cc063b69651e6f3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 2 Mar 2022 12:55:58 -0800 Subject: [PATCH 02/22] Add ArrowArrayStreamReader --- arrow/src/error.rs | 5 ++ arrow/src/ffi.rs | 4 +- arrow/src/ffi_stream.rs | 190 +++++++++++++++++++++++++++++++++------- 3 files changed, 164 insertions(+), 35 deletions(-) diff --git a/arrow/src/error.rs b/arrow/src/error.rs index ef7abbbddef9..d0890eedd75c 100644 --- a/arrow/src/error.rs +++ b/arrow/src/error.rs @@ -40,6 +40,8 @@ pub enum ArrowError { ParquetError(String), /// Error during import or export to/from the C Data Interface CDataInterface(String), + /// Error during import or export to/from the C Stream Interface + CStreamInterface(String), DictionaryKeyOverflowError, } @@ -122,6 +124,9 @@ impl Display for ArrowError { ArrowError::CDataInterface(desc) => { write!(f, "C Data interface error: {}", desc) } + ArrowError::CStreamInterface(desc) => { + write!(f, "C Stream interface error: {}", desc) + } ArrowError::DictionaryKeyOverflowError => { write!(f, "Dictionary key bigger than the key type") } diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs index e83010d2c9d0..dfd395f1c1d8 100644 --- a/arrow/src/ffi.rs +++ b/arrow/src/ffi.rs @@ -646,8 +646,8 @@ pub trait ArrowArrayRef { /// Furthermore, this struct assumes that the incoming data agrees with the C data interface. #[derive(Debug)] pub struct ArrowArray { - array: Arc, - schema: Arc, + pub(crate) array: Arc, + pub(crate) schema: Arc, } #[derive(Debug)] diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 620c03701f04..516a1e081b8f 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -25,13 +25,15 @@ use std::{ sync::Arc, }; -use libc::{EINVAL, ENOSYS, ENOMEM, EIO}; +use libc::{EINVAL, EIO, ENOMEM, ENOSYS}; -use crate::array::Array; use crate::array::StructArray; +use crate::array::{Array, ArrayRef}; +use crate::datatypes::{Schema, SchemaRef}; use crate::error::ArrowError; +use crate::error::Result; use crate::ffi::*; -use crate::record_batch::RecordBatchReader; +use crate::record_batch::{RecordBatch, RecordBatchReader}; /// ABI-compatible struct for `ArrayStream` from C Stream Interface /// This interface is experimental @@ -40,15 +42,20 @@ use crate::record_batch::RecordBatchReader; #[derive(Debug, Clone)] pub struct FFI_ArrowArrayStream { // Callbacks providing stream functionality - get_schema: Option c_int>, - get_next: Option c_int>, - get_last_error: Option *const c_char>, + get_schema: Option< + unsafe extern "C" fn( + arg1: *mut FFI_ArrowArrayStream, + arg2: *mut FFI_ArrowSchema, + ) -> c_int, + >, + get_next: Option< + unsafe extern "C" fn( + arg1: *mut FFI_ArrowArrayStream, + arg2: *mut FFI_ArrowArray, + ) -> c_int, + >, + get_last_error: + Option *const c_char>, // Release callback release: Option, @@ -79,14 +86,23 @@ struct StreamPrivateData { last_error: Box, } -unsafe extern "C" fn get_schema(stream: *mut FFI_ArrowArrayStream, schema: *mut FFI_ArrowSchema) -> c_int { +// The callback used to get array schema +unsafe extern "C" fn get_schema( + stream: *mut FFI_ArrowArrayStream, + schema: *mut FFI_ArrowSchema, +) -> c_int { ExportedArrayStream { stream }.get_schema(schema) } -unsafe extern "C" fn get_next(stream: *mut FFI_ArrowArrayStream, array: *mut FFI_ArrowArray) -> c_int { +// The callback used to get next array +unsafe extern "C" fn get_next( + stream: *mut FFI_ArrowArrayStream, + array: *mut FFI_ArrowArray, +) -> c_int { ExportedArrayStream { stream }.get_next(array) } +// The callback used to get the error from last operation on the `FFI_ArrowArrayStream` unsafe extern "C" fn get_last_error(stream: *mut FFI_ArrowArrayStream) -> *const c_char { let last_error = ExportedArrayStream { stream }.get_last_error(); CString::new(last_error.as_str()).unwrap().into_raw() @@ -103,12 +119,10 @@ impl Drop for FFI_ArrowArrayStream { impl FFI_ArrowArrayStream { /// create a new [`FFI_ArrowArrayStream`]. - pub fn new( - batch_reader: Box - ) -> Self { + pub fn new(batch_reader: Box) -> Self { let private_data = Box::new(StreamPrivateData { batch_reader, - last_error: Box::new(String::new()) + last_error: Box::new(String::new()), }); Self { @@ -117,7 +131,7 @@ impl FFI_ArrowArrayStream { get_last_error: Some(get_last_error), release: Some(release_stream), private_data: Box::into_raw(private_data) as *mut c_void, - } + } } pub fn empty() -> Self { @@ -134,7 +148,9 @@ impl FFI_ArrowArrayStream { Arc::into_raw(this) } - pub unsafe fn from_raw(ptr: *const FFI_ArrowArrayStream) -> Arc { + pub unsafe fn from_raw( + ptr: *const FFI_ArrowArrayStream, + ) -> Arc { let ffi_stream = (*ptr).clone(); Arc::new(ffi_stream) } @@ -146,9 +162,7 @@ struct ExportedArrayStream { impl ExportedArrayStream { fn get_private_data(&self) -> Box { - unsafe { - Box::from_raw((*self.stream).private_data as *mut StreamPrivateData) - } + unsafe { Box::from_raw((*self.stream).private_data as *mut StreamPrivateData) } } pub fn get_schema(&self, out: *mut FFI_ArrowSchema) -> i32 { @@ -238,8 +252,6 @@ impl ExportedArrayStream { pub fn get_last_error(&self) -> Box { self.get_private_data().last_error } - - } fn get_error_code(err: &ArrowError) -> i32 { @@ -251,6 +263,115 @@ fn get_error_code(err: &ArrowError) -> i32 { } } +/// A `RecordBatch` reader which imports from `FFI_ArrowArrayStream` +struct ArrowArrayStreamReader { + stream: Arc, +} + +impl ArrowArrayStreamReader { + pub fn new(stream: FFI_ArrowArrayStream) -> Self { + Self { + stream: Arc::new(stream), + } + } +} + +/// Get the last error from `ArrowArrayStreamReader` +fn get_stream_last_error(stream_reader: &ArrowArrayStreamReader) -> Option { + if stream_reader.stream.get_last_error.is_none() { + return None; + } + + let stream_ptr = + Arc::into_raw(stream_reader.stream.clone()) as *mut FFI_ArrowArrayStream; + + let error_str = unsafe { + let c_str = + stream_reader.stream.get_last_error.unwrap()(stream_ptr) as *mut c_char; + CString::from_raw(c_str).into_string() + }; + + if error_str.is_ok() { + Some(error_str.unwrap()) + } else { + Some(error_str.unwrap_err().to_string()) + } +} + +impl Iterator for ArrowArrayStreamReader { + type Item = Result; + + fn next(&mut self) -> Option { + if self.stream.get_next.is_none() { + return None; + } + + let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream; + + let empty_array = Arc::new(FFI_ArrowArray::empty()); + let array_ptr = Arc::into_raw(empty_array) as *mut FFI_ArrowArray; + + let ret_code = unsafe { self.stream.get_next.unwrap()(stream_ptr, array_ptr) }; + + let ffi_array = unsafe { Arc::from_raw(array_ptr) }; + + let schema_ref = self.schema(); + let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()); + + if schema.is_err() { + return Some(Err(schema.err().unwrap())); + } + + if ret_code == 0 { + let data = ArrowArray { + array: ffi_array, + schema: Arc::new(schema.unwrap()), + } + .to_data(); + + if data.is_err() { + return Some(Err(data.err().unwrap())); + } + + let record_batch = + RecordBatch::try_new(schema_ref, vec![ArrayRef::from(data.unwrap())]); + + if record_batch.is_err() { + return Some(Err(record_batch.err().unwrap())); + } + Some(Ok(record_batch.unwrap())) + } else { + let last_error = get_stream_last_error(self); + let err = ArrowError::CStreamInterface(last_error.unwrap()); + Some(Err(err)) + } + } +} + +impl RecordBatchReader for ArrowArrayStreamReader { + fn schema(&self) -> SchemaRef { + if self.stream.get_schema.is_none() { + return Arc::new(Schema::empty()); + } + + let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream; + + let empty_schema = Arc::new(FFI_ArrowSchema::empty()); + let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema; + + let ret_code = unsafe { self.stream.get_schema.unwrap()(stream_ptr, schema_ptr) }; + + let ffi_schema = unsafe { Arc::from_raw(schema_ptr) }; + + if ret_code == 0 { + let schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); + Arc::new(schema) + } else { + Arc::new(Schema::empty()) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -266,7 +387,8 @@ mod tests { File::open(format!( "{}/arrow-ipc-stream/integration/{}/generated_decimal.arrow_file", testdata, version - )).unwrap() + )) + .unwrap() } #[test] @@ -281,9 +403,7 @@ mod tests { let empty_schema = Box::new(FFI_ArrowSchema::empty()); let schema_ptr = Box::into_raw(empty_schema) as *mut FFI_ArrowSchema; - let ret_code = unsafe { - get_schema(stream_ptr, schema_ptr) - }; + let ret_code = unsafe { get_schema(stream_ptr, schema_ptr) }; assert_eq!(ret_code, 0); let ffi_schema = unsafe { Box::from_raw(schema_ptr) }; @@ -294,13 +414,17 @@ mod tests { let empty_array = Box::new(FFI_ArrowArray::empty()); let array_ptr = Box::into_raw(empty_array) as *mut FFI_ArrowArray; - let ret_code = unsafe { - get_next(stream_ptr, array_ptr) - }; + let ret_code = unsafe { get_next(stream_ptr, array_ptr) }; assert_eq!(ret_code, 0); let array = unsafe { - ArrowArray::try_from_raw(array_ptr, Box::into_raw(ffi_schema) as *mut FFI_ArrowSchema).unwrap().to_data().unwrap() + ArrowArray::try_from_raw( + array_ptr, + Box::into_raw(ffi_schema) as *mut FFI_ArrowSchema, + ) + .unwrap() + .to_data() + .unwrap() }; let file = get_array_testdata(); From 5570fc24498d819ed1eefabbb49df2d6d6e12854 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 2 Mar 2022 13:45:31 -0800 Subject: [PATCH 03/22] Add test --- arrow/src/ffi_stream.rs | 44 +++++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 516a1e081b8f..ebcff9d995d0 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -27,8 +27,8 @@ use std::{ use libc::{EINVAL, EIO, ENOMEM, ENOSYS}; +use crate::array::Array; use crate::array::StructArray; -use crate::array::{Array, ArrayRef}; use crate::datatypes::{Schema, SchemaRef}; use crate::error::ArrowError; use crate::error::Result; @@ -269,11 +269,18 @@ struct ArrowArrayStreamReader { } impl ArrowArrayStreamReader { + #[allow(dead_code)] pub fn new(stream: FFI_ArrowArrayStream) -> Self { Self { stream: Arc::new(stream), } } + + #[allow(dead_code)] + pub fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Self { + let stream = unsafe { Arc::new((*raw_stream).clone()) }; + Self { stream } + } } /// Get the last error from `ArrowArrayStreamReader` @@ -333,13 +340,9 @@ impl Iterator for ArrowArrayStreamReader { return Some(Err(data.err().unwrap())); } - let record_batch = - RecordBatch::try_new(schema_ref, vec![ArrayRef::from(data.unwrap())]); + let record_batch = RecordBatch::from(&StructArray::from(data.unwrap())); - if record_batch.is_err() { - return Some(Err(record_batch.err().unwrap())); - } - Some(Ok(record_batch.unwrap())) + Some(Ok(record_batch)) } else { let last_error = get_stream_last_error(self); let err = ArrowError::CStreamInterface(last_error.unwrap()); @@ -427,9 +430,34 @@ mod tests { .unwrap() }; + let record_batch = RecordBatch::from(&StructArray::from(array)); + let file = get_array_testdata(); let mut reader = Box::new(FileReader::try_new(file).unwrap()); let expected_batch = reader.next().unwrap().unwrap(); - assert_eq!(array.len(), expected_batch.num_rows()); + assert_eq!(record_batch, expected_batch); + } + + #[test] + fn test_import_stream() { + let file = get_array_testdata(); + let reader = Box::new(FileReader::try_new(file).unwrap()); + let expected_schema = reader.schema(); + + let stream = Box::new(FFI_ArrowArrayStream::new(reader)); + let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; + + let mut stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr); + + let schema = stream_reader.schema(); + assert_eq!(schema, expected_schema); + + let batch = stream_reader.next().unwrap().unwrap(); + + let file = get_array_testdata(); + let mut reader = Box::new(FileReader::try_new(file).unwrap()); + let expected_batch = reader.next().unwrap().unwrap(); + + assert_eq!(batch, expected_batch); } } From b256c91e66ac8fe38d7b4636710caae5b9c2a964 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 2 Mar 2022 14:51:34 -0800 Subject: [PATCH 04/22] Fix clippy --- arrow/src/ffi_stream.rs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index ebcff9d995d0..44abdba15015 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -83,7 +83,7 @@ unsafe extern "C" fn release_stream(stream: *mut FFI_ArrowArrayStream) { struct StreamPrivateData { batch_reader: Box, - last_error: Box, + last_error: String, } // The callback used to get array schema @@ -122,7 +122,7 @@ impl FFI_ArrowArrayStream { pub fn new(batch_reader: Box) -> Self { let private_data = Box::new(StreamPrivateData { batch_reader, - last_error: Box::new(String::new()), + last_error: String::new(), }); Self { @@ -148,6 +148,10 @@ impl FFI_ArrowArrayStream { Arc::into_raw(this) } + /// Get `FFI_ArrowArrayStream` from raw pointer + /// # Safety + /// Assumes that the pointer represents valid C Stream Interfaces, both in memory + /// representation and lifetime via the `release` mechanism. pub unsafe fn from_raw( ptr: *const FFI_ArrowArrayStream, ) -> Arc { @@ -195,7 +199,7 @@ impl ExportedArrayStream { 0 } Err(ref err) => { - private_data.last_error = Box::new(err.to_string()); + private_data.last_error = err.to_string(); get_error_code(err) } }; @@ -218,8 +222,8 @@ impl ExportedArrayStream { let ret_code = match reader.next() { None => 0, Some(next_batch) => { - if next_batch.is_ok() { - let struct_array = StructArray::from(next_batch.unwrap()); + if let Ok(batch) = next_batch { + let struct_array = StructArray::from(batch); let mut array = FFI_ArrowArray::new(struct_array.data()); unsafe { @@ -239,7 +243,7 @@ impl ExportedArrayStream { 0 } else { let err = &next_batch.unwrap_err(); - private_data.last_error = Box::new(err.to_string()); + private_data.last_error = err.to_string(); get_error_code(err) } } @@ -249,7 +253,7 @@ impl ExportedArrayStream { ret_code } - pub fn get_last_error(&self) -> Box { + pub fn get_last_error(&self) -> String { self.get_private_data().last_error } } @@ -285,9 +289,7 @@ impl ArrowArrayStreamReader { /// Get the last error from `ArrowArrayStreamReader` fn get_stream_last_error(stream_reader: &ArrowArrayStreamReader) -> Option { - if stream_reader.stream.get_last_error.is_none() { - return None; - } + stream_reader.stream.get_last_error?; let stream_ptr = Arc::into_raw(stream_reader.stream.clone()) as *mut FFI_ArrowArrayStream; @@ -298,10 +300,10 @@ fn get_stream_last_error(stream_reader: &ArrowArrayStreamReader) -> Option; fn next(&mut self) -> Option { - if self.stream.get_next.is_none() { - return None; - } + self.stream.get_next?; let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream; From 415146f83cd483680152e9d659de8e65f986b43f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 2 Mar 2022 15:02:45 -0800 Subject: [PATCH 05/22] fix format --- arrow/src/ffi_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 44abdba15015..edfe80128c9f 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -301,7 +301,7 @@ fn get_stream_last_error(stream_reader: &ArrowArrayStreamReader) -> Option Date: Wed, 2 Mar 2022 15:50:22 -0800 Subject: [PATCH 06/22] define error code --- arrow/Cargo.toml | 1 - arrow/src/ffi_stream.rs | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index a68524681fe4..f5368c1f13bc 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -57,7 +57,6 @@ pyo3 = { version = "0.15", optional = true } lexical-core = "^0.8" multiversion = "0.6.1" bitflags = "1.2.1" -libc = "0.2.119" [features] default = ["csv", "ipc", "test_utils"] diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index edfe80128c9f..65f607ee5de6 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -25,8 +25,6 @@ use std::{ sync::Arc, }; -use libc::{EINVAL, EIO, ENOMEM, ENOSYS}; - use crate::array::Array; use crate::array::StructArray; use crate::datatypes::{Schema, SchemaRef}; @@ -35,6 +33,11 @@ use crate::error::Result; use crate::ffi::*; use crate::record_batch::{RecordBatch, RecordBatchReader}; +const ENOMEM: i32 = 12; +const EIO: i32 = 5; +const EINVAL: i32 = 22; +const ENOSYS: i32 = 78; + /// ABI-compatible struct for `ArrayStream` from C Stream Interface /// This interface is experimental /// See From 79f5d8ad8a6882f186ea5dd013bc18fe2ecac8c1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 3 Mar 2022 12:30:33 -0800 Subject: [PATCH 07/22] Regenerate ffi binding using bindgen --- arrow/src/ffi_stream.rs | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 65f607ee5de6..1156a19f0eb5 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -41,30 +41,26 @@ const ENOSYS: i32 = 78; /// ABI-compatible struct for `ArrayStream` from C Stream Interface /// This interface is experimental /// See +/// This was created by bindgen #[repr(C)] #[derive(Debug, Clone)] pub struct FFI_ArrowArrayStream { - // Callbacks providing stream functionality - get_schema: Option< + pub get_schema: Option< unsafe extern "C" fn( arg1: *mut FFI_ArrowArrayStream, - arg2: *mut FFI_ArrowSchema, + out: *mut FFI_ArrowSchema, ) -> c_int, >, - get_next: Option< + pub get_next: Option< unsafe extern "C" fn( arg1: *mut FFI_ArrowArrayStream, - arg2: *mut FFI_ArrowArray, + out: *mut FFI_ArrowArray, ) -> c_int, >, - get_last_error: + pub get_last_error: Option *const c_char>, - - // Release callback - release: Option, - - // Opaque producer-specific data - private_data: *mut c_void, + pub release: Option, + pub private_data: *mut c_void, } // callback used to drop [FFI_ArrowArrayStream] when it is exported. From 2623b33174fb1ddb4fa3049effac2cf1b48b0f56 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 6 Mar 2022 15:01:44 -0800 Subject: [PATCH 08/22] Rewrite test --- arrow/src/ffi_stream.rs | 163 +++++++++++++++++++++++++++------------- 1 file changed, 109 insertions(+), 54 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 1156a19f0eb5..090487200fdd 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -321,6 +321,11 @@ impl Iterator for ArrowArrayStreamReader { let ffi_array = unsafe { Arc::from_raw(array_ptr) }; + // The end of stream has been reached + if ffi_array.release.is_none() { + return None; + } + let schema_ref = self.schema(); let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()); @@ -378,85 +383,135 @@ impl RecordBatchReader for ArrowArrayStreamReader { mod tests { use super::*; - use std::fs::File; + use crate::array::Int32Array; + use crate::datatypes::{Field, Schema}; - use crate::datatypes::Schema; - use crate::ipc::reader::FileReader; + struct TestRecordBatchReader { + schema: SchemaRef, + iter: Box>>, + } - fn get_array_testdata() -> File { - let testdata = crate::util::test_util::arrow_test_data(); - let version = "0.14.1"; - File::open(format!( - "{}/arrow-ipc-stream/integration/{}/generated_decimal.arrow_file", - testdata, version - )) - .unwrap() + impl TestRecordBatchReader { + pub fn new( + schema: SchemaRef, + iter: Box>>, + ) -> Box { + Box::new(TestRecordBatchReader { schema, iter }) + } } - #[test] - fn test_export_stream() { - let file = get_array_testdata(); - let reader = Box::new(FileReader::try_new(file).unwrap()); - let expected_schema = reader.schema(); + impl Iterator for TestRecordBatchReader { + type Item = Result; + + fn next(&mut self) -> Option { + self.iter.next() + } + } - let stream = Box::new(FFI_ArrowArrayStream::new(reader)); - let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; + impl RecordBatchReader for TestRecordBatchReader { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + } + + fn _test_round_trip_export(arrays: Vec>) -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", arrays[0].data_type().clone(), true), + Field::new("b", arrays[1].data_type().clone(), true), + Field::new("c", arrays[2].data_type().clone(), true), + ])); + let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap(); + let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _; + + let reader = TestRecordBatchReader::new(schema.clone(), iter); - let empty_schema = Box::new(FFI_ArrowSchema::empty()); - let schema_ptr = Box::into_raw(empty_schema) as *mut FFI_ArrowSchema; + // Export a `RecordBatchReader` through `FFI_ArrowArrayStream` + let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); + let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; + let empty_schema = Arc::new(FFI_ArrowSchema::empty()); + let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema; + + // Get schema from `FFI_ArrowArrayStream` let ret_code = unsafe { get_schema(stream_ptr, schema_ptr) }; assert_eq!(ret_code, 0); - let ffi_schema = unsafe { Box::from_raw(schema_ptr) }; + let ffi_schema = unsafe { Arc::from_raw(schema_ptr) }; + + let exported_schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); + assert_eq!(&exported_schema, schema.as_ref()); - let schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); - assert_eq!(&schema, expected_schema.as_ref()); + // Get array from `FFI_ArrowArrayStream` + let mut produced_batches = vec![]; + loop { + let empty_array = Arc::new(FFI_ArrowArray::empty()); + let array_ptr = Arc::into_raw(empty_array.clone()) as *mut FFI_ArrowArray; - let empty_array = Box::new(FFI_ArrowArray::empty()); - let array_ptr = Box::into_raw(empty_array) as *mut FFI_ArrowArray; + let ret_code = unsafe { get_next(stream_ptr, array_ptr) }; + assert_eq!(ret_code, 0); - let ret_code = unsafe { get_next(stream_ptr, array_ptr) }; - assert_eq!(ret_code, 0); + // The end of stream has been reached + let ffi_array = unsafe { Arc::from_raw(array_ptr) }; + if ffi_array.release.is_none() { + break; + } - let array = unsafe { - ArrowArray::try_from_raw( - array_ptr, - Box::into_raw(ffi_schema) as *mut FFI_ArrowSchema, - ) - .unwrap() + let array = ArrowArray { + array: ffi_array, + schema: ffi_schema.clone(), + } .to_data() - .unwrap() - }; + .unwrap(); - let record_batch = RecordBatch::from(&StructArray::from(array)); + let record_batch = RecordBatch::from(&StructArray::from(array)); + produced_batches.push(record_batch); + } - let file = get_array_testdata(); - let mut reader = Box::new(FileReader::try_new(file).unwrap()); - let expected_batch = reader.next().unwrap().unwrap(); - assert_eq!(record_batch, expected_batch); + assert_eq!(produced_batches, vec![batch.clone(), batch.clone()]); + Ok(()) } - #[test] - fn test_import_stream() { - let file = get_array_testdata(); - let reader = Box::new(FileReader::try_new(file).unwrap()); - let expected_schema = reader.schema(); + fn _test_round_trip_import(arrays: Vec>) -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", arrays[0].data_type().clone(), true), + Field::new("b", arrays[1].data_type().clone(), true), + Field::new("c", arrays[2].data_type().clone(), true), + ])); + let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap(); + let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _; - let stream = Box::new(FFI_ArrowArrayStream::new(reader)); - let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; + let reader = TestRecordBatchReader::new(schema.clone(), iter); + // Import a `RecordBatchReader` through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader` + let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); + let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; let mut stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr); - let schema = stream_reader.schema(); - assert_eq!(schema, expected_schema); + let imported_schema = stream_reader.schema(); + assert_eq!(imported_schema, schema); - let batch = stream_reader.next().unwrap().unwrap(); + let mut produced_batches = vec![]; + while let Some(batch) = stream_reader.next() { + produced_batches.push(batch.unwrap()); + } + + assert_eq!(produced_batches, vec![batch.clone(), batch.clone()]); + Ok(()) + } + + #[test] + fn test_stream_round_trip_export() -> Result<()> { + let array = Int32Array::from(vec![Some(2), None, Some(1), None]); + let array: Arc = Arc::new(array); + + _test_round_trip_export(vec![array.clone(), array.clone(), array]) + } - let file = get_array_testdata(); - let mut reader = Box::new(FileReader::try_new(file).unwrap()); - let expected_batch = reader.next().unwrap().unwrap(); + #[test] + fn test_stream_round_trip_import() -> Result<()> { + let array = Int32Array::from(vec![Some(2), None, Some(1), None]); + let array: Arc = Arc::new(array); - assert_eq!(batch, expected_batch); + _test_round_trip_import(vec![array.clone(), array.clone(), array]) } } From a9bc94cb18f50a0d73a5e495d2ab642213d1f678 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 6 Mar 2022 15:04:44 -0800 Subject: [PATCH 09/22] Remove CStreamInterface --- arrow/src/error.rs | 5 ----- arrow/src/ffi_stream.rs | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/arrow/src/error.rs b/arrow/src/error.rs index d0890eedd75c..ef7abbbddef9 100644 --- a/arrow/src/error.rs +++ b/arrow/src/error.rs @@ -40,8 +40,6 @@ pub enum ArrowError { ParquetError(String), /// Error during import or export to/from the C Data Interface CDataInterface(String), - /// Error during import or export to/from the C Stream Interface - CStreamInterface(String), DictionaryKeyOverflowError, } @@ -124,9 +122,6 @@ impl Display for ArrowError { ArrowError::CDataInterface(desc) => { write!(f, "C Data interface error: {}", desc) } - ArrowError::CStreamInterface(desc) => { - write!(f, "C Stream interface error: {}", desc) - } ArrowError::DictionaryKeyOverflowError => { write!(f, "Dictionary key bigger than the key type") } diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 090487200fdd..1f94b8dfadea 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -349,7 +349,7 @@ impl Iterator for ArrowArrayStreamReader { Some(Ok(record_batch)) } else { let last_error = get_stream_last_error(self); - let err = ArrowError::CStreamInterface(last_error.unwrap()); + let err = ArrowError::CDataInterface(last_error.unwrap()); Some(Err(err)) } } From 083e014d65d987bb48d1f081527d1ef05c9781e3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 6 Mar 2022 15:56:49 -0800 Subject: [PATCH 10/22] Fix clippy error --- arrow/src/ffi_stream.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 1f94b8dfadea..74c6c5a37611 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -322,9 +322,7 @@ impl Iterator for ArrowArrayStreamReader { let ffi_array = unsafe { Arc::from_raw(array_ptr) }; // The end of stream has been reached - if ffi_array.release.is_none() { - return None; - } + ffi_array.release?; let schema_ref = self.schema(); let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()); From b222a449b6c43b5770a87c4a2a67bdad932fd4d4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 6 Mar 2022 16:26:30 -0800 Subject: [PATCH 11/22] Fix more clippy errors --- arrow/src/ffi_stream.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 74c6c5a37611..227d8ade4741 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -465,7 +465,7 @@ mod tests { produced_batches.push(record_batch); } - assert_eq!(produced_batches, vec![batch.clone(), batch.clone()]); + assert_eq!(produced_batches, vec![batch.clone(), batch]); Ok(()) } @@ -480,20 +480,20 @@ mod tests { let reader = TestRecordBatchReader::new(schema.clone(), iter); - // Import a `RecordBatchReader` through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader` + // Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader` let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; - let mut stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr); + let stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr); let imported_schema = stream_reader.schema(); assert_eq!(imported_schema, schema); let mut produced_batches = vec![]; - while let Some(batch) = stream_reader.next() { + for batch in stream_reader { produced_batches.push(batch.unwrap()); } - assert_eq!(produced_batches, vec![batch.clone(), batch.clone()]); + assert_eq!(produced_batches, vec![batch.clone(), batch]); Ok(()) } From e029f618b02ff09deabd1d33dbfc3366af3d4dd9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 8 Mar 2022 13:19:04 -0800 Subject: [PATCH 12/22] For review comment. --- arrow/src/ffi_stream.rs | 151 ++++++++++++++++++++++++++++------------ 1 file changed, 106 insertions(+), 45 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 227d8ade4741..957a656016ed 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -17,6 +17,44 @@ //! Contains declarations to bind to the [C Stream Interface](https://arrow.apache.org/docs/format/CStreamInterface.html). //! +//! This module has two main interfaces: +//! One interface maps C ABI to native Rust types, i.e. convert c-pointers, c_char, to native rust. +//! This is handled by [FFI_ArrowArrayStream]. +//! +//! The second interface is used to import `FFI_ArrowArrayStream` as Rust implementation `RecordBatch` reader. +//! This is handled by `ArrowArrayStreamReader`. +//! +//! ```rust +//! # use std::fs::File; +//! # use std::sync::Arc; +//! # use arrow::error::Result; +//! # use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; +//! # use arrow::ipc::reader::FileReader; +//! # use arrow::record_batch::RecordBatchReader; +//! # fn main() -> Result<()> { +//! // create an record batch reader natively//! +//! let file = File::open("arrow_file").unwrap(); +//! let reader = Box::new(FileReader::try_new(file).unwrap()); +//! +//! // export it +//! let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); +//! let stream_ptr = FFI_ArrowArrayStream::to_raw(stream)?; +//! +//! // consumed and used by something else... +//! +//! // import it +//! let stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr).unwrap(); +//! let imported_schema = stream_reader.schema(); +//! +//! let mut produced_batches = vec![]; +//! for batch in stream_reader { +//! produced_batches.push(batch.unwrap()); +//! } +//! +//! // (drop/release) +//! Ok(()) +//! } +//! ``` use std::{ convert::TryFrom, @@ -117,7 +155,7 @@ impl Drop for FFI_ArrowArrayStream { } impl FFI_ArrowArrayStream { - /// create a new [`FFI_ArrowArrayStream`]. + /// Creates a new [`FFI_ArrowArrayStream`]. pub fn new(batch_reader: Box) -> Self { let private_data = Box::new(StreamPrivateData { batch_reader, @@ -133,6 +171,7 @@ impl FFI_ArrowArrayStream { } } + /// Creates a new empty [FFI_ArrowArrayStream]. Used to import from the C Stream Interface. pub fn empty() -> Self { Self { get_schema: None, @@ -143,11 +182,12 @@ impl FFI_ArrowArrayStream { } } + /// Gets a raw pointer of `FFI_ArrowArrayStream` pub fn to_raw(this: Arc) -> *const FFI_ArrowArrayStream { Arc::into_raw(this) } - /// Get `FFI_ArrowArrayStream` from raw pointer + /// Gets `FFI_ArrowArrayStream` from raw pointer /// # Safety /// Assumes that the pointer represents valid C Stream Interfaces, both in memory /// representation and lifetime via the `release` mechanism. @@ -266,43 +306,82 @@ fn get_error_code(err: &ArrowError) -> i32 { } } -/// A `RecordBatch` reader which imports from `FFI_ArrowArrayStream` -struct ArrowArrayStreamReader { +/// A `RecordBatchReader` which imports Arrays from `FFI_ArrowArrayStream`. +/// Struct used to fetch `RecordBatch` from the C Stream Interface. +/// Its main responsibility is to expose `RecordBatchReader` functionality +/// that requires [FFI_ArrowArrayStream]. +#[derive(Debug)] +pub struct ArrowArrayStreamReader { stream: Arc, + schema: SchemaRef, +} + +/// Gets schema from a raw pointer of `FFI_ArrowArrayStream`. This is used when constructing +/// `ArrowArrayStreamReader` to cache schema. +fn get_stream_schema(stream_ptr: *mut FFI_ArrowArrayStream) -> Result { + let empty_schema = Arc::new(FFI_ArrowSchema::empty()); + let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema; + + let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, schema_ptr) }; + + let ffi_schema = unsafe { Arc::from_raw(schema_ptr) }; + + if ret_code == 0 { + let schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); + Ok(Arc::new(schema)) + } else { + Err(ArrowError::CDataInterface( + format!( + "Cannot get schema from input stream. Error code: {:?}", + ret_code + ) + .to_string(), + )) + } } impl ArrowArrayStreamReader { #[allow(dead_code)] - pub fn new(stream: FFI_ArrowArrayStream) -> Self { - Self { - stream: Arc::new(stream), + pub fn try_new(stream: FFI_ArrowArrayStream) -> Result { + if stream.release.is_none() { + return Err(ArrowError::CDataInterface( + "input stream is already released".to_string(), + )); } + + let stream_ptr = Arc::into_raw(Arc::new(stream)) as *mut FFI_ArrowArrayStream; + + let schema = get_stream_schema(stream_ptr)?; + + Ok(Self { + stream: unsafe { Arc::from_raw(stream_ptr) }, + schema, + }) } #[allow(dead_code)] - pub fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Self { + pub fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Result { + let schema = get_stream_schema(raw_stream)?; let stream = unsafe { Arc::new((*raw_stream).clone()) }; - Self { stream } + Ok(Self { stream, schema }) } -} -/// Get the last error from `ArrowArrayStreamReader` -fn get_stream_last_error(stream_reader: &ArrowArrayStreamReader) -> Option { - stream_reader.stream.get_last_error?; + /// Get the last error from `ArrowArrayStreamReader` + fn get_stream_last_error(&self) -> Option { + self.stream.get_last_error?; - let stream_ptr = - Arc::into_raw(stream_reader.stream.clone()) as *mut FFI_ArrowArrayStream; + let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream; - let error_str = unsafe { - let c_str = - stream_reader.stream.get_last_error.unwrap()(stream_ptr) as *mut c_char; - CString::from_raw(c_str).into_string() - }; + let error_str = unsafe { + let c_str = self.stream.get_last_error.unwrap()(stream_ptr) as *mut c_char; + CString::from_raw(c_str).into_string() + }; - if let Err(err) = error_str { - Some(err.to_string()) - } else { - Some(error_str.unwrap()) + if let Err(err) = error_str { + Some(err.to_string()) + } else { + Some(error_str.unwrap()) + } } } @@ -346,7 +425,7 @@ impl Iterator for ArrowArrayStreamReader { Some(Ok(record_batch)) } else { - let last_error = get_stream_last_error(self); + let last_error = self.get_stream_last_error(); let err = ArrowError::CDataInterface(last_error.unwrap()); Some(Err(err)) } @@ -355,25 +434,7 @@ impl Iterator for ArrowArrayStreamReader { impl RecordBatchReader for ArrowArrayStreamReader { fn schema(&self) -> SchemaRef { - if self.stream.get_schema.is_none() { - return Arc::new(Schema::empty()); - } - - let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream; - - let empty_schema = Arc::new(FFI_ArrowSchema::empty()); - let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema; - - let ret_code = unsafe { self.stream.get_schema.unwrap()(stream_ptr, schema_ptr) }; - - let ffi_schema = unsafe { Arc::from_raw(schema_ptr) }; - - if ret_code == 0 { - let schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); - Arc::new(schema) - } else { - Arc::new(Schema::empty()) - } + self.schema.clone() } } @@ -483,7 +544,7 @@ mod tests { // Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader` let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; - let stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr); + let stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr).unwrap(); let imported_schema = stream_reader.schema(); assert_eq!(imported_schema, schema); From 5a35a8ed062cab68eadc7127d0f0fb002b2b1df0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 8 Mar 2022 13:49:54 -0800 Subject: [PATCH 13/22] Fix clippy error --- arrow/src/ffi_stream.rs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 957a656016ed..07a7e02baf83 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -43,7 +43,7 @@ //! // consumed and used by something else... //! //! // import it -//! let stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr).unwrap(); +//! let stream_reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() }; //! let imported_schema = stream_reader.schema(); //! //! let mut produced_batches = vec![]; @@ -330,17 +330,16 @@ fn get_stream_schema(stream_ptr: *mut FFI_ArrowArrayStream) -> Result let schema = Schema::try_from(ffi_schema.as_ref()).unwrap(); Ok(Arc::new(schema)) } else { - Err(ArrowError::CDataInterface( - format!( - "Cannot get schema from input stream. Error code: {:?}", - ret_code - ) - .to_string(), - )) + Err(ArrowError::CDataInterface(format!( + "Cannot get schema from input stream. Error code: {:?}", + ret_code + ))) } } impl ArrowArrayStreamReader { + /// Creates a new `ArrowArrayStreamReader` from a `FFI_ArrowArrayStream`. + /// This is used to import from the C Stream Interface. #[allow(dead_code)] pub fn try_new(stream: FFI_ArrowArrayStream) -> Result { if stream.release.is_none() { @@ -359,10 +358,14 @@ impl ArrowArrayStreamReader { }) } - #[allow(dead_code)] - pub fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Result { + /// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`. + /// + /// # Safety + /// This function dereferences a raw pointer of `FFI_ArrowArrayStream`. + /// Assumes that the pointer represents valid C Stream Interfaces. + pub unsafe fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Result { let schema = get_stream_schema(raw_stream)?; - let stream = unsafe { Arc::new((*raw_stream).clone()) }; + let stream = Arc::new((*raw_stream).clone()); Ok(Self { stream, schema }) } @@ -544,7 +547,8 @@ mod tests { // Import through `FFI_ArrowArrayStream` as `ArrowArrayStreamReader` let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; - let stream_reader = ArrowArrayStreamReader::from_raw(stream_ptr).unwrap(); + let stream_reader = + unsafe { ArrowArrayStreamReader::from_raw(stream_ptr).unwrap() }; let imported_schema = stream_reader.schema(); assert_eq!(imported_schema, schema); From 4b3c91b42068e8e14eae30ef7f1712a55e2e1d7f Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 8 Mar 2022 14:02:58 -0800 Subject: [PATCH 14/22] Fix clippy error --- arrow/src/ffi_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 07a7e02baf83..12ba168bc315 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -38,7 +38,7 @@ //! //! // export it //! let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); -//! let stream_ptr = FFI_ArrowArrayStream::to_raw(stream)?; +//! let stream_ptr = FFI_ArrowArrayStream::to_raw(stream) as *mut FFI_ArrowArrayStream; //! //! // consumed and used by something else... //! From 99033c133514c55bff2fa16dec6df69560956823 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 8 Mar 2022 14:19:21 -0800 Subject: [PATCH 15/22] not run example code in comment --- arrow/src/ffi_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 12ba168bc315..361cac8e6d38 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -24,7 +24,7 @@ //! The second interface is used to import `FFI_ArrowArrayStream` as Rust implementation `RecordBatch` reader. //! This is handled by `ArrowArrayStreamReader`. //! -//! ```rust +//! ```no_run //! # use std::fs::File; //! # use std::sync::Arc; //! # use arrow::error::Result; @@ -32,7 +32,7 @@ //! # use arrow::ipc::reader::FileReader; //! # use arrow::record_batch::RecordBatchReader; //! # fn main() -> Result<()> { -//! // create an record batch reader natively//! +//! // create an record batch reader natively //! let file = File::open("arrow_file").unwrap(); //! let reader = Box::new(FileReader::try_new(file).unwrap()); //! From 78444b1dc466f92abd4f1298c014c79d77dec721 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 8 Mar 2022 14:53:05 -0800 Subject: [PATCH 16/22] ignore doctest --- arrow/src/ffi_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 361cac8e6d38..8371ee570bf9 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -24,7 +24,7 @@ //! The second interface is used to import `FFI_ArrowArrayStream` as Rust implementation `RecordBatch` reader. //! This is handled by `ArrowArrayStreamReader`. //! -//! ```no_run +//! ```ignore //! # use std::fs::File; //! # use std::sync::Arc; //! # use arrow::error::Result; From b0a6ac9789ca76f2dd6fc4e88c9a2dbf312bac77 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 17 Mar 2022 12:06:10 -0700 Subject: [PATCH 17/22] For review --- arrow/src/ffi_stream.rs | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 8371ee570bf9..7845cf937064 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -81,7 +81,7 @@ const ENOSYS: i32 = 78; /// See /// This was created by bindgen #[repr(C)] -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct FFI_ArrowArrayStream { pub get_schema: Option< unsafe extern "C" fn( @@ -141,7 +141,8 @@ unsafe extern "C" fn get_next( // The callback used to get the error from last operation on the `FFI_ArrowArrayStream` unsafe extern "C" fn get_last_error(stream: *mut FFI_ArrowArrayStream) -> *const c_char { - let last_error = ExportedArrayStream { stream }.get_last_error(); + let ffi_stream = ExportedArrayStream { stream }; + let last_error = ffi_stream.get_last_error(); CString::new(last_error.as_str()).unwrap().into_raw() } @@ -182,20 +183,19 @@ impl FFI_ArrowArrayStream { } } - /// Gets a raw pointer of `FFI_ArrowArrayStream` - pub fn to_raw(this: Arc) -> *const FFI_ArrowArrayStream { - Arc::into_raw(this) - } - /// Gets `FFI_ArrowArrayStream` from raw pointer /// # Safety /// Assumes that the pointer represents valid C Stream Interfaces, both in memory /// representation and lifetime via the `release` mechanism. + /// This function copies the content from the raw pointer and cleans it up to prevent + /// double-dropping. pub unsafe fn from_raw( ptr: *const FFI_ArrowArrayStream, ) -> Arc { - let ffi_stream = (*ptr).clone(); - Arc::new(ffi_stream) + let stream_mut = ptr as *mut FFI_ArrowArrayStream; + let stream_data = std::ptr::replace(stream_mut, FFI_ArrowArrayStream::empty()); + + Arc::new(stream_data) } } @@ -204,8 +204,8 @@ struct ExportedArrayStream { } impl ExportedArrayStream { - fn get_private_data(&self) -> Box { - unsafe { Box::from_raw((*self.stream).private_data as *mut StreamPrivateData) } + fn get_private_data(&self) -> &mut StreamPrivateData { + unsafe { &mut *((*self.stream).private_data as *mut StreamPrivateData) } } pub fn get_schema(&self, out: *mut FFI_ArrowSchema) -> i32 { @@ -243,7 +243,6 @@ impl ExportedArrayStream { } }; - Box::into_raw(private_data); ret_code } @@ -288,12 +287,11 @@ impl ExportedArrayStream { } }; - Box::into_raw(private_data); ret_code } - pub fn get_last_error(&self) -> String { - self.get_private_data().last_error + pub fn get_last_error(&self) -> &String { + &self.get_private_data().last_error } } @@ -363,9 +361,14 @@ impl ArrowArrayStreamReader { /// # Safety /// This function dereferences a raw pointer of `FFI_ArrowArrayStream`. /// Assumes that the pointer represents valid C Stream Interfaces. + /// This function copies the content from the raw pointer and cleans up it to prevent + /// double-dropping pub unsafe fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Result { let schema = get_stream_schema(raw_stream)?; - let stream = Arc::new((*raw_stream).clone()); + + let stream_data = std::ptr::replace(raw_stream, FFI_ArrowArrayStream::empty()); + + let stream = Arc::new(stream_data); Ok(Self { stream, schema }) } From b6efc8dfff7c8ec85541d6c65a886f6df6a76673 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 17 Mar 2022 12:55:11 -0700 Subject: [PATCH 18/22] Fix clippy --- arrow/src/ffi_stream.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 7845cf937064..f1b34b66d664 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -141,7 +141,7 @@ unsafe extern "C" fn get_next( // The callback used to get the error from last operation on the `FFI_ArrowArrayStream` unsafe extern "C" fn get_last_error(stream: *mut FFI_ArrowArrayStream) -> *const c_char { - let ffi_stream = ExportedArrayStream { stream }; + let mut ffi_stream = ExportedArrayStream { stream }; let last_error = ffi_stream.get_last_error(); CString::new(last_error.as_str()).unwrap().into_raw() } @@ -204,11 +204,11 @@ struct ExportedArrayStream { } impl ExportedArrayStream { - fn get_private_data(&self) -> &mut StreamPrivateData { + fn get_private_data(&mut self) -> &mut StreamPrivateData { unsafe { &mut *((*self.stream).private_data as *mut StreamPrivateData) } } - pub fn get_schema(&self, out: *mut FFI_ArrowSchema) -> i32 { + pub fn get_schema(&mut self, out: *mut FFI_ArrowSchema) -> i32 { unsafe { match (*out).release { None => (), @@ -221,7 +221,7 @@ impl ExportedArrayStream { let schema = FFI_ArrowSchema::try_from(reader.schema().as_ref()); - let ret_code = match schema { + match schema { Ok(mut schema) => { unsafe { (*out).format = schema.format; @@ -241,12 +241,10 @@ impl ExportedArrayStream { private_data.last_error = err.to_string(); get_error_code(err) } - }; - - ret_code + } } - pub fn get_next(&self, out: *mut FFI_ArrowArray) -> i32 { + pub fn get_next(&mut self, out: *mut FFI_ArrowArray) -> i32 { unsafe { match (*out).release { None => (), @@ -290,7 +288,7 @@ impl ExportedArrayStream { ret_code } - pub fn get_last_error(&self) -> &String { + pub fn get_last_error(&mut self) -> &String { &self.get_private_data().last_error } } From a162c84de8ad9588591a1273253dfcbaf6d2bbea Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 23 Mar 2022 16:48:09 -0700 Subject: [PATCH 19/22] For review comment --- arrow/src/ffi_stream.rs | 89 +++++++++++------------------------------ 1 file changed, 23 insertions(+), 66 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index f1b34b66d664..d2d201c27748 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -182,21 +182,6 @@ impl FFI_ArrowArrayStream { private_data: std::ptr::null_mut(), } } - - /// Gets `FFI_ArrowArrayStream` from raw pointer - /// # Safety - /// Assumes that the pointer represents valid C Stream Interfaces, both in memory - /// representation and lifetime via the `release` mechanism. - /// This function copies the content from the raw pointer and cleans it up to prevent - /// double-dropping. - pub unsafe fn from_raw( - ptr: *const FFI_ArrowArrayStream, - ) -> Arc { - let stream_mut = ptr as *mut FFI_ArrowArrayStream; - let stream_data = std::ptr::replace(stream_mut, FFI_ArrowArrayStream::empty()); - - Arc::new(stream_data) - } } struct ExportedArrayStream { @@ -222,21 +207,11 @@ impl ExportedArrayStream { let schema = FFI_ArrowSchema::try_from(reader.schema().as_ref()); match schema { - Ok(mut schema) => { - unsafe { - (*out).format = schema.format; - (*out).name = schema.name; - (*out).metadata = schema.metadata; - (*out).flags = schema.flags; - (*out).n_children = schema.n_children; - (*out).children = schema.children; - (*out).dictionary = schema.dictionary; - (*out).release = schema.release; - (*out).private_data = schema.private_data; - } + Ok(mut schema) => unsafe { + std::ptr::copy(&schema as *const FFI_ArrowSchema, out, 1); schema.release = None; 0 - } + }, Err(ref err) => { private_data.last_error = err.to_string(); get_error_code(err) @@ -263,20 +238,10 @@ impl ExportedArrayStream { let mut array = FFI_ArrowArray::new(struct_array.data()); unsafe { - (*out).length = array.length; - (*out).null_count = array.null_count; - (*out).offset = array.offset; - (*out).n_buffers = array.n_buffers; - (*out).n_children = array.n_children; - (*out).buffers = array.buffers; - (*out).children = array.children; - (*out).dictionary = array.dictionary; - (*out).release = array.release; - (*out).private_data = array.private_data; + std::ptr::copy(&array as *const FFI_ArrowArray, out, 1); + array.release = None; + 0 } - - array.release = None; - 0 } else { let err = &next_batch.unwrap_err(); private_data.last_error = err.to_string(); @@ -356,25 +321,24 @@ impl ArrowArrayStreamReader { /// Creates a new `ArrowArrayStreamReader` from a raw pointer of `FFI_ArrowArrayStream`. /// - /// # Safety - /// This function dereferences a raw pointer of `FFI_ArrowArrayStream`. /// Assumes that the pointer represents valid C Stream Interfaces. /// This function copies the content from the raw pointer and cleans up it to prevent - /// double-dropping + /// double-dropping. The caller is responsible for freeing up the memory allocated for + /// the pointer. + /// + /// # Safety + /// This function dereferences a raw pointer of `FFI_ArrowArrayStream`. pub unsafe fn from_raw(raw_stream: *mut FFI_ArrowArrayStream) -> Result { - let schema = get_stream_schema(raw_stream)?; - let stream_data = std::ptr::replace(raw_stream, FFI_ArrowArrayStream::empty()); - let stream = Arc::new(stream_data); - Ok(Self { stream, schema }) + Self::try_new(stream_data) } /// Get the last error from `ArrowArrayStreamReader` fn get_stream_last_error(&self) -> Option { self.stream.get_last_error?; - let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream; + let stream_ptr = Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream; let error_str = unsafe { let c_str = self.stream.get_last_error.unwrap()(stream_ptr) as *mut c_char; @@ -402,30 +366,23 @@ impl Iterator for ArrowArrayStreamReader { let ret_code = unsafe { self.stream.get_next.unwrap()(stream_ptr, array_ptr) }; - let ffi_array = unsafe { Arc::from_raw(array_ptr) }; + if ret_code == 0 { + let ffi_array = unsafe { Arc::from_raw(array_ptr) }; - // The end of stream has been reached - ffi_array.release?; + // The end of stream has been reached + ffi_array.release?; - let schema_ref = self.schema(); - let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()); + let schema_ref = self.schema(); + let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()).ok()?; - if schema.is_err() { - return Some(Err(schema.err().unwrap())); - } - - if ret_code == 0 { let data = ArrowArray { array: ffi_array, - schema: Arc::new(schema.unwrap()), - } - .to_data(); - - if data.is_err() { - return Some(Err(data.err().unwrap())); + schema: Arc::new(schema), } + .to_data() + .ok()?; - let record_batch = RecordBatch::from(&StructArray::from(data.unwrap())); + let record_batch = RecordBatch::from(&StructArray::from(data)); Some(Ok(record_batch)) } else { From 35da4f19d975125d68118705c010ed47488c3d4e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 30 Mar 2022 01:01:51 -0700 Subject: [PATCH 20/22] For review --- arrow/src/ffi_stream.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index d2d201c27748..b60da4ba4be7 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -38,7 +38,7 @@ //! //! // export it //! let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); -//! let stream_ptr = FFI_ArrowArrayStream::to_raw(stream) as *mut FFI_ArrowArrayStream; +//! let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; //! //! // consumed and used by something else... //! @@ -357,7 +357,7 @@ impl Iterator for ArrowArrayStreamReader { type Item = Result; fn next(&mut self) -> Option { - self.stream.get_next?; + self.stream.get_next.unwrap(); let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream; From 9fad867d2a7bc42b703bd7bda8a740cb14e52f47 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 30 Mar 2022 11:07:16 -0700 Subject: [PATCH 21/22] Add export_reader_into_raw --- arrow/src/ffi_stream.rs | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index b60da4ba4be7..8d0e8a4c12b3 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -28,7 +28,7 @@ //! # use std::fs::File; //! # use std::sync::Arc; //! # use arrow::error::Result; -//! # use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; +//! # use arrow::ffi_stream::{export_reader_into_raw, ArrowArrayStreamReader, FFI_ArrowArrayStream}; //! # use arrow::ipc::reader::FileReader; //! # use arrow::record_batch::RecordBatchReader; //! # fn main() -> Result<()> { @@ -37,8 +37,9 @@ //! let reader = Box::new(FileReader::try_new(file).unwrap()); //! //! // export it -//! let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); +//! let stream = Arc::new(FFI_ArrowArrayStream::empty()); //! let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; +//! unsafe { export_reader_into_raw(reader, stream_ptr) }; //! //! // consumed and used by something else... //! @@ -399,6 +400,20 @@ impl RecordBatchReader for ArrowArrayStreamReader { } } +/// Exports a record batch reader to raw pointer of the C Stream Interface provided by the consumer. +/// +/// # Safety +/// Assumes that the pointer represents valid C Stream Interfaces, both in memory +/// representation and lifetime via the `release` mechanism. +pub unsafe fn export_reader_into_raw( + reader: Box, + out_stream: *mut FFI_ArrowArrayStream, +) { + let stream = FFI_ArrowArrayStream::new(reader); + + std::ptr::write_unaligned(out_stream, stream); +} + #[cfg(test)] mod tests { use super::*; @@ -446,9 +461,11 @@ mod tests { let reader = TestRecordBatchReader::new(schema.clone(), iter); // Export a `RecordBatchReader` through `FFI_ArrowArrayStream` - let stream = Arc::new(FFI_ArrowArrayStream::new(reader)); + let stream = Arc::new(FFI_ArrowArrayStream::empty()); let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; + unsafe { export_reader_into_raw(reader, stream_ptr) }; + let empty_schema = Arc::new(FFI_ArrowSchema::empty()); let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema; From 3716c1627f5abc1d30b789157e099d6381a20ce1 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 30 Mar 2022 16:12:47 -0700 Subject: [PATCH 22/22] For review --- arrow/src/ffi_stream.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/arrow/src/ffi_stream.rs b/arrow/src/ffi_stream.rs index 8d0e8a4c12b3..ab4caea36f8e 100644 --- a/arrow/src/ffi_stream.rs +++ b/arrow/src/ffi_stream.rs @@ -37,8 +37,8 @@ //! let reader = Box::new(FileReader::try_new(file).unwrap()); //! //! // export it -//! let stream = Arc::new(FFI_ArrowArrayStream::empty()); -//! let stream_ptr = Arc::into_raw(stream) as *mut FFI_ArrowArrayStream; +//! let stream = Box::new(FFI_ArrowArrayStream::empty()); +//! let stream_ptr = Box::into_raw(stream) as *mut FFI_ArrowArrayStream; //! unsafe { export_reader_into_raw(reader, stream_ptr) }; //! //! // consumed and used by something else... @@ -53,6 +53,9 @@ //! } //! //! // (drop/release) +//! unsafe { +//! Box::from_raw(stream_ptr); +//! } //! Ok(()) //! } //! ``` @@ -358,9 +361,7 @@ impl Iterator for ArrowArrayStreamReader { type Item = Result; fn next(&mut self) -> Option { - self.stream.get_next.unwrap(); - - let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream; + let stream_ptr = Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream; let empty_array = Arc::new(FFI_ArrowArray::empty()); let array_ptr = Arc::into_raw(empty_array) as *mut FFI_ArrowArray; @@ -387,6 +388,8 @@ impl Iterator for ArrowArrayStreamReader { Some(Ok(record_batch)) } else { + unsafe { Arc::from_raw(array_ptr) }; + let last_error = self.get_stream_last_error(); let err = ArrowError::CDataInterface(last_error.unwrap()); Some(Err(err))