diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h index 114fbf86537c4..a78170dbdbcb0 100644 --- a/cpp/src/arrow/c/abi.h +++ b/cpp/src/arrow/c/abi.h @@ -60,27 +60,40 @@ struct ArrowArray { void* private_data; }; -// EXPERIMENTAL +// EXPERIMENTAL: C stream interface + struct ArrowArrayStream { // Callback to get the stream type // (will be the same for all arrays in the stream). + // // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowSchema must be released independently from the stream. int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + // Callback to get the next array // (if no error and the array is released, the stream has ended) + // // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowArray must be released independently from the stream. int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); // Callback to get optional detailed error information. // This must only be called if the last stream operation failed - // with a non-0 return code. The returned pointer is only valid until - // the next operation on this stream (including release). - // If unavailable, NULL is returned. + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). const char* (*get_last_error)(struct ArrowArrayStream*); // Release callback: release the stream's own resources. // Note that arrays returned by `get_next` must be individually released. void (*release)(struct ArrowArrayStream*); + // Opaque producer-specific data void* private_data; }; diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 2a22d2d6fac35..5b360abc48c07 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -1585,8 +1585,12 @@ class ExportedArrayStream { switch (status.code()) { case StatusCode::IOError: return EIO; + case StatusCode::NotImplemented: + return ENOSYS; + case StatusCode::OutOfMemory: + return ENOMEM; default: - return EINVAL; // Most likely? + return EINVAL; // Fallback for Invalid, TypeError, etc. } } @@ -1618,15 +1622,21 @@ namespace { class ArrayStreamBatchReader : public RecordBatchReader { public: - explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) : stream_(stream) {} + explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) { + ArrowArrayStreamMove(stream, &stream_); + DCHECK(!ArrowArrayStreamIsReleased(&stream_)); + } - ~ArrayStreamBatchReader() { ArrowArrayStreamRelease(stream_); } + ~ArrayStreamBatchReader() { + ArrowArrayStreamRelease(&stream_); + DCHECK(ArrowArrayStreamIsReleased(&stream_)); + } std::shared_ptr schema() const override { return CacheSchema(); } Status ReadNext(std::shared_ptr* batch) override { struct ArrowArray c_array; - RETURN_NOT_OK(StatusFromCError(stream_->get_next(stream_, &c_array))); + RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array))); if (ArrowArrayIsReleased(&c_array)) { // End of stream batch->reset(); @@ -1640,7 +1650,7 @@ class ArrayStreamBatchReader : public RecordBatchReader { std::shared_ptr CacheSchema() const { if (!schema_) { struct ArrowSchema c_schema; - ARROW_CHECK_OK(StatusFromCError(stream_->get_schema(stream_, &c_schema))); + ARROW_CHECK_OK(StatusFromCError(stream_.get_schema(&stream_, &c_schema))); schema_ = ImportSchema(&c_schema).ValueOrDie(); } return schema_; @@ -1652,18 +1662,25 @@ class ArrayStreamBatchReader : public RecordBatchReader { } StatusCode code; switch (errno_like) { + case EDOM: case EINVAL: + case ERANGE: code = StatusCode::Invalid; break; + case ENOMEM: + code = StatusCode::OutOfMemory; + break; + case ENOSYS: + code = StatusCode::NotImplemented; default: code = StatusCode::IOError; break; } - const char* last_error = stream_->get_last_error(stream_); + const char* last_error = stream_.get_last_error(&stream_); return Status(code, last_error ? std::string(last_error) : ""); } - struct ArrowArrayStream* stream_; + mutable struct ArrowArrayStream stream_; mutable std::shared_ptr schema_; }; diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 751730af9bb05..f74995501166b 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -160,11 +160,18 @@ ARROW_EXPORT Result> ImportRecordBatch(struct ArrowArray* array, struct ArrowSchema* schema); -// EXPERIMENTAL: array stream APIs - +/// \brief EXPERIMENTAL: Export C++ RecordBatchReader using the C stream interface. +/// +/// \param[in] reader RecordBatchReader object to export +/// \param[out] out C struct where to export the stream ARROW_EXPORT Status ExportRecordBatchReader(std::shared_ptr reader, struct ArrowArrayStream* out); + +/// \brief EXPERIMENTAL: Import C++ RecordBatchReader from the C stream interface. +/// +/// \param[in,out] stream C stream interface struct +/// \return Imported RecordBatchReader object ARROW_EXPORT Result> ImportRecordBatchReader( struct ArrowArrayStream* stream); diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index 6695d6ed5db2b..3f84edfc2a606 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include #include @@ -22,6 +23,7 @@ #include #include +#include #include #include "arrow/c/bridge.h" @@ -40,6 +42,8 @@ namespace arrow { using internal::ArrayExportGuard; using internal::ArrayExportTraits; +using internal::ArrayStreamExportGuard; +using internal::ArrayStreamExportTraits; using internal::SchemaExportGuard; using internal::SchemaExportTraits; @@ -78,11 +82,11 @@ class ReleaseCallback { explicit ReleaseCallback(CType* c_struct) : called_(false) { orig_release_ = c_struct->release; orig_private_data_ = c_struct->private_data; - c_struct->release = ReleaseUnbound; + c_struct->release = StaticRelease; c_struct->private_data = this; } - static void ReleaseUnbound(CType* c_struct) { + static void StaticRelease(CType* c_struct) { reinterpret_cast(c_struct->private_data)->Release(c_struct); } @@ -2678,4 +2682,248 @@ TEST_F(TestArrayRoundtrip, RecordBatch) { // TODO C -> C++ -> C roundtripping tests? +//////////////////////////////////////////////////////////////////////////// +// Array stream export tests + +class FailingRecordBatchReader : public RecordBatchReader { + public: + explicit FailingRecordBatchReader(Status error) : error_(std::move(error)) {} + + static std::shared_ptr expected_schema() { return arrow::schema({}); } + + std::shared_ptr schema() const override { return expected_schema(); } + + Status ReadNext(std::shared_ptr* batch) override { return error_; } + + protected: + Status error_; +}; + +class BaseArrayStreamTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = default_memory_pool(); + orig_allocated_ = pool_->bytes_allocated(); + } + + void TearDown() override { ASSERT_EQ(pool_->bytes_allocated(), orig_allocated_); } + + RecordBatchVector MakeBatches(std::shared_ptr schema, ArrayVector arrays) { + DCHECK_EQ(schema->num_fields(), 1); + RecordBatchVector batches; + for (const auto& array : arrays) { + batches.push_back(RecordBatch::Make(schema, array->length(), {array})); + } + return batches; + } + + protected: + MemoryPool* pool_; + int64_t orig_allocated_; +}; + +class TestArrayStreamExport : public BaseArrayStreamTest { + public: + void AssertStreamSchema(struct ArrowArrayStream* c_stream, const Schema& expected) { + struct ArrowSchema c_schema; + ASSERT_EQ(0, c_stream->get_schema(c_stream, &c_schema)); + + SchemaExportGuard schema_guard(&c_schema); + ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema)); + ASSERT_OK_AND_ASSIGN(auto schema, ImportSchema(&c_schema)); + AssertSchemaEqual(expected, *schema); + } + + void AssertStreamEnd(struct ArrowArrayStream* c_stream) { + struct ArrowArray c_array; + ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array)); + + ArrayExportGuard guard(&c_array); + ASSERT_TRUE(ArrowArrayIsReleased(&c_array)); + } + + void AssertStreamNext(struct ArrowArrayStream* c_stream, const RecordBatch& expected) { + struct ArrowArray c_array; + ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array)); + + ArrayExportGuard guard(&c_array); + ASSERT_FALSE(ArrowArrayIsReleased(&c_array)); + + ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array, expected.schema())); + AssertBatchesEqual(expected, *batch); + } +}; + +TEST_F(TestArrayStreamExport, Empty) { + auto schema = arrow::schema({field("ints", int32())}); + auto batches = MakeBatches(schema, {}); + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema)); + + struct ArrowArrayStream c_stream; + + ASSERT_OK(ExportRecordBatchReader(reader, &c_stream)); + ArrayStreamExportGuard guard(&c_stream); + + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + AssertStreamSchema(&c_stream, *schema); + AssertStreamEnd(&c_stream); + AssertStreamEnd(&c_stream); +} + +TEST_F(TestArrayStreamExport, Simple) { + auto schema = arrow::schema({field("ints", int32())}); + auto batches = MakeBatches( + schema, {ArrayFromJSON(int32(), "[1, 2]"), ArrayFromJSON(int32(), "[4, 5, null]")}); + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema)); + + struct ArrowArrayStream c_stream; + + ASSERT_OK(ExportRecordBatchReader(reader, &c_stream)); + ArrayStreamExportGuard guard(&c_stream); + + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + AssertStreamSchema(&c_stream, *schema); + AssertStreamNext(&c_stream, *batches[0]); + AssertStreamNext(&c_stream, *batches[1]); + AssertStreamEnd(&c_stream); + AssertStreamEnd(&c_stream); +} + +TEST_F(TestArrayStreamExport, ArrayLifetime) { + auto schema = arrow::schema({field("ints", int32())}); + auto batches = MakeBatches( + schema, {ArrayFromJSON(int32(), "[1, 2]"), ArrayFromJSON(int32(), "[4, 5, null]")}); + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema)); + + struct ArrowArrayStream c_stream; + struct ArrowSchema c_schema; + struct ArrowArray c_array0, c_array1; + + ASSERT_OK(ExportRecordBatchReader(reader, &c_stream)); + { + ArrayStreamExportGuard guard(&c_stream); + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + + ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema)); + ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array0)); + ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array1)); + AssertStreamEnd(&c_stream); + } + + ArrayExportGuard guard0(&c_array0), guard1(&c_array1); + + { + SchemaExportGuard schema_guard(&c_schema); + ASSERT_OK_AND_ASSIGN(auto got_schema, ImportSchema(&c_schema)); + AssertSchemaEqual(*schema, *got_schema); + } + + ASSERT_GT(pool_->bytes_allocated(), orig_allocated_); + ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array1, schema)); + AssertBatchesEqual(*batches[1], *batch); + ASSERT_OK_AND_ASSIGN(batch, ImportRecordBatch(&c_array0, schema)); + AssertBatchesEqual(*batches[0], *batch); +} + +TEST_F(TestArrayStreamExport, Errors) { + auto reader = + std::make_shared(Status::Invalid("some example error")); + + struct ArrowArrayStream c_stream; + + ASSERT_OK(ExportRecordBatchReader(reader, &c_stream)); + ArrayStreamExportGuard guard(&c_stream); + + struct ArrowSchema c_schema; + ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema)); + ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema)); + { + SchemaExportGuard schema_guard(&c_schema); + ASSERT_OK_AND_ASSIGN(auto schema, ImportSchema(&c_schema)); + AssertSchemaEqual(schema, arrow::schema({})); + } + + struct ArrowArray c_array; + ASSERT_EQ(EINVAL, c_stream.get_next(&c_stream, &c_array)); +} + +//////////////////////////////////////////////////////////////////////////// +// Array stream roundtrip tests + +class TestArrayStreamRoundtrip : public BaseArrayStreamTest { + public: + void Roundtrip(std::shared_ptr* reader, + struct ArrowArrayStream* c_stream) { + ASSERT_OK(ExportRecordBatchReader(*reader, c_stream)); + ASSERT_FALSE(ArrowArrayStreamIsReleased(c_stream)); + + ASSERT_OK_AND_ASSIGN(auto got_reader, ImportRecordBatchReader(c_stream)); + *reader = std::move(got_reader); + } + + void Roundtrip( + std::shared_ptr reader, + std::function&)> check_func) { + ArrowArrayStream c_stream; + + // NOTE: ReleaseCallback<> is not immediately usable with ArrowArrayStream, + // because get_next and get_schema need the original private_data. + std::weak_ptr weak_reader(reader); + ASSERT_EQ(weak_reader.use_count(), 1); // Expiration check will fail otherwise + + ASSERT_OK(ExportRecordBatchReader(std::move(reader), &c_stream)); + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + + { + ASSERT_OK_AND_ASSIGN(auto new_reader, ImportRecordBatchReader(&c_stream)); + // Stream was moved + ASSERT_TRUE(ArrowArrayStreamIsReleased(&c_stream)); + ASSERT_FALSE(weak_reader.expired()); + + check_func(new_reader); + } + // Stream was released when `new_reader` was destroyed + ASSERT_TRUE(weak_reader.expired()); + } + + void AssertReaderNext(const std::shared_ptr& reader, + const RecordBatch& expected) { + ASSERT_OK_AND_ASSIGN(auto batch, reader->Next()); + ASSERT_NE(batch, nullptr); + AssertBatchesEqual(expected, *batch); + } + + void AssertReaderEnd(const std::shared_ptr& reader) { + ASSERT_OK_AND_ASSIGN(auto batch, reader->Next()); + ASSERT_EQ(batch, nullptr); + } +}; + +TEST_F(TestArrayStreamRoundtrip, Simple) { + auto orig_schema = arrow::schema({field("ints", int32())}); + auto batches = MakeBatches(orig_schema, {ArrayFromJSON(int32(), "[1, 2]"), + ArrayFromJSON(int32(), "[4, 5, null]")}); + + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, orig_schema)); + + Roundtrip(std::move(reader), [&](const std::shared_ptr& reader) { + AssertSchemaEqual(*orig_schema, *reader->schema()); + AssertReaderNext(reader, *batches[0]); + AssertReaderNext(reader, *batches[1]); + AssertReaderEnd(reader); + AssertReaderEnd(reader); + }); +} + +TEST_F(TestArrayStreamRoundtrip, Errors) { + auto reader = std::make_shared( + Status::Invalid("roundtrip error example")); + + Roundtrip(std::move(reader), [&](const std::shared_ptr& reader) { + auto status = reader->Next().status(); + ASSERT_RAISES(Invalid, status); + ASSERT_THAT(status.message(), ::testing::HasSubstr("roundtrip error example")); + }); +} + } // namespace arrow diff --git a/cpp/src/arrow/c/helpers.h b/cpp/src/arrow/c/helpers.h index 7103d2afcf91a..a5c1f6fe4bab5 100644 --- a/cpp/src/arrow/c/helpers.h +++ b/cpp/src/arrow/c/helpers.h @@ -92,6 +92,18 @@ inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) { stream->release = NULL; } +/// Move the C array stream from `src` to `dest` +/// +/// Note `dest` must *not* point to a valid stream already, otherwise there +/// will be a memory leak. +inline void ArrowArrayStreamMove(struct ArrowArrayStream* src, + struct ArrowArrayStream* dest) { + assert(dest != src); + assert(!ArrowArrayStreamIsReleased(src)); + memcpy(dest, src, sizeof(struct ArrowArrayStream)); + ArrowArrayStreamMarkReleased(src); +} + /// Release the C array stream, if necessary, by calling its release callback inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) { if (!ArrowArrayStreamIsReleased(stream)) { diff --git a/cpp/src/arrow/c/util_internal.h b/cpp/src/arrow/c/util_internal.h index 3ece5245205aa..6a33be9b0da8e 100644 --- a/cpp/src/arrow/c/util_internal.h +++ b/cpp/src/arrow/c/util_internal.h @@ -34,6 +34,12 @@ struct ArrayExportTraits { static constexpr auto ReleaseFunc = &ArrowArrayRelease; }; +struct ArrayStreamExportTraits { + typedef struct ArrowArrayStream CType; + static constexpr auto IsReleasedFunc = &ArrowArrayStreamIsReleased; + static constexpr auto ReleaseFunc = &ArrowArrayStreamRelease; +}; + // A RAII-style object to release a C Array / Schema struct at block scope exit. template class ExportGuard { @@ -73,6 +79,7 @@ class ExportGuard { using SchemaExportGuard = ExportGuard; using ArrayExportGuard = ExportGuard; +using ArrayStreamExportGuard = ExportGuard; } // namespace internal } // namespace arrow