Skip to content

Commit

Permalink
Add C++ stream import API
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Aug 31, 2020
1 parent fe7bec2 commit 593d1aa
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
60 changes: 60 additions & 0 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1611,4 +1611,64 @@ Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
return Status::OK();
}

//////////////////////////////////////////////////////////////////////////
// C stream import

namespace {

// RecordBatchReader implementation that pulls record batches out of a C
// ArrowArrayStream.
//
// The reader keeps the stream *pointer* (the struct itself is not copied) and
// calls ArrowArrayStreamRelease() on it in the destructor, so the pointed-to
// struct has to stay alive for as long as the reader does.
class ArrayStreamBatchReader : public RecordBatchReader {
 public:
  // `stream` must not have been released already (only checked in debug builds).
  explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) : stream_(stream) {
    DCHECK(!ArrowArrayStreamIsReleased(stream_));
  }

  ~ArrayStreamBatchReader() { ArrowArrayStreamRelease(stream_); }

  // Return the stream's schema, importing it on first use.
  std::shared_ptr<Schema> schema() const override { return CacheSchema(); }

  // Fetch the next record batch from the stream.
  //
  // On success `*batch` holds the imported batch, or is null once the stream is
  // exhausted.  A non-zero error code from the stream's get_next() callback is
  // converted to a Status and returned.
  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
    struct ArrowArray c_array;
    RETURN_NOT_OK(StatusFromCError(stream_->get_next(stream_, &c_array)));
    // The C stream interface signals end of stream by returning success with a
    // released array (null `release` callback).  Such an array must not be
    // handed to ImportRecordBatch(); report the end with a null batch instead.
    if (c_array.release == nullptr) {
      batch->reset();
      return Status::OK();
    }
    return ImportRecordBatch(&c_array, CacheSchema()).Value(batch);
  }

 private:
  // Lazily fetch and import the stream schema, then memoize it in `schema_`.
  //
  // schema() has no way to report an error, so failures here go through
  // ARROW_CHECK_OK / ValueOrDie rather than being returned to the caller.
  std::shared_ptr<Schema> CacheSchema() const {
    if (!schema_) {
      struct ArrowSchema c_schema;
      ARROW_CHECK_OK(StatusFromCError(stream_->get_schema(stream_, &c_schema)));
      schema_ = ImportSchema(&c_schema).ValueOrDie();
    }
    return schema_;
  }

  // Translate an errno-like return code from a stream callback into a Status.
  //
  // 0 maps to OK, EINVAL to Invalid, and any other code to IOError.  The
  // message is whatever get_last_error() reports, or empty if it returns null.
  Status StatusFromCError(int errno_like) const {
    if (ARROW_PREDICT_TRUE(errno_like == 0)) {
      return Status::OK();
    }
    StatusCode code;
    switch (errno_like) {
      case EINVAL:
        code = StatusCode::Invalid;
        break;
      default:
        code = StatusCode::IOError;
        break;
    }
    const char* last_error = stream_->get_last_error(stream_);
    return Status(code, last_error ? std::string(last_error) : "");
  }

  struct ArrowArrayStream* stream_;
  // Filled in on first use by CacheSchema(); mutable because schema() is const.
  mutable std::shared_ptr<Schema> schema_;
};

} // namespace

// Wrap a C ArrowArrayStream in a RecordBatchReader.
//
// Returns Status::Invalid when `stream` is null or has already been released,
// so the failure is reported to the caller instead of relying on a debug-only
// check.  On success the returned reader keeps the `stream` pointer and
// releases the stream when it is destroyed.
Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
    struct ArrowArrayStream* stream) {
  if (stream == nullptr || ArrowArrayStreamIsReleased(stream)) {
    return Status::Invalid("Cannot import null or released ArrowArrayStream");
  }
  // XXX should we call get_schema() here to avoid crashing on error?
  return std::make_shared<ArrayStreamBatchReader>(stream);
}

} // namespace arrow
3 changes: 3 additions & 0 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,8 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
ARROW_EXPORT
Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
struct ArrowArrayStream* out);
/// \brief Import a C ArrowArrayStream as a RecordBatchReader.
///
/// The returned reader stores the given pointer (it does not copy the struct)
/// and releases the stream when the reader is destroyed, so the struct pointed
/// to by `stream` must remain valid for the reader's whole lifetime.
///
/// \param[in,out] stream C stream interface struct to read from
/// \return the imported RecordBatchReader
ARROW_EXPORT
Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
struct ArrowArrayStream* stream);

} // namespace arrow

0 comments on commit 593d1aa

Please sign in to comment.