ARROW-9761: [C/C++] Add experimental C stream interface #8052

Closed
wants to merge 7 commits into from
38 changes: 38 additions & 0 deletions cpp/src/arrow/c/abi.h
@@ -60,6 +60,44 @@ struct ArrowArray {
void* private_data;
};

// EXPERIMENTAL: C stream interface

struct ArrowArrayStream {
// Callback to get the stream type
// (will be the same for all arrays in the stream).
//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
//
// If successful, the ArrowSchema must be released independently from the stream.
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);

// Callback to get the next array
// (if no error and the array is released, the stream has ended)
Contributor

"Released" is something defined in the ArrowArray spec. Is it a stronger or weaker guarantee than returning a nullptr here?

Member Author

A nullptr cannot be returned. The callback returns an int. However, we could say that returning -1 means end of stream.

//
// Return value: 0 if successful, an `errno`-compatible error code otherwise.
Contributor

Silly question: is "errno-compatible" a well-defined Unix/Windows term? Is there someplace where I can read more about it?

Member Author

@pitrou Aug 26, 2020

I don't know how to phrase it: it returns values that are errno error codes (in case of error). A number of values are standard in C++: https://en.cppreference.com/w/cpp/error/errno_macros

//
// If successful, the ArrowArray must be released independently from the stream.
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);

// Callback to get optional detailed error information.
// This must only be called if the last stream operation failed
// with a non-0 return code.
//
// Return value: pointer to a null-terminated character array describing
// the last error, or NULL if no description is available.
//
// The returned pointer is only valid until the next operation on this stream
// (including release).
const char* (*get_last_error)(struct ArrowArrayStream*);

// Release callback: release the stream's own resources.
// Note that arrays returned by `get_next` must be individually released.
Contributor

Are arrays produced by this stream tied to the lifecycle of this stream (must they be released first)?

Member Author

Hmm... I'd say no.

void (*release)(struct ArrowArrayStream*);

// Opaque producer-specific data
Contributor

Nit: add a new line here, or remove the new line above?

void* private_data;
};

#ifdef __cplusplus
}
#endif
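
To make the end-of-stream convention discussed above concrete, here is a minimal consumer loop against a toy producer. It is only a sketch: `ArrowArray` is reduced to a simplified stand-in (not the full ABI struct), and `ToyState`, `MakeToyStream` and `DrainStream` are hypothetical names that do not exist in Arrow. A successful `get_next` that leaves the array released is treated as end of stream, and each array is released independently of the stream.

```cpp
#include <cerrno>
#include <cstdint>

// Simplified stand-in for the real ArrowArray in arrow/c/abi.h
// (assumption: only the fields this sketch touches are reproduced).
struct ArrowArray {
  int64_t length;
  void (*release)(struct ArrowArray*);
  void* private_data;
};

struct ArrowSchema;  // opaque here: the sketch never fetches a schema

// Same members as the struct proposed in the diff above.
struct ArrowArrayStream {
  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
  const char* (*get_last_error)(struct ArrowArrayStream*);
  void (*release)(struct ArrowArrayStream*);
  void* private_data;
};

// Hypothetical toy producer: hands out `remaining` empty arrays, then ends.
struct ToyState {
  int remaining;
};

static void ToyArrayRelease(struct ArrowArray* array) { array->release = nullptr; }

static int ToyGetSchema(struct ArrowArrayStream*, struct ArrowSchema*) { return ENOSYS; }

static int ToyGetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
  auto* state = static_cast<ToyState*>(stream->private_data);
  if (state->remaining == 0) {
    out->release = nullptr;  // success + released array signals end of stream
    return 0;
  }
  --state->remaining;
  out->length = 0;
  out->release = ToyArrayRelease;
  out->private_data = nullptr;
  return 0;
}

static const char* ToyGetLastError(struct ArrowArrayStream*) { return nullptr; }

static void ToyStreamRelease(struct ArrowArrayStream* stream) {
  delete static_cast<ToyState*>(stream->private_data);
  stream->release = nullptr;  // mark the stream itself as released
}

void MakeToyStream(int n_arrays, struct ArrowArrayStream* out) {
  out->get_schema = ToyGetSchema;
  out->get_next = ToyGetNext;
  out->get_last_error = ToyGetLastError;
  out->release = ToyStreamRelease;
  out->private_data = new ToyState{n_arrays};
}

// Consumer side: drain the stream, releasing every array and then the stream.
// Returns the number of arrays seen, or the negated error code on failure.
int DrainStream(struct ArrowArrayStream* stream) {
  int count = 0;
  int err = 0;
  while (true) {
    struct ArrowArray array;
    err = stream->get_next(stream, &array);
    if (err != 0 || array.release == nullptr) break;  // error or end of stream
    ++count;
    array.release(&array);  // arrays are released independently of the stream
  }
  stream->release(stream);
  return err != 0 ? -err : count;
}
```

Calling `MakeToyStream(3, &stream)` followed by `DrainStream(&stream)` visits three arrays and leaves `stream.release` null.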
194 changes: 194 additions & 0 deletions cpp/src/arrow/c/bridge.cc
@@ -18,6 +18,7 @@
#include "arrow/c/bridge.h"

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <string>
#include <utility>
@@ -1501,4 +1502,197 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
return ImportRecordBatch(array, *maybe_schema);
}

//////////////////////////////////////////////////////////////////////////
// C stream export

namespace {

class ExportedArrayStream {
public:
struct PrivateData {
explicit PrivateData(std::shared_ptr<RecordBatchReader> reader)
: reader_(std::move(reader)) {}

std::shared_ptr<RecordBatchReader> reader_;
std::string last_error_;

PrivateData() = default;
ARROW_DISALLOW_COPY_AND_ASSIGN(PrivateData);
};

explicit ExportedArrayStream(struct ArrowArrayStream* stream) : stream_(stream) {}

Status GetSchema(struct ArrowSchema* out_schema) {
return ExportSchema(*reader()->schema(), out_schema);
}

Status GetNext(struct ArrowArray* out_array) {
std::shared_ptr<RecordBatch> batch;
RETURN_NOT_OK(reader()->ReadNext(&batch));
if (batch == nullptr) {
// End of stream
ArrowArrayMarkReleased(out_array);
return Status::OK();
} else {
return ExportRecordBatch(*batch, out_array);
}
}

const char* GetLastError() {
const auto& last_error = private_data()->last_error_;
return last_error.empty() ? nullptr : last_error.c_str();
}

void Release() {
if (ArrowArrayStreamIsReleased(stream_)) {
return;
}
DCHECK_NE(private_data(), nullptr);
delete private_data();

ArrowArrayStreamMarkReleased(stream_);
}

// C-compatible callbacks

static int StaticGetSchema(struct ArrowArrayStream* stream,
struct ArrowSchema* out_schema) {
ExportedArrayStream self{stream};
return self.ToCError(self.GetSchema(out_schema));
}

static int StaticGetNext(struct ArrowArrayStream* stream,
struct ArrowArray* out_array) {
ExportedArrayStream self{stream};
return self.ToCError(self.GetNext(out_array));
}

static void StaticRelease(struct ArrowArrayStream* stream) {
ExportedArrayStream{stream}.Release();
}

static const char* StaticGetLastError(struct ArrowArrayStream* stream) {
return ExportedArrayStream{stream}.GetLastError();
}

private:
int ToCError(const Status& status) {
if (ARROW_PREDICT_TRUE(status.ok())) {
private_data()->last_error_.clear();
return 0;
}
private_data()->last_error_ = status.ToString();
switch (status.code()) {
case StatusCode::IOError:
return EIO;
case StatusCode::NotImplemented:
return ENOSYS;
case StatusCode::OutOfMemory:
return ENOMEM;
default:
return EINVAL; // Fallback for Invalid, TypeError, etc.
}
}

PrivateData* private_data() {
return reinterpret_cast<PrivateData*>(stream_->private_data);
}

const std::shared_ptr<RecordBatchReader>& reader() { return private_data()->reader_; }

struct ArrowArrayStream* stream_;
};

} // namespace
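
Since the string returned by `get_last_error` lives in the producer's private data, a consumer probably wants to copy it before making any further call on the stream. The sketch below illustrates that with a toy producer that always fails. `FailingState`, `MakeFailingStream` and `NextOrMessage` are hypothetical names for illustration, and the error messages are made up.

```cpp
#include <cerrno>
#include <string>

struct ArrowArray;   // opaque here: the sketch never looks inside
struct ArrowSchema;  // opaque here as well

// Same members as the struct proposed in this PR.
struct ArrowArrayStream {
  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
  const char* (*get_last_error)(struct ArrowArrayStream*);
  void (*release)(struct ArrowArrayStream*);
  void* private_data;
};

// Hypothetical producer state, loosely modelled on PrivateData above.
struct FailingState {
  std::string last_error;
};

static FailingState* StateOf(struct ArrowArrayStream* stream) {
  return static_cast<FailingState*>(stream->private_data);
}

static int FailingGetSchema(struct ArrowArrayStream* stream, struct ArrowSchema*) {
  StateOf(stream)->last_error = "NotImplemented: no schema";
  return ENOSYS;
}

static int FailingGetNext(struct ArrowArrayStream* stream, struct ArrowArray*) {
  StateOf(stream)->last_error = "IOError: disk unplugged";
  return EIO;
}

static const char* FailingGetLastError(struct ArrowArrayStream* stream) {
  const std::string& text = StateOf(stream)->last_error;
  return text.empty() ? nullptr : text.c_str();
}

static void FailingRelease(struct ArrowArrayStream* stream) {
  delete StateOf(stream);  // the error string dies with the private data
  stream->release = nullptr;
}

void MakeFailingStream(struct ArrowArrayStream* out) {
  out->get_schema = FailingGetSchema;
  out->get_next = FailingGetNext;
  out->get_last_error = FailingGetLastError;
  out->release = FailingRelease;
  out->private_data = new FailingState();
}

// Consumer helper: on failure, copy the message before touching the stream again.
int NextOrMessage(struct ArrowArrayStream* stream, struct ArrowArray* out,
                  std::string* message) {
  message->clear();
  const int err = stream->get_next(stream, out);
  if (err != 0) {
    const char* text = stream->get_last_error(stream);
    if (text != nullptr) *message = text;  // owned copy, still valid after release
  }
  return err;
}
```

The copied `std::string` stays usable after the stream is released, whereas the raw pointer would dangle at that point.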

Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out) {
out->get_schema = ExportedArrayStream::StaticGetSchema;
out->get_next = ExportedArrayStream::StaticGetNext;
out->get_last_error = ExportedArrayStream::StaticGetLastError;
out->release = ExportedArrayStream::StaticRelease;
out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)};
return Status::OK();
}

//////////////////////////////////////////////////////////////////////////
// C stream import

namespace {

class ArrayStreamBatchReader : public RecordBatchReader {
public:
explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) {
ArrowArrayStreamMove(stream, &stream_);
DCHECK(!ArrowArrayStreamIsReleased(&stream_));
}

~ArrayStreamBatchReader() {
ArrowArrayStreamRelease(&stream_);
DCHECK(ArrowArrayStreamIsReleased(&stream_));
}

std::shared_ptr<Schema> schema() const override { return CacheSchema(); }

Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
struct ArrowArray c_array;
RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array)));
if (ArrowArrayIsReleased(&c_array)) {
// End of stream
batch->reset();
return Status::OK();
} else {
return ImportRecordBatch(&c_array, CacheSchema()).Value(batch);
}
}

private:
std::shared_ptr<Schema> CacheSchema() const {
if (!schema_) {
struct ArrowSchema c_schema;
ARROW_CHECK_OK(StatusFromCError(stream_.get_schema(&stream_, &c_schema)));
schema_ = ImportSchema(&c_schema).ValueOrDie();
}
return schema_;
}

Status StatusFromCError(int errno_like) const {
if (ARROW_PREDICT_TRUE(errno_like == 0)) {
return Status::OK();
}
StatusCode code;
switch (errno_like) {
case EDOM:
case EINVAL:
case ERANGE:
code = StatusCode::Invalid;
break;
case ENOMEM:
code = StatusCode::OutOfMemory;
break;
case ENOSYS:
code = StatusCode::NotImplemented;
break;
default:
code = StatusCode::IOError;
break;
}
const char* last_error = stream_.get_last_error(&stream_);
return Status(code, last_error ? std::string(last_error) : "");
}

mutable struct ArrowArrayStream stream_;
mutable std::shared_ptr<Schema> schema_;
};

} // namespace
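
A quick way to sanity-check the two error mappings (`ToCError` on export, `StatusFromCError` on import) is to round-trip a few codes. The sketch below uses a hypothetical `ToyCode` enum in place of `arrow::StatusCode`, so it compiles without Arrow. It assumes every `case` in the import switch ends its own branch. The round trip is lossy by design: anything that exports as `EINVAL` comes back as `Invalid`.

```cpp
#include <cerrno>

// Hypothetical stand-in for arrow::StatusCode (only a handful of values).
enum class ToyCode { OK, Invalid, TypeError, IOError, OutOfMemory, NotImplemented };

// Export direction, mirroring ToCError in the diff.
int ToErrno(ToyCode code) {
  switch (code) {
    case ToyCode::OK:
      return 0;
    case ToyCode::IOError:
      return EIO;
    case ToyCode::NotImplemented:
      return ENOSYS;
    case ToyCode::OutOfMemory:
      return ENOMEM;
    default:
      return EINVAL;  // fallback for Invalid, TypeError, etc.
  }
}

// Import direction, mirroring StatusFromCError, with no case falling through
// into the default branch.
ToyCode FromErrno(int errno_like) {
  switch (errno_like) {
    case 0:
      return ToyCode::OK;
    case EDOM:
    case EINVAL:
    case ERANGE:
      return ToyCode::Invalid;
    case ENOMEM:
      return ToyCode::OutOfMemory;
    case ENOSYS:
      return ToyCode::NotImplemented;
    default:
      return ToyCode::IOError;  // unknown codes are reported as I/O errors
  }
}
```

With these definitions, `NotImplemented` survives the round trip, while `TypeError` comes back as `Invalid`.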

Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
struct ArrowArrayStream* stream) {
if (ArrowArrayStreamIsReleased(stream)) {
return Status::Invalid("Cannot import released ArrowArrayStream");
}
// XXX should we call get_schema() here to avoid crashing on error?
return std::make_shared<ArrayStreamBatchReader>(stream);
}
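
On the XXX question above: one possible answer is to call `get_schema` during import, so that a failing producer yields an error value instead of aborting later in `CacheSchema`. The following is only a sketch of that idea, not what this PR does. `ImportOutcome`, `EagerImport` and `MakeSchemaStream` are hypothetical, `ArrowSchema` is a simplified stand-in, and ownership of the stream on failure is left with the caller as an assumption.

```cpp
#include <cerrno>
#include <string>

struct ArrowArray;  // opaque in this sketch

// Simplified stand-in for the real ArrowSchema (assumption: only the
// release callback matters here).
struct ArrowSchema {
  void (*release)(struct ArrowSchema*);
  void* private_data;
};

// Same members as the struct proposed in this PR.
struct ArrowArrayStream {
  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
  const char* (*get_last_error)(struct ArrowArrayStream*);
  void (*release)(struct ArrowArrayStream*);
  void* private_data;
};

// Hypothetical outcome type standing in for arrow::Result.
struct ImportOutcome {
  bool ok;
  std::string error;
};

// Fetch the schema up front so that errors surface as values at import time.
ImportOutcome EagerImport(struct ArrowArrayStream* stream) {
  if (stream->release == nullptr) {
    return {false, "Cannot import released ArrowArrayStream"};
  }
  struct ArrowSchema c_schema;
  const int err = stream->get_schema(stream, &c_schema);
  if (err != 0) {
    const char* text = stream->get_last_error(stream);
    return {false, text != nullptr ? text : "unknown error"};
  }
  c_schema.release(&c_schema);  // real code would import and cache the schema
  return {true, ""};
}

// Toy callbacks used to exercise both paths.
static void ToySchemaRelease(struct ArrowSchema* schema) { schema->release = nullptr; }

static int GoodGetSchema(struct ArrowArrayStream*, struct ArrowSchema* out) {
  out->release = ToySchemaRelease;
  out->private_data = nullptr;
  return 0;
}

static int BadGetSchema(struct ArrowArrayStream*, struct ArrowSchema*) { return EIO; }

static const char* ToyLastError(struct ArrowArrayStream*) {
  return "IOError: schema unavailable";
}

static void ToyRelease(struct ArrowArrayStream* stream) { stream->release = nullptr; }

struct ArrowArrayStream MakeSchemaStream(bool good) {
  struct ArrowArrayStream stream;
  stream.get_schema = good ? GoodGetSchema : BadGetSchema;
  stream.get_next = nullptr;  // unused in this sketch
  stream.get_last_error = ToyLastError;
  stream.release = ToyRelease;
  stream.private_data = nullptr;
  return stream;
}
```

The trade-off is that import is no longer free: it costs one `get_schema` call even when the caller never asks for the schema.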

} // namespace arrow
34 changes: 34 additions & 0 deletions cpp/src/arrow/c/bridge.h
@@ -29,6 +29,10 @@

namespace arrow {

/// \defgroup c-data-interface Functions for working with the C data interface.
///
/// @{

/// \brief Export C++ DataType using the C data interface format.
///
/// The root type is considered to have empty name and metadata.
@@ -160,4 +164,34 @@ ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
struct ArrowSchema* schema);

/// @}

/// \defgroup c-stream-interface Functions for working with the C stream interface.
///
/// @{

/// \brief EXPERIMENTAL: Export C++ RecordBatchReader using the C stream interface.
///
/// The resulting ArrowArrayStream struct keeps the record batch reader alive
/// until its release callback is called by the consumer.
///
/// \param[in] reader RecordBatchReader object to export
/// \param[out] out C struct where to export the stream
ARROW_EXPORT
Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out);

/// \brief EXPERIMENTAL: Import C++ RecordBatchReader from the C stream interface.
///
/// The ArrowArrayStream struct has its contents moved to a private object
/// held alive by the resulting record batch reader.
///
/// \param[in,out] stream C stream interface struct
/// \return Imported RecordBatchReader object
ARROW_EXPORT
Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
struct ArrowArrayStream* stream);

/// @}

} // namespace arrow