Skip to content

Commit

Permalink
Add C++ tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Sep 1, 2020
1 parent ce70a82 commit 1b87b7a
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 15 deletions.
21 changes: 17 additions & 4 deletions cpp/src/arrow/c/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,27 +60,40 @@ struct ArrowArray {
void* private_data;
};

// EXPERIMENTAL
// EXPERIMENTAL: C stream interface

struct ArrowArrayStream {
// Callback to get the stream type
// (will be the same for all arrays in the stream).
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowSchema must be released independently from the stream.
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);

// Callback to get the next array
// (if no error and the array is released, the stream has ended)
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowArray must be released independently from the stream.
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

// Callback to get optional detailed error information.
// This must only be called if the last stream operation failed
// with a non-0 return code. The returned pointer is only valid until
// the next operation on this stream (including release).
// If unavailable, NULL is returned.
// with a non-0 return code.
//
// Return value: pointer to a null-terminated character array describing
// the last error, or NULL if no description is available.
//
// The returned pointer is only valid until the next operation on this stream
// (including release).
const char* (*get_last_error)(struct ArrowArrayStream*);

// Release callback: release the stream's own resources.
// Note that arrays returned by `get_next` must be individually released.
void (*release)(struct ArrowArrayStream*);

// Opaque producer-specific data
void* private_data;
};
Expand Down
31 changes: 24 additions & 7 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1585,8 +1585,12 @@ class ExportedArrayStream {
switch (status.code()) {
case StatusCode::IOError:
return EIO;
case StatusCode::NotImplemented:
return ENOSYS;
case StatusCode::OutOfMemory:
return ENOMEM;
default:
return EINVAL; // Most likely?
return EINVAL; // Fallback for Invalid, TypeError, etc.
}
}

Expand Down Expand Up @@ -1618,15 +1622,21 @@ namespace {

class ArrayStreamBatchReader : public RecordBatchReader {
public:
explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) : stream_(stream) {}
explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) {
ArrowArrayStreamMove(stream, &stream_);
DCHECK(!ArrowArrayStreamIsReleased(&stream_));
}

~ArrayStreamBatchReader() { ArrowArrayStreamRelease(stream_); }
~ArrayStreamBatchReader() {
ArrowArrayStreamRelease(&stream_);
DCHECK(ArrowArrayStreamIsReleased(&stream_));
}

std::shared_ptr<Schema> schema() const override { return CacheSchema(); }

Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
struct ArrowArray c_array;
RETURN_NOT_OK(StatusFromCError(stream_->get_next(stream_, &c_array)));
RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array)));
if (ArrowArrayIsReleased(&c_array)) {
// End of stream
batch->reset();
Expand All @@ -1640,7 +1650,7 @@ class ArrayStreamBatchReader : public RecordBatchReader {
std::shared_ptr<Schema> CacheSchema() const {
if (!schema_) {
struct ArrowSchema c_schema;
ARROW_CHECK_OK(StatusFromCError(stream_->get_schema(stream_, &c_schema)));
ARROW_CHECK_OK(StatusFromCError(stream_.get_schema(&stream_, &c_schema)));
schema_ = ImportSchema(&c_schema).ValueOrDie();
}
return schema_;
Expand All @@ -1652,18 +1662,25 @@ class ArrayStreamBatchReader : public RecordBatchReader {
}
StatusCode code;
switch (errno_like) {
case EDOM:
case EINVAL:
case ERANGE:
code = StatusCode::Invalid;
break;
case ENOMEM:
code = StatusCode::OutOfMemory;
break;
case ENOSYS:
code = StatusCode::NotImplemented;
default:
code = StatusCode::IOError;
break;
}
const char* last_error = stream_->get_last_error(stream_);
const char* last_error = stream_.get_last_error(&stream_);
return Status(code, last_error ? std::string(last_error) : "");
}

struct ArrowArrayStream* stream_;
mutable struct ArrowArrayStream stream_;
mutable std::shared_ptr<Schema> schema_;
};

Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,18 @@ ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
struct ArrowSchema* schema);

// EXPERIMENTAL: array stream APIs

/// \brief EXPERIMENTAL: Export C++ RecordBatchReader using the C stream interface.
///
/// \param[in] reader RecordBatchReader object to export
/// \param[out] out C struct where to export the stream
ARROW_EXPORT
Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out);

/// \brief EXPERIMENTAL: Import C++ RecordBatchReader from the C stream interface.
///
/// \param[in,out] stream C stream interface struct
/// \return Imported RecordBatchReader object
ARROW_EXPORT
Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
struct ArrowArrayStream* stream);
Expand Down
Loading

0 comments on commit 1b87b7a

Please sign in to comment.