From 593d1aa0de5ae55cc08a5fbdf2e946da14d5f341 Mon Sep 17 00:00:00 2001
From: Antoine Pitrou
Date: Tue, 25 Aug 2020 19:57:25 +0200
Subject: [PATCH] Add C++ stream import API

Add ImportRecordBatchReader(), which wraps a C ArrowArrayStream in a
RecordBatchReader.  The stream's schema is imported eagerly so that
errors are returned as a Status, and a released array returned by
get_next() is reported as end of stream (null batch).
---
 cpp/src/arrow/c/bridge.cc | 78 +++++++++++++++++++++++++++++++++++++++++++++++++++
 cpp/src/arrow/c/bridge.h  | 11 +++++++
 2 files changed, 89 insertions(+)

diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc
index 504d59f1916a4..7dec039545253 100644
--- a/cpp/src/arrow/c/bridge.cc
+++ b/cpp/src/arrow/c/bridge.cc
@@ -1611,4 +1611,82 @@ Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
   return Status::OK();
 }
 
+//////////////////////////////////////////////////////////////////////////
+// C stream import
+
+namespace {
+
+// RecordBatchReader implementation backed by a C ArrowArrayStream.
+//
+// The reader keeps the `stream` pointer it was given (the struct is not copied)
+// and releases the stream when destroyed, so the pointed-to struct must stay
+// valid for the reader's whole lifetime.
+class ArrayStreamBatchReader : public RecordBatchReader {
+ public:
+  explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) : stream_(stream) {
+    DCHECK(!ArrowArrayStreamIsReleased(stream_));
+  }
+
+  ~ArrayStreamBatchReader() { ArrowArrayStreamRelease(stream_); }
+
+  // Fetch and import the stream's schema.  Must succeed before schema() or
+  // ReadNext() is used; doing this eagerly lets a failing get_schema() surface
+  // as a Status instead of aborting the process.
+  Status Init() {
+    struct ArrowSchema c_schema;
+    RETURN_NOT_OK(StatusFromCError(stream_->get_schema(stream_, &c_schema)));
+    return ImportSchema(&c_schema).Value(&schema_);
+  }
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    struct ArrowArray c_array;
+    RETURN_NOT_OK(StatusFromCError(stream_->get_next(stream_, &c_array)));
+    if (ArrowArrayIsReleased(&c_array)) {
+      // The producer signals end of stream with a released array; report it
+      // the RecordBatchReader way, with a null batch.
+      batch->reset();
+      return Status::OK();
+    }
+    return ImportRecordBatch(&c_array, schema_).Value(batch);
+  }
+
+ private:
+  // Translate an errno-like code returned by a stream callback into a Status,
+  // using the stream's get_last_error() text (if any) as the message.
+  Status StatusFromCError(int errno_like) const {
+    if (ARROW_PREDICT_TRUE(errno_like == 0)) {
+      return Status::OK();
+    }
+    StatusCode code;
+    switch (errno_like) {
+      case EINVAL:
+        code = StatusCode::Invalid;
+        break;
+      default:
+        code = StatusCode::IOError;
+        break;
+    }
+    const char* last_error = stream_->get_last_error(stream_);
+    return Status(code, last_error ? std::string(last_error) : "");
+  }
+
+  struct ArrowArrayStream* stream_;
+  std::shared_ptr<Schema> schema_;
+};
+
+}  // namespace
+
+Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
+    struct ArrowArrayStream* stream) {
+  if (ArrowArrayStreamIsReleased(stream)) {
+    return Status::Invalid("Cannot import released ArrowArrayStream");
+  }
+  auto reader = std::make_shared<ArrayStreamBatchReader>(stream);
+  // Import the schema up front so that errors are returned, not fatal.
+  RETURN_NOT_OK(reader->Init());
+  return reader;
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h
index e54df1aaeadfa..751730af9bb05 100644
--- a/cpp/src/arrow/c/bridge.h
+++ b/cpp/src/arrow/c/bridge.h
@@ -165,5 +165,16 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
 ARROW_EXPORT
 Status ExportRecordBatchReader(std::shared_ptr<RecordBatchReader> reader,
                                struct ArrowArrayStream* out);
+
+/// \brief Import C++ record batch reader from the C stream interface.
+///
+/// The returned reader keeps the `stream` pointer and releases the stream
+/// when it is destroyed, so the struct must outlive the reader.
+///
+/// \param[in] stream C stream interface struct
+/// \return Imported RecordBatchReader object
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatchReader>> ImportRecordBatchReader(
+    struct ArrowArrayStream* stream);
 
 }  // namespace arrow