Add FFI for Arrow C Stream Interface #1384
Hmm, not sure why
only in |
I started looking through this, but I don't have time at this moment to complete a detailed review. It looks very cool though and I will try and review it more carefully over the coming days
arrow/src/ffi_stream.rs
Outdated
use std::fs::File;
use crate::datatypes::Schema;
use crate::ipc::reader::FileReader;

Since these tests use ipc they probably need to be feature flagged like #[cfg(feature = "ipc")]
assert_eq!(batch, expected_batch);
}
}

@jorgecarleitao's PRs in arrow2 had a nice roundtrip test:
It might be good to add one here as well (which would also mean the test for ffi didn't depend on the ipc feature)
Rewrote the tests to roundtrip tests.
And it doesn't depend on ipc now.
arrow/src/error.rs
Outdated
@@ -40,6 +40,8 @@ pub enum ArrowError {
    ParquetError(String),
    /// Error during import or export to/from the C Data Interface
    CDataInterface(String),
    /// Error during import or export to/from the C Stream Interface
    CStreamInterface(String),

Since the Stream interface is built on the CDataInterface, what do you think about just reusing the same CDataInterface error variant instead of adding a new one?
Ok.
// under the License.
//! Contains declarations to bind to the [C Stream Interface](https://arrow.apache.org/docs/format/CStreamInterface.html).
//!

How did you create this file (was it bindgen?)
I followed the C Stream Interface definition. Let me generate it again using bindgen.
Regenerated the binding with bindgen and verified it looks the same. I added "This was created by bindgen" to the doc.
I plan to review this early next week (BTW the integration test failure has been fixed on master, if you want to merge / rebase to get a clean CI)
Thanks @alamb. I'm writing a roundtrip test as you suggested. I will rebase after that.
Codecov Report
@@ Coverage Diff @@
## master #1384 +/- ##
==========================================
+ Coverage 82.70% 82.72% +0.02%
==========================================
Files 187 189 +2
Lines 54208 54561 +353
==========================================
+ Hits 44832 45138 +306
- Misses 9376 9423 +47
arrow/src/ffi_stream.rs
Outdated
Arc::into_raw(this)
}
/// Get `FFI_ArrowArrayStream` from raw pointer

nit: maybe we should mention that the input ptr is consumed after the call and the ownership of the input FFI_ArrowArrayStream has been transferred to the returned value.
Hmm, do we change the ownership of the input FFI_ArrowArrayStream? As it is behind a raw pointer, we cannot move it. That's why I need to clone here.
arrow/src/ffi_stream.rs
Outdated
}
impl ExportedArrayStream {
    fn get_private_data(&self) -> Box<StreamPrivateData> {

nit: can we just return &mut StreamPrivateData?
private_data is kept in the stream as a raw pointer. We cannot create and return a reference to a temporary object here.
I was thinking of something like this:
fn get_private_data(&mut self) -> &mut StreamPrivateData {
unsafe { &mut *((*self.stream).private_data as *mut StreamPrivateData) }
}
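A minimal sketch of the borrow-in-place idea from the suggestion above. The types `FfiStream`, `ExportedStream`, and the `last_error` field are hypothetical stand-ins, not the definitions from this PR; the sketch only shows that reborrowing through the raw `private_data` pointer avoids moving the data into a `Box`.

```rust
use std::os::raw::c_void;

/// Hypothetical stand-in for the state stored behind `private_data`.
struct StreamPrivateData {
    last_error: String,
}

/// Hypothetical stand-in for the C stream struct; only one field is modeled.
#[repr(C)]
struct FfiStream {
    private_data: *mut c_void,
}

struct ExportedStream {
    stream: *mut FfiStream,
}

impl ExportedStream {
    /// Borrow the private data in place instead of moving it into a `Box`.
    fn get_private_data(&mut self) -> &mut StreamPrivateData {
        // SAFETY: assumes `stream` and `private_data` are valid, aligned, and
        // not aliased for the lifetime of the returned reference.
        unsafe { &mut *((*self.stream).private_data as *mut StreamPrivateData) }
    }
}

/// Builds a stream, mutates its private data through the borrow, then frees both allocations.
fn set_and_read_error(msg: &str) -> String {
    let private = Box::into_raw(Box::new(StreamPrivateData {
        last_error: String::new(),
    }));
    let stream = Box::into_raw(Box::new(FfiStream {
        private_data: private as *mut c_void,
    }));
    let mut exported = ExportedStream { stream };
    exported.get_private_data().last_error = msg.to_string();
    let result = exported.get_private_data().last_error.clone();
    // Reclaim both allocations so the sketch does not leak.
    unsafe {
        drop(Box::from_raw(private));
        drop(Box::from_raw(stream));
    }
    result
}
```

Because the method takes `&mut self`, the borrow checker ties the returned reference to the exported stream, so two mutable borrows of the private data cannot coexist through this accessor.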
Oh, this works. If we just take a reference, we don't need to move it. I will change it.
arrow/src/ffi_stream.rs
Outdated
}
/// Get the last error from `ArrowArrayStreamReader`
fn get_stream_last_error(stream_reader: &ArrowArrayStreamReader) -> Option<String> {

Can this be a method of ArrowArrayStreamReader?
moved
arrow/src/ffi_stream.rs
Outdated
let empty_schema = Arc::new(FFI_ArrowSchema::empty());
let schema_ptr = Arc::into_raw(empty_schema) as *mut FFI_ArrowSchema;
let ret_code = unsafe { self.stream.get_schema.unwrap()(stream_ptr, schema_ptr) };

maybe we can have a try_new ctor for ArrowArrayStreamReader and initialize the cached schema via get_schema there.
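A rough sketch of what a fallible constructor that caches the schema could look like. Everything here is a simplified assumption: `Stream`, `StreamReader`, and the `Vec<String>` schema are hypothetical stand-ins for `FFI_ArrowArrayStream`, `ArrowArrayStreamReader`, and `SchemaRef`, and the callback is a plain Rust `fn` rather than a C function pointer.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `SchemaRef`: a shared list of column names.
type SchemaRef = Arc<Vec<String>>;

/// Hypothetical stand-in for the FFI stream; `None` models a released stream.
struct Stream {
    get_schema: Option<fn() -> Result<Vec<String>, String>>,
}

struct StreamReader {
    schema: SchemaRef,
}

impl StreamReader {
    /// Fetches the schema once, up front, and fails if the stream is unusable.
    fn try_new(stream: Stream) -> Result<Self, String> {
        let get_schema = stream
            .get_schema
            .ok_or_else(|| "stream has already been released".to_string())?;
        let schema = Arc::new(get_schema()?);
        Ok(Self { schema })
    }

    /// Returns the cached schema; no callback is invoked here.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

/// Example callback that reports two columns.
fn two_columns() -> Result<Vec<String>, String> {
    Ok(vec!["a".to_string(), "b".to_string()])
}
```

With this shape, `schema()` never has to handle a missing callback, because construction already rejected that case.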
arrow/src/ffi_stream.rs
Outdated
impl RecordBatchReader for ArrowArrayStreamReader {
    fn schema(&self) -> SchemaRef {
        if self.stream.get_schema.is_none() {

I think we can just unwrap here instead of returning an empty schema. It's an implementation error if the callback is not set.
Now schema is cached when constructing ArrowArrayStreamReader. Here we simply return it.
}
#[cfg(test)]
mod tests {

maybe we should have some integration tests between Python and Rust too in arrow-pyarrow-integration-testing.
Do we have some for ffi too? If so, I may follow up on it. If not, maybe we can add both in a later PR.
Not quite sure. I'm assuming PyArrow supports it since Arrow C++ implemented the stream interface.
I just checked PyArrow. It has the stream interface. But on the Rust side, the arrow-pyarrow-integration-testing crate doesn't have the basic code for testing it yet. I may need some time to try it and write some integration tests between Python and Rust.
I'm fine to hold this until I add the integration test, or I can work on it in a later PR.
cc @jorgecarleitao too
@sunchao Any more comments? Thanks.
Will take another look soon. Sorry for the delay @viirya!
No problem! Thank you @sunchao!
arrow/src/ffi_stream.rs
Outdated
/// # Safety
/// Assumes that the pointer represents valid C Stream Interfaces, both in memory
/// representation and lifetime via the `release` mechanism.
/// This function copies the content from the raw pointer and cleans it up to prevent

Why do we need this when we already have ArrowArrayStreamReader.from_raw?
Not sure whether users would ever need FFI_ArrowArrayStream instead of ArrowArrayStreamReader. I can remove it now; we can add it back if it is needed in the future.
arrow/src/ffi_stream.rs
Outdated
match schema {
    Ok(mut schema) => {
        unsafe {

we can perhaps just do:
Ok(mut schema) => unsafe {
std::ptr::copy(&schema as *const FFI_ArrowSchema, out, 1);
schema.release = None;
0
},
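To illustrate why the suggestion clears `release` after the bitwise copy, here is a small self-contained sketch. `FfiSchema`, `release_schema`, `export_into`, and the `RELEASED` counter are hypothetical stand-ins, assuming (as in the C Data Interface) that a struct whose `release` is `None` is treated as already released and that the Rust wrapper calls `release` on drop.

```rust
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};

// Counts how many times the release callback ran (for illustration only).
static RELEASED: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for `FFI_ArrowSchema`; only `release` is modeled.
#[repr(C)]
struct FfiSchema {
    release: Option<unsafe extern "C" fn(*mut FfiSchema)>,
}

unsafe extern "C" fn release_schema(schema: *mut FfiSchema) {
    RELEASED.fetch_add(1, Ordering::SeqCst);
    // Mark the struct as released by clearing the callback.
    unsafe { (*schema).release = None };
}

impl Drop for FfiSchema {
    fn drop(&mut self) {
        if let Some(release) = self.release {
            unsafe { release(self) };
        }
    }
}

/// Bitwise-copies `schema` into `out`, then disarms the local copy so that
/// dropping it does not release resources now owned by `out`.
///
/// # Safety
/// `out` must be valid for writes of one `FfiSchema`.
unsafe fn export_into(mut schema: FfiSchema, out: *mut FfiSchema) -> i32 {
    unsafe { std::ptr::copy(&schema as *const FfiSchema, out, 1) };
    schema.release = None;
    0
}

/// Returns (releases seen right after export, releases seen after the consumer drops its copy).
fn release_counts() -> (usize, usize) {
    let before = RELEASED.load(Ordering::SeqCst);
    let schema = FfiSchema { release: Some(release_schema) };
    let mut slot = MaybeUninit::<FfiSchema>::uninit();
    let code = unsafe { export_into(schema, slot.as_mut_ptr()) };
    assert_eq!(code, 0);
    let after_export = RELEASED.load(Ordering::SeqCst) - before;
    // The consumer now owns the struct; dropping it releases exactly once.
    drop(unsafe { slot.assume_init() });
    let after_drop = RELEASED.load(Ordering::SeqCst) - before;
    (after_export, after_drop)
}
```

Without the `schema.release = None` line, the local copy and the exported copy would both run the callback, which is the double release the suggested snippet avoids.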
arrow/src/ffi_stream.rs
Outdated
let stream_data = std::ptr::replace(raw_stream, FFI_ArrowArrayStream::empty());
let stream = Arc::new(stream_data);

Can we call Self::try_new(stream_data) here?
arrow/src/ffi_stream.rs
Outdated
fn get_stream_last_error(&self) -> Option<String> {
    self.stream.get_last_error?;
    let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream;

hmm can we use Arc::as_ptr(&self.stream) as *mut FFI_ArrowArrayStream here?
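A short sketch of the difference behind this suggestion, using a plain `Arc<u32>` as a stand-in for `Arc<FFI_ArrowArrayStream>`. The helper names are hypothetical. `Arc::into_raw` on a clone hands out a pointer that owns one strong reference, which leaks unless `Arc::from_raw` is called later; `Arc::as_ptr` only borrows.

```rust
use std::sync::Arc;

/// Strong count after taking a raw pointer with `Arc::as_ptr` (no ownership taken).
fn count_after_as_ptr(value: &Arc<u32>) -> usize {
    let _ptr: *const u32 = Arc::as_ptr(value);
    Arc::strong_count(value)
}

/// Strong count after `Arc::into_raw(clone)` with no matching `Arc::from_raw`.
/// The cloned reference is intentionally leaked here to show the effect.
fn count_after_into_raw(value: &Arc<u32>) -> usize {
    let _ptr: *const u32 = Arc::into_raw(value.clone());
    Arc::strong_count(value)
}
```

Calling the `into_raw` variant on every `next()` or error lookup would bump the count each time, so the underlying stream would never be dropped.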
arrow/src/ffi_stream.rs
Outdated
type Item = Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
    self.stream.get_next?;

this seems unnecessary
If the stream is released, get_next is None.
But I think we checked in the constructors to make sure it is not released?
Hmm, okay, I thought it was safer to make sure it is not released here too. I will use unwrap directly then.
arrow/src/ffi_stream.rs
Outdated
ffi_array.release?;
let schema_ref = self.schema();
let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref());

we can just do:
let schema = FFI_ArrowSchema::try_from(schema_ref.as_ref()).ok()?;
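For readers unfamiliar with the `.ok()?` idiom inside `Iterator::next`, here is a small sketch of its behavior. `Numbers` is a hypothetical iterator over comma-separated tokens, standing in for the record-batch reader. Note the consequence: an `Err` becomes `None`, so iteration simply ends and the error value is not surfaced to the caller.

```rust
/// Hypothetical iterator over comma-separated integers.
struct Numbers<'a> {
    tokens: std::str::Split<'a, char>,
}

impl<'a> Iterator for Numbers<'a> {
    type Item = Result<i32, String>;

    fn next(&mut self) -> Option<Self::Item> {
        let token = self.tokens.next()?;
        // `.ok()` turns `Result` into `Option`; `?` then returns `None` on failure.
        let n: i32 = token.parse().ok()?;
        Some(Ok(n))
    }
}

/// Collects every number yielded before the iterator stops.
fn collect_numbers(input: &str) -> Vec<i32> {
    Numbers { tokens: input.split(',') }
        .filter_map(Result::ok)
        .collect()
}
```

If the error should reach the caller instead, the explicit form `Some(Err(e))` keeps it visible; which one fits here is a design choice for the PR, not something this sketch decides.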
arrow/src/ffi_stream.rs
Outdated
}
.to_data();
if data.is_err() {

ditto
arrow/src/ffi_stream.rs
Outdated
return Some(Err(schema.err().unwrap()));
}
if ret_code == 0 {

Can we skip the above part of creating schema if ret_code is not zero?
I wonder if this one is ready to go? It seems like it has stalled (no thanks to me, of course, who hasn't reviewed it 😢)
I think it's almost ready. 😀 Will take another look after the comments are addressed.
Thanks @sunchao. Addressed your latest comments.
Is this one ready to merge? I am preparing to create an arrow release candidate later this week
@sunchao Do you need to look at this again?
Sorry I'm on vacation. Will take another look tomorrow.
arrow/src/ffi_stream.rs
Outdated
//!
//! // export it
//! let stream = Arc::new(FFI_ArrowArrayStream::new(reader));
//! let stream_ptr = FFI_ArrowArrayStream::to_raw(stream) as *mut FFI_ArrowArrayStream;

This needs to be updated. Also, similar to export_array_into_raw in FFI Array, do we need to have a function to allow an importer to allocate struct memory for the exported stream?
I added export_reader_into_raw and used it in test and doc now.
/// that requires [FFI_ArrowArrayStream].
#[derive(Debug)]
pub struct ArrowArrayStreamReader {
    stream: Arc<FFI_ArrowArrayStream>,

nit: we can use Box here to indicate that stream is a unique reference, not shared.
If we make it a Box, we run into trouble when we want to get the raw pointer back: Box::into_raw(self.stream) is rejected because self.stream cannot be moved, and we cannot clone this Box (FFI_ArrowArrayStream is not cloneable).
Ah I see.
Overall looks pretty good! just a few last comments.
arrow/src/ffi_stream.rs
Outdated
type Item = Result<RecordBatch>;
fn next(&mut self) -> Option<Self::Item> {
    self.stream.get_next.unwrap();

this seems unnecessary since by contract get_next should be defined. Plus we also call unwrap at line 368.
yea, this can be removed.
arrow/src/ffi_stream.rs
Outdated
fn next(&mut self) -> Option<Self::Item> {
    self.stream.get_next.unwrap();
    let stream_ptr = Arc::into_raw(self.stream.clone()) as *mut FFI_ArrowArrayStream;

I think we can just use Arc::as_ptr instead of clone here.
let record_batch = RecordBatch::from(&StructArray::from(data));
Some(Ok(record_batch))
} else {

Do we need to drop array_ptr in the else branch too?
LGTM
Thanks @sunchao
Merged, thanks @viirya!
Which issue does this PR close?
Closes #1348.
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?