Skip to content

Commit

Permalink
ARROW-9761: [C/C++] Add experimental C ArrowArrayStream ABI
Browse files Browse the repository at this point in the history
The goal is to have a standardized ABI to communicate streams of homogeneous
arrays or record batches (for example for database result sets).

The trickiest part is error reporting.  This proposal tries to strike
a compromise between simplicity (an integer error code mapping to errno values)
and expressivity (an optional description string for application-specific
and context-specific details).
  • Loading branch information
pitrou committed Aug 31, 2020
1 parent df3bee2 commit fe7bec2
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 0 deletions.
25 changes: 25 additions & 0 deletions cpp/src/arrow/c/abi.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ struct ArrowArray {
void* private_data;
};

// EXPERIMENTAL
//
// C ABI handle for a stream of arrays that all share a single type
// (for example a database result set delivered chunk by chunk).
// The producer fills in the callbacks and `private_data`; a stream whose
// `release` member is NULL is considered released.
struct ArrowArrayStream {
// Callback to get the stream type
// (will be the same for all arrays in the stream).
// On success the schema is written to `out`.
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
// Callback to get the next array, written to `out`.
// End of stream is signalled by a successful call (return value 0) that
// leaves `out` marked as released.
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

// Callback to get optional detailed error information.
// This must only be called if the last stream operation failed
// with a non-0 return code. The returned pointer is only valid until
// the next operation on this stream (including release), so callers
// that need the message later must copy it first.
// If unavailable, NULL is returned.
const char* (*get_last_error)(struct ArrowArrayStream*);

// Release callback: release the stream's own resources.
// Note that arrays returned by `get_next` must be individually released;
// releasing the stream does not release them.
void (*release)(struct ArrowArrayStream*);
// Opaque producer-specific data
void* private_data;
};

#ifdef __cplusplus
}
#endif
110 changes: 110 additions & 0 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "arrow/c/bridge.h"

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <string>
#include <utility>
Expand Down Expand Up @@ -1501,4 +1502,113 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
return ImportRecordBatch(array, *maybe_schema);
}

//////////////////////////////////////////////////////////////////////////
// C stream export

namespace {

// Adapter that implements the C ArrowArrayStream callbacks on top of a
// RecordBatchReader.
//
// An instance is a thin, non-owning view over a `struct ArrowArrayStream`;
// one is created on the stack for each callback invocation. The actual state
// lives in a heap-allocated PrivateData reachable through
// `ArrowArrayStream::private_data`, which Release() deletes.
class ExportedArrayStream {
 public:
  // State stored behind ArrowArrayStream::private_data.
  struct PrivateData {
    explicit PrivateData(std::shared_ptr<RecordBatchReader> reader)
        : reader_(std::move(reader)) {}

    // Source of the schema and of the record batches handed to the consumer.
    std::shared_ptr<RecordBatchReader> reader_;
    // Text of the last failed operation; cleared by every successful one.
    std::string last_error_;

    PrivateData() = default;
    ARROW_DISALLOW_COPY_AND_ASSIGN(PrivateData);
  };

  explicit ExportedArrayStream(struct ArrowArrayStream* stream) : stream_(stream) {}

  // Export the reader's schema into `out_schema`.
  Status GetSchema(struct ArrowSchema* out_schema) {
    return ExportSchema(*reader()->schema(), out_schema);
  }

  // Read the next record batch and export it into `out_array`.
  // When the reader yields a null batch, `out_array` is marked released and
  // OK is returned: that is how end of stream is reported to the consumer.
  Status GetNext(struct ArrowArray* out_array) {
    std::shared_ptr<RecordBatch> batch;
    RETURN_NOT_OK(reader()->ReadNext(&batch));
    if (batch == nullptr) {
      // End of stream
      ArrowArrayMarkReleased(out_array);
      return Status::OK();
    } else {
      return ExportRecordBatch(*batch, out_array);
    }
  }

  // Return the message recorded by the last failed operation, or nullptr if
  // none is stored. The pointer refers to storage inside PrivateData, so it
  // is invalidated by the next operation that updates or destroys that state.
  const char* GetLastError() {
    const auto& last_error = private_data()->last_error_;
    return last_error.empty() ? nullptr : last_error.c_str();
  }

  // Destroy the private state and mark the stream released.
  // Calling this on an already released stream is a no-op.
  void Release() {
    if (ArrowArrayStreamIsReleased(stream_)) {
      return;
    }
    DCHECK_NE(private_data(), nullptr);
    delete private_data();

    ArrowArrayStreamMarkReleased(stream_);
  }

  // C-compatible callbacks
  //
  // Each trampoline wraps the raw stream pointer in a temporary
  // ExportedArrayStream and converts the resulting Status (if any) into an
  // errno-compatible integer.

  static int StaticGetSchema(struct ArrowArrayStream* stream,
                             struct ArrowSchema* out_schema) {
    ExportedArrayStream self{stream};
    return self.ToCError(self.GetSchema(out_schema));
  }

  static int StaticGetNext(struct ArrowArrayStream* stream,
                           struct ArrowArray* out_array) {
    ExportedArrayStream self{stream};
    return self.ToCError(self.GetNext(out_array));
  }

  static void StaticRelease(struct ArrowArrayStream* stream) {
    ExportedArrayStream{stream}.Release();
  }

  static const char* StaticGetLastError(struct ArrowArrayStream* stream) {
    return ExportedArrayStream{stream}.GetLastError();
  }

 private:
  // Convert `status` into an errno-compatible code and remember its message.
  //
  // On success the stored error text is cleared and 0 is returned. On failure
  // the full status string is kept for GetLastError() and the status code is
  // mapped onto the closest errno value, so that C consumers can tell broad
  // failure classes apart without parsing the message.
  int ToCError(const Status& status) {
    if (ARROW_PREDICT_TRUE(status.ok())) {
      private_data()->last_error_.clear();
      return 0;
    }
    private_data()->last_error_ = status.ToString();
    switch (status.code()) {
      case StatusCode::IOError:
        return EIO;
      case StatusCode::NotImplemented:
        return ENOSYS;
      case StatusCode::OutOfMemory:
        return ENOMEM;
      default:
        return EINVAL;  // Fallback for Invalid, TypeError, etc.
    }
  }

  // `private_data` is a `void*` that ExportRecordBatchReader set to a
  // PrivateData allocation, so a static_cast is sufficient to recover it.
  PrivateData* private_data() {
    return static_cast<PrivateData*>(stream_->private_data);
  }

  const std::shared_ptr<RecordBatchReader>& reader() { return private_data()->reader_; }

  // Non-owning pointer to the C struct this view operates on.
  struct ArrowArrayStream* stream_;
};

} // namespace

// Fill `out` with the callbacks implemented by ExportedArrayStream and attach
// a freshly allocated PrivateData that takes ownership of `reader`.
// The allocation is freed when the stream's release callback runs.
// Always returns Status::OK().
Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
                               struct ArrowArrayStream* out) {
  using Exported = ExportedArrayStream;

  struct ArrowArrayStream& c_stream = *out;
  c_stream.get_schema = Exported::StaticGetSchema;
  c_stream.get_next = Exported::StaticGetNext;
  c_stream.get_last_error = Exported::StaticGetLastError;
  c_stream.release = Exported::StaticRelease;
  // Ownership of this allocation passes to the C struct.
  c_stream.private_data = new Exported::PrivateData{std::move(reader)};
  return Status::OK();
}

} // namespace arrow
6 changes: 6 additions & 0 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,10 @@ ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
struct ArrowSchema* schema);

// EXPERIMENTAL: array stream APIs

/// \brief Export a C++ RecordBatchReader using the C stream interface.
///
/// The resulting ArrowArrayStream struct keeps the record batch reader alive
/// until its release callback is called by the consumer.
///
/// \param[in] reader RecordBatchReader object to export
/// \param[out] out C struct where to export the stream
ARROW_EXPORT
Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out);

} // namespace arrow
18 changes: 18 additions & 0 deletions cpp/src/arrow/c/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,24 @@ inline void ArrowArrayRelease(struct ArrowArray* array) {
}
}

/// Tell whether the C array stream has been released.
/// Returns 1 when the stream's `release` callback is NULL, 0 otherwise.
inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* stream) {
  return stream->release ? 0 : 1;
}

/// Flag the C array stream as released by clearing its `release` callback.
/// Intended to be called from within release callbacks; it frees nothing
/// by itself.
inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) {
  stream->release = NULL;
}

/// Invoke the stream's release callback unless the stream is already
/// released. In debug builds, additionally checks that the callback left
/// the stream marked as released.
inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
  if (ArrowArrayStreamIsReleased(stream)) {
    return;
  }
  stream->release(stream);
  assert(ArrowArrayStreamIsReleased(stream));
}

#ifdef __cplusplus
}
#endif

0 comments on commit fe7bec2

Please sign in to comment.