diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h index 821bc961281c2..a78170dbdbcb0 100644 --- a/cpp/src/arrow/c/abi.h +++ b/cpp/src/arrow/c/abi.h @@ -60,6 +60,44 @@ struct ArrowArray { void* private_data; }; +// EXPERIMENTAL: C stream interface + +struct ArrowArrayStream { + // Callback to get the stream type + // (will be the same for all arrays in the stream). + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowSchema must be released independently from the stream. + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + + // Callback to get the next array + // (if no error and the array is released, the stream has ended) + // + // Return value: 0 if successful, an `errno`-compatible error code otherwise. + // + // If successful, the ArrowArray must be released independently from the stream. + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + + // Callback to get optional detailed error information. + // This must only be called if the last stream operation failed + // with a non-0 return code. + // + // Return value: pointer to a null-terminated character array describing + // the last error, or NULL if no description is available. + // + // The returned pointer is only valid until the next operation on this stream + // (including release). + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback: release the stream's own resources. + // Note that arrays returned by `get_next` must be individually released. + void (*release)(struct ArrowArrayStream*); + + // Opaque producer-specific data + void* private_data; +}; + #ifdef __cplusplus } #endif diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 1e602a6a310f0..5b360abc48c07 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -18,6 +18,7 @@ #include "arrow/c/bridge.h" #include +#include #include #include #include @@ -1501,4 +1502,197 @@ Result> ImportRecordBatch(struct ArrowArray* array, return ImportRecordBatch(array, *maybe_schema); } +////////////////////////////////////////////////////////////////////////// +// C stream export + +namespace { + +class ExportedArrayStream { + public: + struct PrivateData { + explicit PrivateData(std::shared_ptr reader) + : reader_(std::move(reader)) {} + + std::shared_ptr reader_; + std::string last_error_; + + PrivateData() = default; + ARROW_DISALLOW_COPY_AND_ASSIGN(PrivateData); + }; + + explicit ExportedArrayStream(struct ArrowArrayStream* stream) : stream_(stream) {} + + Status GetSchema(struct ArrowSchema* out_schema) { + return ExportSchema(*reader()->schema(), out_schema); + } + + Status GetNext(struct ArrowArray* out_array) { + std::shared_ptr batch; + RETURN_NOT_OK(reader()->ReadNext(&batch)); + if (batch == nullptr) { + // End of stream + ArrowArrayMarkReleased(out_array); + return Status::OK(); + } else { + return ExportRecordBatch(*batch, out_array); + } + } + + const char* GetLastError() { + const auto& last_error = private_data()->last_error_; + return last_error.empty() ? nullptr : last_error.c_str(); + } + + void Release() { + if (ArrowArrayStreamIsReleased(stream_)) { + return; + } + DCHECK_NE(private_data(), nullptr); + delete private_data(); + + ArrowArrayStreamMarkReleased(stream_); + } + + // C-compatible callbacks + + static int StaticGetSchema(struct ArrowArrayStream* stream, + struct ArrowSchema* out_schema) { + ExportedArrayStream self{stream}; + return self.ToCError(self.GetSchema(out_schema)); + } + + static int StaticGetNext(struct ArrowArrayStream* stream, + struct ArrowArray* out_array) { + ExportedArrayStream self{stream}; + return self.ToCError(self.GetNext(out_array)); + } + + static void StaticRelease(struct ArrowArrayStream* stream) { + ExportedArrayStream{stream}.Release(); + } + + static const char* StaticGetLastError(struct ArrowArrayStream* stream) { + return ExportedArrayStream{stream}.GetLastError(); + } + + private: + int ToCError(const Status& status) { + if (ARROW_PREDICT_TRUE(status.ok())) { + private_data()->last_error_.clear(); + return 0; + } + private_data()->last_error_ = status.ToString(); + switch (status.code()) { + case StatusCode::IOError: + return EIO; + case StatusCode::NotImplemented: + return ENOSYS; + case StatusCode::OutOfMemory: + return ENOMEM; + default: + return EINVAL; // Fallback for Invalid, TypeError, etc. + } + } + + PrivateData* private_data() { + return reinterpret_cast(stream_->private_data); + } + + const std::shared_ptr& reader() { return private_data()->reader_; } + + struct ArrowArrayStream* stream_; +}; + +} // namespace + +Status ExportRecordBatchReader(std::shared_ptr reader, + struct ArrowArrayStream* out) { + out->get_schema = ExportedArrayStream::StaticGetSchema; + out->get_next = ExportedArrayStream::StaticGetNext; + out->get_last_error = ExportedArrayStream::StaticGetLastError; + out->release = ExportedArrayStream::StaticRelease; + out->private_data = new ExportedArrayStream::PrivateData{std::move(reader)}; + return Status::OK(); +} + +////////////////////////////////////////////////////////////////////////// +// C stream import + +namespace { + +class ArrayStreamBatchReader : public RecordBatchReader { + public: + explicit ArrayStreamBatchReader(struct ArrowArrayStream* stream) { + ArrowArrayStreamMove(stream, &stream_); + DCHECK(!ArrowArrayStreamIsReleased(&stream_)); + } + + ~ArrayStreamBatchReader() { + ArrowArrayStreamRelease(&stream_); + DCHECK(ArrowArrayStreamIsReleased(&stream_)); + } + + std::shared_ptr schema() const override { return CacheSchema(); } + + Status ReadNext(std::shared_ptr* batch) override { + struct ArrowArray c_array; + RETURN_NOT_OK(StatusFromCError(stream_.get_next(&stream_, &c_array))); + if (ArrowArrayIsReleased(&c_array)) { + // End of stream + batch->reset(); + return Status::OK(); + } else { + return ImportRecordBatch(&c_array, CacheSchema()).Value(batch); + } + } + + private: + std::shared_ptr CacheSchema() const { + if (!schema_) { + struct ArrowSchema c_schema; + ARROW_CHECK_OK(StatusFromCError(stream_.get_schema(&stream_, &c_schema))); + schema_ = ImportSchema(&c_schema).ValueOrDie(); + } + return schema_; + } + + Status StatusFromCError(int errno_like) const { + if (ARROW_PREDICT_TRUE(errno_like == 0)) { + return Status::OK(); + } + StatusCode code; + switch (errno_like) { + case EDOM: + case EINVAL: + case ERANGE: + code = StatusCode::Invalid; + break; + case ENOMEM: + code = StatusCode::OutOfMemory; + break; + case ENOSYS: + code = StatusCode::NotImplemented; + default: + code = StatusCode::IOError; + break; + } + const char* last_error = stream_.get_last_error(&stream_); + return Status(code, last_error ? std::string(last_error) : ""); + } + + mutable struct ArrowArrayStream stream_; + mutable std::shared_ptr schema_; +}; + +} // namespace + +Result> ImportRecordBatchReader( + struct ArrowArrayStream* stream) { + if (ArrowArrayStreamIsReleased(stream)) { + return Status::Invalid("Cannot import released ArrowArrayStream"); + } + // XXX should we call get_schema() here to avoid crashing on error? + return std::make_shared(stream); +} + } // namespace arrow diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 8efb5d98bed05..294f53e49fb55 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -29,6 +29,10 @@ namespace arrow { +/// \defgroup c-data-interface Functions for working with the C data interface. +/// +/// @{ + /// \brief Export C++ DataType using the C data interface format. /// /// The root type is considered to have empty name and metadata. @@ -160,4 +164,34 @@ ARROW_EXPORT Result> ImportRecordBatch(struct ArrowArray* array, struct ArrowSchema* schema); +/// @} + +/// \defgroup c-stream-interface Functions for working with the C data interface. +/// +/// @{ + +/// \brief EXPERIMENTAL: Export C++ RecordBatchReader using the C stream interface. +/// +/// The resulting ArrowArrayStream struct keeps the record batch reader alive +/// until its release callback is called by the consumer. +/// +/// \param[in] reader RecordBatchReader object to export +/// \param[out] out C struct where to export the stream +ARROW_EXPORT +Status ExportRecordBatchReader(std::shared_ptr reader, + struct ArrowArrayStream* out); + +/// \brief EXPERIMENTAL: Import C++ RecordBatchReader from the C stream interface. +/// +/// The ArrowArrayStream struct has its contents moved to a private object +/// held alive by the resulting record batch reader. +/// +/// \param[in,out] stream C stream interface struct +/// \return Imported RecordBatchReader object +ARROW_EXPORT +Result> ImportRecordBatchReader( + struct ArrowArrayStream* stream); + +/// @} + } // namespace arrow diff --git a/cpp/src/arrow/c/bridge_test.cc b/cpp/src/arrow/c/bridge_test.cc index 6695d6ed5db2b..3f84edfc2a606 100644 --- a/cpp/src/arrow/c/bridge_test.cc +++ b/cpp/src/arrow/c/bridge_test.cc @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include #include #include #include @@ -22,6 +23,7 @@ #include #include +#include #include #include "arrow/c/bridge.h" @@ -40,6 +42,8 @@ namespace arrow { using internal::ArrayExportGuard; using internal::ArrayExportTraits; +using internal::ArrayStreamExportGuard; +using internal::ArrayStreamExportTraits; using internal::SchemaExportGuard; using internal::SchemaExportTraits; @@ -78,11 +82,11 @@ class ReleaseCallback { explicit ReleaseCallback(CType* c_struct) : called_(false) { orig_release_ = c_struct->release; orig_private_data_ = c_struct->private_data; - c_struct->release = ReleaseUnbound; + c_struct->release = StaticRelease; c_struct->private_data = this; } - static void ReleaseUnbound(CType* c_struct) { + static void StaticRelease(CType* c_struct) { reinterpret_cast(c_struct->private_data)->Release(c_struct); } @@ -2678,4 +2682,248 @@ TEST_F(TestArrayRoundtrip, RecordBatch) { // TODO C -> C++ -> C roundtripping tests? +//////////////////////////////////////////////////////////////////////////// +// Array stream export tests + +class FailingRecordBatchReader : public RecordBatchReader { + public: + explicit FailingRecordBatchReader(Status error) : error_(std::move(error)) {} + + static std::shared_ptr expected_schema() { return arrow::schema({}); } + + std::shared_ptr schema() const override { return expected_schema(); } + + Status ReadNext(std::shared_ptr* batch) override { return error_; } + + protected: + Status error_; +}; + +class BaseArrayStreamTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = default_memory_pool(); + orig_allocated_ = pool_->bytes_allocated(); + } + + void TearDown() override { ASSERT_EQ(pool_->bytes_allocated(), orig_allocated_); } + + RecordBatchVector MakeBatches(std::shared_ptr schema, ArrayVector arrays) { + DCHECK_EQ(schema->num_fields(), 1); + RecordBatchVector batches; + for (const auto& array : arrays) { + batches.push_back(RecordBatch::Make(schema, array->length(), {array})); + } + return batches; + } + + protected: + MemoryPool* pool_; + int64_t orig_allocated_; +}; + +class TestArrayStreamExport : public BaseArrayStreamTest { + public: + void AssertStreamSchema(struct ArrowArrayStream* c_stream, const Schema& expected) { + struct ArrowSchema c_schema; + ASSERT_EQ(0, c_stream->get_schema(c_stream, &c_schema)); + + SchemaExportGuard schema_guard(&c_schema); + ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema)); + ASSERT_OK_AND_ASSIGN(auto schema, ImportSchema(&c_schema)); + AssertSchemaEqual(expected, *schema); + } + + void AssertStreamEnd(struct ArrowArrayStream* c_stream) { + struct ArrowArray c_array; + ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array)); + + ArrayExportGuard guard(&c_array); + ASSERT_TRUE(ArrowArrayIsReleased(&c_array)); + } + + void AssertStreamNext(struct ArrowArrayStream* c_stream, const RecordBatch& expected) { + struct ArrowArray c_array; + ASSERT_EQ(0, c_stream->get_next(c_stream, &c_array)); + + ArrayExportGuard guard(&c_array); + ASSERT_FALSE(ArrowArrayIsReleased(&c_array)); + + ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array, expected.schema())); + AssertBatchesEqual(expected, *batch); + } +}; + +TEST_F(TestArrayStreamExport, Empty) { + auto schema = arrow::schema({field("ints", int32())}); + auto batches = MakeBatches(schema, {}); + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema)); + + struct ArrowArrayStream c_stream; + + ASSERT_OK(ExportRecordBatchReader(reader, &c_stream)); + ArrayStreamExportGuard guard(&c_stream); + + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + AssertStreamSchema(&c_stream, *schema); + AssertStreamEnd(&c_stream); + AssertStreamEnd(&c_stream); +} + +TEST_F(TestArrayStreamExport, Simple) { + auto schema = arrow::schema({field("ints", int32())}); + auto batches = MakeBatches( + schema, {ArrayFromJSON(int32(), "[1, 2]"), ArrayFromJSON(int32(), "[4, 5, null]")}); + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema)); + + struct ArrowArrayStream c_stream; + + ASSERT_OK(ExportRecordBatchReader(reader, &c_stream)); + ArrayStreamExportGuard guard(&c_stream); + + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + AssertStreamSchema(&c_stream, *schema); + AssertStreamNext(&c_stream, *batches[0]); + AssertStreamNext(&c_stream, *batches[1]); + AssertStreamEnd(&c_stream); + AssertStreamEnd(&c_stream); +} + +TEST_F(TestArrayStreamExport, ArrayLifetime) { + auto schema = arrow::schema({field("ints", int32())}); + auto batches = MakeBatches( + schema, {ArrayFromJSON(int32(), "[1, 2]"), ArrayFromJSON(int32(), "[4, 5, null]")}); + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, schema)); + + struct ArrowArrayStream c_stream; + struct ArrowSchema c_schema; + struct ArrowArray c_array0, c_array1; + + ASSERT_OK(ExportRecordBatchReader(reader, &c_stream)); + { + ArrayStreamExportGuard guard(&c_stream); + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + + ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema)); + ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array0)); + ASSERT_EQ(0, c_stream.get_next(&c_stream, &c_array1)); + AssertStreamEnd(&c_stream); + } + + ArrayExportGuard guard0(&c_array0), guard1(&c_array1); + + { + SchemaExportGuard schema_guard(&c_schema); + ASSERT_OK_AND_ASSIGN(auto got_schema, ImportSchema(&c_schema)); + AssertSchemaEqual(*schema, *got_schema); + } + + ASSERT_GT(pool_->bytes_allocated(), orig_allocated_); + ASSERT_OK_AND_ASSIGN(auto batch, ImportRecordBatch(&c_array1, schema)); + AssertBatchesEqual(*batches[1], *batch); + ASSERT_OK_AND_ASSIGN(batch, ImportRecordBatch(&c_array0, schema)); + AssertBatchesEqual(*batches[0], *batch); +} + +TEST_F(TestArrayStreamExport, Errors) { + auto reader = + std::make_shared(Status::Invalid("some example error")); + + struct ArrowArrayStream c_stream; + + ASSERT_OK(ExportRecordBatchReader(reader, &c_stream)); + ArrayStreamExportGuard guard(&c_stream); + + struct ArrowSchema c_schema; + ASSERT_EQ(0, c_stream.get_schema(&c_stream, &c_schema)); + ASSERT_FALSE(ArrowSchemaIsReleased(&c_schema)); + { + SchemaExportGuard schema_guard(&c_schema); + ASSERT_OK_AND_ASSIGN(auto schema, ImportSchema(&c_schema)); + AssertSchemaEqual(schema, arrow::schema({})); + } + + struct ArrowArray c_array; + ASSERT_EQ(EINVAL, c_stream.get_next(&c_stream, &c_array)); +} + +//////////////////////////////////////////////////////////////////////////// +// Array stream roundtrip tests + +class TestArrayStreamRoundtrip : public BaseArrayStreamTest { + public: + void Roundtrip(std::shared_ptr* reader, + struct ArrowArrayStream* c_stream) { + ASSERT_OK(ExportRecordBatchReader(*reader, c_stream)); + ASSERT_FALSE(ArrowArrayStreamIsReleased(c_stream)); + + ASSERT_OK_AND_ASSIGN(auto got_reader, ImportRecordBatchReader(c_stream)); + *reader = std::move(got_reader); + } + + void Roundtrip( + std::shared_ptr reader, + std::function&)> check_func) { + ArrowArrayStream c_stream; + + // NOTE: ReleaseCallback<> is not immediately usable with ArrowArrayStream, + // because get_next and get_schema need the original private_data. + std::weak_ptr weak_reader(reader); + ASSERT_EQ(weak_reader.use_count(), 1); // Expiration check will fail otherwise + + ASSERT_OK(ExportRecordBatchReader(std::move(reader), &c_stream)); + ASSERT_FALSE(ArrowArrayStreamIsReleased(&c_stream)); + + { + ASSERT_OK_AND_ASSIGN(auto new_reader, ImportRecordBatchReader(&c_stream)); + // Stream was moved + ASSERT_TRUE(ArrowArrayStreamIsReleased(&c_stream)); + ASSERT_FALSE(weak_reader.expired()); + + check_func(new_reader); + } + // Stream was released when `new_reader` was destroyed + ASSERT_TRUE(weak_reader.expired()); + } + + void AssertReaderNext(const std::shared_ptr& reader, + const RecordBatch& expected) { + ASSERT_OK_AND_ASSIGN(auto batch, reader->Next()); + ASSERT_NE(batch, nullptr); + AssertBatchesEqual(expected, *batch); + } + + void AssertReaderEnd(const std::shared_ptr& reader) { + ASSERT_OK_AND_ASSIGN(auto batch, reader->Next()); + ASSERT_EQ(batch, nullptr); + } +}; + +TEST_F(TestArrayStreamRoundtrip, Simple) { + auto orig_schema = arrow::schema({field("ints", int32())}); + auto batches = MakeBatches(orig_schema, {ArrayFromJSON(int32(), "[1, 2]"), + ArrayFromJSON(int32(), "[4, 5, null]")}); + + ASSERT_OK_AND_ASSIGN(auto reader, RecordBatchReader::Make(batches, orig_schema)); + + Roundtrip(std::move(reader), [&](const std::shared_ptr& reader) { + AssertSchemaEqual(*orig_schema, *reader->schema()); + AssertReaderNext(reader, *batches[0]); + AssertReaderNext(reader, *batches[1]); + AssertReaderEnd(reader); + AssertReaderEnd(reader); + }); +} + +TEST_F(TestArrayStreamRoundtrip, Errors) { + auto reader = std::make_shared( + Status::Invalid("roundtrip error example")); + + Roundtrip(std::move(reader), [&](const std::shared_ptr& reader) { + auto status = reader->Next().status(); + ASSERT_RAISES(Invalid, status); + ASSERT_THAT(status.message(), ::testing::HasSubstr("roundtrip error example")); + }); +} + } // namespace arrow diff --git a/cpp/src/arrow/c/helpers.h b/cpp/src/arrow/c/helpers.h index a1a1240dd756b..a5c1f6fe4bab5 100644 --- a/cpp/src/arrow/c/helpers.h +++ b/cpp/src/arrow/c/helpers.h @@ -82,6 +82,36 @@ inline void ArrowArrayRelease(struct ArrowArray* array) { } } +/// Query whether the C array stream is released +inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* stream) { + return stream->release == NULL; +} + +/// Mark the C array stream released (for use in release callbacks) +inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) { + stream->release = NULL; +} + +/// Move the C array stream from `src` to `dest` +/// +/// Note `dest` must *not* point to a valid stream already, otherwise there +/// will be a memory leak. +inline void ArrowArrayStreamMove(struct ArrowArrayStream* src, + struct ArrowArrayStream* dest) { + assert(dest != src); + assert(!ArrowArrayStreamIsReleased(src)); + memcpy(dest, src, sizeof(struct ArrowArrayStream)); + ArrowArrayStreamMarkReleased(src); +} + +/// Release the C array stream, if necessary, by calling its release callback +inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) { + if (!ArrowArrayStreamIsReleased(stream)) { + stream->release(stream); + assert(ArrowArrayStreamIsReleased(stream)); + } +} + #ifdef __cplusplus } #endif diff --git a/cpp/src/arrow/c/util_internal.h b/cpp/src/arrow/c/util_internal.h index 3ece5245205aa..6a33be9b0da8e 100644 --- a/cpp/src/arrow/c/util_internal.h +++ b/cpp/src/arrow/c/util_internal.h @@ -34,6 +34,12 @@ struct ArrayExportTraits { static constexpr auto ReleaseFunc = &ArrowArrayRelease; }; +struct ArrayStreamExportTraits { + typedef struct ArrowArrayStream CType; + static constexpr auto IsReleasedFunc = &ArrowArrayStreamIsReleased; + static constexpr auto ReleaseFunc = &ArrowArrayStreamRelease; +}; + // A RAII-style object to release a C Array / Schema struct at block scope exit. template class ExportGuard { @@ -73,6 +79,7 @@ class ExportGuard { using SchemaExportGuard = ExportGuard; using ArrayExportGuard = ExportGuard; +using ArrayStreamExportGuard = ExportGuard; } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt index b972b0d860618..960155703e1ed 100644 --- a/cpp/src/arrow/python/CMakeLists.txt +++ b/cpp/src/arrow/python/CMakeLists.txt @@ -38,6 +38,7 @@ set(ARROW_PYTHON_SRCS inference.cc init.cc io.cc + ipc.cc numpy_convert.cc numpy_to_arrow.cc python_to_arrow.cc diff --git a/cpp/src/arrow/python/extension_type.h b/cpp/src/arrow/python/extension_type.h index 0041c8af6a45e..f5b12e6d8430a 100644 --- a/cpp/src/arrow/python/extension_type.h +++ b/cpp/src/arrow/python/extension_type.h @@ -44,6 +44,7 @@ class ARROW_PYTHON_EXPORT PyExtensionType : public ExtensionType { std::string Serialize() const override; // For use from Cython + // Assumes that `typ` is borrowed static Status FromClass(const std::shared_ptr storage_type, const std::string extension_name, PyObject* typ, std::shared_ptr* out); diff --git a/cpp/src/arrow/python/ipc.cc b/cpp/src/arrow/python/ipc.cc new file mode 100644 index 0000000000000..2e6c9d912756a --- /dev/null +++ b/cpp/src/arrow/python/ipc.cc @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/python/ipc.h" + +#include + +#include "arrow/python/pyarrow.h" + +namespace arrow { +namespace py { + +PyRecordBatchReader::PyRecordBatchReader() {} + +Status PyRecordBatchReader::Init(std::shared_ptr schema, PyObject* iterable) { + schema_ = std::move(schema); + + iterator_.reset(PyObject_GetIter(iterable)); + return CheckPyError(); +} + +std::shared_ptr PyRecordBatchReader::schema() const { return schema_; } + +Status PyRecordBatchReader::ReadNext(std::shared_ptr* batch) { + PyAcquireGIL lock; + + if (!iterator_) { + // End of stream + batch->reset(); + return Status::OK(); + } + + OwnedRef py_batch(PyIter_Next(iterator_.obj())); + if (!py_batch) { + RETURN_IF_PYERROR(); + // End of stream + batch->reset(); + iterator_.reset(); + return Status::OK(); + } + + return unwrap_batch(py_batch.obj()).Value(batch); +} + +Result> PyRecordBatchReader::Make( + std::shared_ptr schema, PyObject* iterable) { + auto reader = std::shared_ptr(new PyRecordBatchReader()); + RETURN_NOT_OK(reader->Init(std::move(schema), iterable)); + return reader; +} + +} // namespace py +} // namespace arrow diff --git a/cpp/src/arrow/python/ipc.h b/cpp/src/arrow/python/ipc.h new file mode 100644 index 0000000000000..92232ed830093 --- /dev/null +++ b/cpp/src/arrow/python/ipc.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/python/common.h" +#include "arrow/python/visibility.h" +#include "arrow/record_batch.h" +#include "arrow/result.h" +#include "arrow/util/macros.h" + +namespace arrow { +namespace py { + +class ARROW_PYTHON_EXPORT PyRecordBatchReader : public RecordBatchReader { + public: + std::shared_ptr schema() const override; + + Status ReadNext(std::shared_ptr* batch) override; + + // For use from Cython + // Assumes that `iterable` is borrowed + static Result> Make(std::shared_ptr, + PyObject* iterable); + + protected: + PyRecordBatchReader(); + + Status Init(std::shared_ptr, PyObject* iterable); + + std::shared_ptr schema_; + OwnedRefNoGIL iterator_; +}; + +} // namespace py +} // namespace arrow diff --git a/docs/source/cpp/api.rst b/docs/source/cpp/api.rst index 59d221012d36e..626b388b66458 100644 --- a/docs/source/cpp/api.rst +++ b/docs/source/cpp/api.rst @@ -29,6 +29,7 @@ API Reference api/scalar api/builder api/table + api/c_abi api/compute api/tensor api/utilities diff --git a/docs/source/cpp/api/c_abi.rst b/docs/source/cpp/api/c_abi.rst new file mode 100644 index 0000000000000..4e451c3eca6ad --- /dev/null +++ b/docs/source/cpp/api/c_abi.rst @@ -0,0 +1,48 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +============ +C Interfaces +============ + +.. seealso:: + The :ref:`C data interface ` and + :ref:`C stream interface ` specifications. + +ABI Structures +============== + +.. doxygenstruct:: ArrowSchema + :project: arrow_cpp + +.. doxygenstruct:: ArrowArray + :project: arrow_cpp + +.. doxygenstruct:: ArrowArrayStream + :project: arrow_cpp + +C Data Interface +================ + +.. doxygengroup:: c-data-interface + :content-only: + +C Stream Interface +================== + +.. doxygengroup:: c-stream-interface + :content-only: diff --git a/docs/source/format/CDataInterface.rst b/docs/source/format/CDataInterface.rst index 768dc47114498..dbecf307db990 100644 --- a/docs/source/format/CDataInterface.rst +++ b/docs/source/format/CDataInterface.rst @@ -535,6 +535,8 @@ Therefore, the consumer MUST not try to interfere with the producer's handling of these members' lifetime. The only way the consumer influences data lifetime is by calling the base structure's ``release`` callback. +.. _c-data-interface-released: + Released structure '''''''''''''''''' diff --git a/docs/source/format/CStreamInterface.rst b/docs/source/format/CStreamInterface.rst new file mode 100644 index 0000000000000..b8ccce3559238 --- /dev/null +++ b/docs/source/format/CStreamInterface.rst @@ -0,0 +1,218 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +.. highlight:: c + +.. _c-stream-interface: + +============================ +The Arrow C stream interface +============================ + +.. warning:: + This interface is experimental and may evolve based on feedback from + early users. ABI stability is not guaranteed yet. Feel free to + `contact us `__. + +The C stream interface builds on the structures defined in the +:ref:`C data interface ` and combines them into a higher-level +specification so as to ease the communication of streaming data within a single +process. + +Semantics +========= + +An Arrow C stream exposes a streaming source of data chunks, each with the +same schema. Chunks are obtained by calling a blocking pull-style iteration +function. + +Structure definition +==================== + +The C stream interface is defined by a single ``struct`` definition:: + + struct ArrowArrayStream { + // Callbacks providing stream functionality + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback + void (*release)(struct ArrowArrayStream*); + + // Opaque producer-specific data + void* private_data; + }; + +The ArrowArrayStream structure +------------------------------ + +The ``ArrowArrayStream`` provides the required callbacks to interact with a +streaming source of Arrow arrays. It has the following fields: + +.. c:member:: int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out) + + *Mandatory.* This callback allows the consumer to query the schema of + the chunks of data in the stream. The schema is the same for all + data chunks. + + This callback must NOT be called on a released ``ArrowArrayStream``. + + *Return value:* 0 on success, a non-zero + :ref:`error code ` otherwise. + +.. c:member:: int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, struct ArrowArray* out) + + *Mandatory.* This callback allows the consumer to get the next chunk + of data in the stream. + + This callback must NOT be called on a released ``ArrowArrayStream``. + + *Return value:* 0 on success, a non-zero + :ref:`error code ` otherwise. + + On success, the consumer must check whether the ``ArrowArray`` is + marked :ref:`released `. If the + ``ArrowArray`` is released, then the end of stream has been reached. + Otherwise, the ``ArrowArray`` contains a valid data chunk. + +.. c:member:: const char* (*ArrowArrayStream.get_last_error)(struct ArrowArrayStream*) + + *Mandatory.* This callback allows the consumer to get a textual description + of the last error. + + This callback must ONLY be called if the last operation on the + ``ArrowArrayStream`` returned an error. It must NOT be called on a + released ``ArrowArrayStream``. + + *Return value:* a pointer to a NULL-terminated character string (UTF8-encoded). + NULL can also be returned if no detailed description is available. + + The returned pointer is only guaranteed to be valid until the next call of + one of the stream's callbacks. The character string it points to should + be copied to consumer-managed storage if it is intended to survive longer. + +.. c:member:: void (*ArrowArrayStream.release)(struct ArrowArrayStream*) + + *Mandatory.* A pointer to a producer-provided release callback. + +.. c:member:: void* ArrowArrayStream.private_data + + *Optional.* An opaque pointer to producer-provided private data. + + Consumers MUST not process this member. Lifetime of this member + is handled by the producer, and especially by the release callback. + + +.. _c-stream-interface-error-codes: + +Error codes +----------- + +The ``get_schema`` and ``get_next`` callbacks may return an error under the form +of a non-zero integer code. Such error codes should be interpreted like +``errno`` numbers (as defined by the local platform). Note that the symbolic +forms of these constants are stable from platform to platform, but their numeric +values are platform-specific. + +In particular, it is recommended to recognize the following values: + +* ``EINVAL``: for a parameter or input validation error +* ``ENOMEM``: for a memory allocation failure (out of memory) +* ``EIO``: for a generic input/output error + +.. seealso:: + `Standard POSIX error codes `__. + + `Error codes recognized by the Windows C runtime library + `__. + +Result lifetimes +---------------- + +The data returned by the ``get_schema`` and ``get_next`` callbacks must be +released independently. Their lifetimes are not tied to that of the +``ArrowArrayStream``. + +Stream lifetime +--------------- + +Lifetime of the C stream is managed using a release callback with similar +usage as in the :ref:`C data interface `. + + +C consumer example +================== + +Let's say a particular database provides the following C API to execute +a SQL query and return the result set as a Arrow C stream:: + + void MyDB_Query(const char* query, struct ArrowArrayStream* result_set); + +Then a consumer could use the following code to iterate over the results:: + + static void handle_error(int errcode, struct ArrowArrayStream* stream) { + // Print stream error + const char* errdesc = stream->get_last_error(stream); + if (errdesc != NULL) { + fputs(errdesc, stderr); + } else { + fputs(strerror(errcode), stderr); + } + // Release stream and abort + stream->release(stream), + exit(1); + } + + void run_query() { + struct ArrowArrayStream stream; + struct ArrowSchema schema; + struct ArrowArray chunk; + int errcode; + + MyDB_Query("SELECT * FROM my_table", &stream); + + // Query result set schema + errcode = stream.get_schema(&stream, &schema); + if (errcode != 0) { + handle_error(errcode, &stream); + } + + int64_t num_rows = 0; + + // Iterate over results: loop until error or end of stream + while ((errcode = stream.get_next(&stream, &chunk) == 0) && + chunk.release != NULL) { + // Do something with chunk... + fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length); + num_rows += chunk.length; + + // Release chunk + chunk.release(&chunk); + } + + // Was it an error? + if (errcode != 0) { + handle_error(errcode, &stream); + } + + fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows); + + // Release schema and stream + schema.release(&schema); + stream.release(&stream); + } diff --git a/docs/source/index.rst b/docs/source/index.rst index 2d95e22f16ac5..cfcf865398227 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -43,6 +43,7 @@ such topics as: format/Flight format/Integration format/CDataInterface + format/CStreamInterface format/Other .. _toc.usage: diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 028ddc6f43c7a..34c6693c51e82 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -28,7 +28,7 @@ from collections.abc import Mapping from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.lib cimport (check_status, Field, MemoryPool, Schema, - _CRecordBatchReader, ensure_type, + RecordBatchReader, ensure_type, maybe_unbox_memory_pool, get_input_stream, native_transcoding_input_stream, pyarrow_wrap_schema, pyarrow_wrap_table, @@ -633,7 +633,7 @@ cdef _get_convert_options(ConvertOptions convert_options, out[0] = convert_options.options -cdef class CSVStreamingReader(_CRecordBatchReader): +cdef class CSVStreamingReader(RecordBatchReader): """An object that reads record batches incrementally from a CSV file. Should not be instantiated directly by user code. diff --git a/python/pyarrow/_flight.pyx b/python/pyarrow/_flight.pyx index 484fbb2a4e4c6..3950ceea90ee1 100644 --- a/python/pyarrow/_flight.pyx +++ b/python/pyarrow/_flight.pyx @@ -35,7 +35,7 @@ from pyarrow.lib cimport * from pyarrow.lib import ArrowException, ArrowInvalid from pyarrow.lib import as_buffer, frombytes, tobytes from pyarrow.includes.libarrow_flight cimport * -from pyarrow.ipc import _ReadPandasOption, _get_legacy_format_default +from pyarrow.ipc import _get_legacy_format_default, _ReadPandasMixin import pyarrow.lib as lib @@ -812,7 +812,7 @@ cdef class FlightStreamChunk(_Weakrefable): self.chunk.data != NULL, self.chunk.app_metadata != NULL) -cdef class _MetadataRecordBatchReader(_Weakrefable): +cdef class _MetadataRecordBatchReader(_Weakrefable, _ReadPandasMixin): """A reader for Flight streams.""" # Needs to be separate class so the "real" class can subclass the @@ -869,8 +869,7 @@ cdef class _MetadataRecordBatchReader(_Weakrefable): return chunk -cdef class MetadataRecordBatchReader(_MetadataRecordBatchReader, - _ReadPandasOption): +cdef class MetadataRecordBatchReader(_MetadataRecordBatchReader): """The virtual base class for readers for Flight streams.""" @@ -1365,7 +1364,7 @@ cdef class RecordBatchStream(FlightDataStream): data_source : RecordBatchReader or Table options : pyarrow.ipc.IpcWriteOptions, optional """ - if (not isinstance(data_source, _CRecordBatchReader) and + if (not isinstance(data_source, RecordBatchReader) and not isinstance(data_source, lib.Table)): raise TypeError("Expected RecordBatchReader or Table, " "but got: {}".format(type(data_source))) @@ -1375,8 +1374,8 @@ cdef class RecordBatchStream(FlightDataStream): cdef CFlightDataStream* to_stream(self) except *: cdef: shared_ptr[CRecordBatchReader] reader - if isinstance(self.data_source, _CRecordBatchReader): - reader = (<_CRecordBatchReader> self.data_source).reader + if isinstance(self.data_source, RecordBatchReader): + reader = ( self.data_source).reader elif isinstance(self.data_source, lib.Table): table = ( self.data_source).table reader.reset(new TableBatchReader(deref(table))) @@ -1617,7 +1616,7 @@ cdef CStatus _data_stream_next(void* self, CFlightPayload* payload) except *: else: result, metadata = result, None - if isinstance(result, (Table, _CRecordBatchReader)): + if isinstance(result, (Table, RecordBatchReader)): if metadata: raise ValueError("Can only return metadata alongside a " "RecordBatch.") diff --git a/python/pyarrow/cffi.py b/python/pyarrow/cffi.py index 8880c25a0357d..961b61dee59fd 100644 --- a/python/pyarrow/cffi.py +++ b/python/pyarrow/cffi.py @@ -52,6 +52,18 @@ // Opaque producer-specific data void* private_data; }; + + struct ArrowArrayStream { + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback + void (*release)(struct ArrowArrayStream*); + // Opaque producer-specific data + void* private_data; + }; """ # TODO use out-of-line mode for faster import and avoid C parsing diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index d1c4110cb4000..5d5800eec58f7 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1393,7 +1393,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: " arrow::ipc::RecordBatchStreamReader"(CRecordBatchReader): @staticmethod CResult[shared_ptr[CRecordBatchReader]] Open( - const CInputStream* stream, const CIpcReadOptions& options) + const shared_ptr[CInputStream], const CIpcReadOptions&) @staticmethod CResult[shared_ptr[CRecordBatchReader]] Open2" Open"( @@ -1965,6 +1965,14 @@ cdef extern from 'arrow/python/inference.h' namespace 'arrow::py': c_bool IsPyFloat(object o) +cdef extern from 'arrow/python/ipc.h' namespace 'arrow::py': + cdef cppclass CPyRecordBatchReader" arrow::py::PyRecordBatchReader" \ + (CRecordBatchReader): + @staticmethod + CResult[shared_ptr[CRecordBatchReader]] Make(shared_ptr[CSchema], + object) + + cdef extern from 'arrow/extension_type.h' namespace 'arrow': cdef cppclass CExtensionTypeRegistry" arrow::ExtensionTypeRegistry": @staticmethod @@ -2064,6 +2072,9 @@ cdef extern from 'arrow/c/abi.h': cdef struct ArrowArray: pass + cdef struct ArrowArrayStream: + pass + cdef extern from 'arrow/c/bridge.h' namespace 'arrow' nogil: CStatus ExportType(CDataType&, ArrowSchema* out) CResult[shared_ptr[CDataType]] ImportType(ArrowSchema*) @@ -2084,3 +2095,8 @@ cdef extern from 'arrow/c/bridge.h' namespace 'arrow' nogil: shared_ptr[CSchema]) CResult[shared_ptr[CRecordBatch]] ImportRecordBatch(ArrowArray*, ArrowSchema*) + + CStatus ExportRecordBatchReader(shared_ptr[CRecordBatchReader], + ArrowArrayStream*) + CResult[shared_ptr[CRecordBatchReader]] ImportRecordBatchReader( + ArrowArrayStream*) diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index bcb7477772a33..74a81c60ecff7 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -400,8 +400,29 @@ cdef _get_input_stream(object source, shared_ptr[CInputStream]* out): get_input_stream(source, True, out) -cdef class _CRecordBatchReader(_Weakrefable): - """The base RecordBatchReader wrapper. +class _ReadPandasMixin: + + def read_pandas(self, **options): + """ + Read contents of stream to a pandas.DataFrame. + + Read all record batches as a pyarrow.Table then convert it to a + pandas.DataFrame using Table.to_pandas. + + Parameters + ---------- + **options : arguments to forward to Table.to_pandas + + Returns + ------- + df : pandas.DataFrame + """ + table = self.read_all() + return table.to_pandas(**options) + + +cdef class RecordBatchReader(_Weakrefable): + """Base class for reading stream of record batches. Provides common implementations of convenience methods. Should not be instantiated directly by user code. @@ -413,6 +434,18 @@ cdef class _CRecordBatchReader(_Weakrefable): while True: yield self.read_next_batch() + @property + def schema(self): + """ + Shared schema of the record batches in the stream. + """ + cdef shared_ptr[CSchema] c_schema + + with nogil: + c_schema = self.reader.get().schema() + + return pyarrow_wrap_schema(c_schema) + def get_next_batch(self): import warnings warnings.warn('Please use read_next_batch instead of ' @@ -447,21 +480,91 @@ cdef class _CRecordBatchReader(_Weakrefable): check_status(self.reader.get().ReadAll(&table)) return pyarrow_wrap_table(table) + read_pandas = _ReadPandasMixin.read_pandas + def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): pass + def _export_to_c(self, uintptr_t out_ptr): + """ + Export to a C ArrowArrayStream struct, given its pointer. + + Parameters + ---------- + out_ptr: int + The raw pointer to a C ArrowArrayStream struct. -cdef class _RecordBatchStreamReader(_CRecordBatchReader): + Be careful: if you don't pass the ArrowArrayStream struct to a + consumer, array memory will leak. This is a low-level function + intended for expert users. + """ + with nogil: + check_status(ExportRecordBatchReader( + self.reader, out_ptr)) + + @staticmethod + def _import_from_c(uintptr_t in_ptr): + """ + Import RecordBatchReader from a C ArrowArrayStream struct, + given its pointer. + + Parameters + ---------- + in_ptr: int + The raw pointer to a C ArrowArrayStream struct. + + This is a low-level function intended for expert users. + """ + cdef: + shared_ptr[CRecordBatchReader] c_reader + RecordBatchReader self + + with nogil: + c_reader = GetResultValue(ImportRecordBatchReader( + in_ptr)) + + self = RecordBatchReader.__new__(RecordBatchReader) + self.reader = c_reader + return self + + @staticmethod + def from_batches(schema, batches): + """ + Create RecordBatchReader from an iterable of batches. + + Parameters + ---------- + schema : Schema + The shared schema of the record batches + batches : Iterable[RecordBatch] + The batches that this reader will return. + + Returns + ------- + reader : RecordBatchReader + """ + cdef: + shared_ptr[CSchema] c_schema + shared_ptr[CRecordBatchReader] c_reader + RecordBatchReader self + + c_schema = pyarrow_unwrap_schema(schema) + c_reader = GetResultValue(CPyRecordBatchReader.Make( + c_schema, batches)) + + self = RecordBatchReader.__new__(RecordBatchReader) + self.reader = c_reader + return self + + +cdef class _RecordBatchStreamReader(RecordBatchReader): cdef: shared_ptr[CInputStream] in_stream CIpcReadOptions options - cdef readonly: - Schema schema - def __cinit__(self): pass @@ -469,9 +572,7 @@ cdef class _RecordBatchStreamReader(_CRecordBatchReader): _get_input_stream(source, &self.in_stream) with nogil: self.reader = GetResultValue(CRecordBatchStreamReader.Open( - self.in_stream.get(), self.options)) - - self.schema = pyarrow_wrap_schema(self.reader.get().schema()) + self.in_stream, self.options)) cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter): @@ -565,6 +666,8 @@ cdef class _RecordBatchFileReader(_Weakrefable): return pyarrow_wrap_table(table) + read_pandas = _ReadPandasMixin.read_pandas + def __enter__(self): return self diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 19e80baa8dfa3..65325c483c4a8 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -22,6 +22,7 @@ import pyarrow as pa from pyarrow.lib import (IpcWriteOptions, Message, MessageReader, # noqa + RecordBatchReader, _ReadPandasMixin, MetadataVersion, read_message, read_record_batch, read_schema, read_tensor, write_tensor, @@ -29,28 +30,7 @@ import pyarrow.lib as lib -class _ReadPandasOption: - - def read_pandas(self, **options): - """ - Read contents of stream to a pandas.DataFrame. - - Read all record batches as a pyarrow.Table then convert it to a - pandas.DataFrame using Table.to_pandas. - - Parameters - ---------- - **options : arguments to forward to Table.to_pandas - - Returns - ------- - df : pandas.DataFrame - """ - table = self.read_all() - return table.to_pandas(**options) - - -class RecordBatchStreamReader(lib._RecordBatchStreamReader, _ReadPandasOption): +class RecordBatchStreamReader(lib._RecordBatchStreamReader): """ Reader for the Arrow streaming binary format. @@ -97,7 +77,7 @@ def __init__(self, sink, schema, *, use_legacy_format=None, options=None): self._open(sink, schema, options=options) -class RecordBatchFileReader(lib._RecordBatchFileReader, _ReadPandasOption): +class RecordBatchFileReader(lib._RecordBatchFileReader): """ Class for reading Arrow record batch data from the Arrow binary file format diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 8e06dcd1d9bcb..5b2958a06472d 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -483,7 +483,7 @@ cdef class _CRecordBatchWriter(_Weakrefable): shared_ptr[CRecordBatchWriter] writer -cdef class _CRecordBatchReader(_Weakrefable): +cdef class RecordBatchReader(_Weakrefable): cdef: shared_ptr[CRecordBatchReader] reader diff --git a/python/pyarrow/tests/test_cffi.py b/python/pyarrow/tests/test_cffi.py index bcf3c723950fc..5505e57164534 100644 --- a/python/pyarrow/tests/test_cffi.py +++ b/python/pyarrow/tests/test_cffi.py @@ -26,6 +26,13 @@ import pytest +try: + import pandas as pd + import pandas.testing as tm +except ImportError: + pd = tm = None + + needs_cffi = pytest.mark.skipif(ffi is None, reason="test needs cffi package installed") @@ -36,6 +43,34 @@ assert_array_released = pytest.raises( ValueError, match="Cannot import released ArrowArray") +assert_stream_released = pytest.raises( + ValueError, match="Cannot import released ArrowArrayStream") + + +def make_schema(): + return pa.schema([('ints', pa.list_(pa.int32()))], + metadata={b'key1': b'value1'}) + + +def make_batch(): + return pa.record_batch([[[1], [2, 42]]], make_schema()) + + +def make_batches(): + schema = make_schema() + return [ + pa.record_batch([[[1], [2, 42]]], schema), + pa.record_batch([[None, [], [5, 6]]], schema), + ] + + +def make_serialized(schema, batches): + with pa.BufferOutputStream() as sink: + with pa.ipc.new_stream(sink, schema) as out: + for batch in batches: + out.write(batch) + return sink.getvalue() + @needs_cffi def test_export_import_type(): @@ -120,10 +155,6 @@ def test_export_import_schema(): c_schema = ffi.new("struct ArrowSchema*") ptr_schema = int(ffi.cast("uintptr_t", c_schema)) - def make_schema(): - return pa.schema([('ints', pa.list_(pa.int32()))], - metadata={b'key1': b'value1'}) - gc.collect() # Make sure no Arrow data dangles in a ref cycle old_allocated = pa.total_allocated_bytes() @@ -156,13 +187,6 @@ def test_export_import_batch(): c_array = ffi.new("struct ArrowArray*") ptr_array = int(ffi.cast("uintptr_t", c_array)) - def make_schema(): - return pa.schema([('ints', pa.list_(pa.int32()))], - metadata={b'key1': b'value1'}) - - def make_batch(): - return pa.record_batch([[[1], [2, 42]]], make_schema()) - gc.collect() # Make sure no Arrow data dangles in a ref cycle old_allocated = pa.total_allocated_bytes() @@ -172,7 +196,7 @@ def make_batch(): py_value = batch.to_pydict() batch._export_to_c(ptr_array) assert pa.total_allocated_bytes() > old_allocated - # Delete recreate C++ object from exported pointer + # Delete and recreate C++ object from exported pointer del batch batch_new = pa.RecordBatch._import_from_c(ptr_array, schema) assert batch_new.to_pydict() == py_value @@ -192,8 +216,6 @@ def make_batch(): del batch batch_new = pa.RecordBatch._import_from_c(ptr_array, ptr_schema) assert batch_new.to_pydict() == py_value - print(batch_new.schema) - print(make_schema()) assert batch_new.schema == make_schema() assert pa.total_allocated_bytes() > old_allocated del batch_new @@ -211,3 +233,63 @@ def make_batch(): # Now released with assert_schema_released: pa.RecordBatch._import_from_c(ptr_array, ptr_schema) + + +def _export_import_batch_reader(ptr_stream, reader_factory): + # Prepare input + batches = make_batches() + schema = batches[0].schema + + reader = reader_factory(schema, batches) + reader._export_to_c(ptr_stream) + # Delete and recreate C++ object from exported pointer + del reader, batches + + reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream) + assert reader_new.schema == schema + got_batches = list(reader_new) + del reader_new + assert got_batches == make_batches() + + # Test read_pandas() + if pd is not None: + batches = make_batches() + schema = batches[0].schema + expected_df = pa.Table.from_batches(batches).to_pandas() + + reader = reader_factory(schema, batches) + reader._export_to_c(ptr_stream) + del reader, batches + + reader_new = pa.ipc.RecordBatchReader._import_from_c(ptr_stream) + got_df = reader_new.read_pandas() + del reader_new + tm.assert_frame_equal(expected_df, got_df) + + +def make_ipc_stream_reader(schema, batches): + return pa.ipc.open_stream(make_serialized(schema, batches)) + + +def make_py_record_batch_reader(schema, batches): + return pa.ipc.RecordBatchReader.from_batches(schema, batches) + + +@needs_cffi +@pytest.mark.parametrize('reader_factory', + [make_ipc_stream_reader, + make_py_record_batch_reader]) +def test_export_import_batch_reader(reader_factory): + c_stream = ffi.new("struct ArrowArrayStream*") + ptr_stream = int(ffi.cast("uintptr_t", c_stream)) + + gc.collect() # Make sure no Arrow data dangles in a ref cycle + old_allocated = pa.total_allocated_bytes() + + _export_import_batch_reader(ptr_stream, reader_factory) + + assert pa.total_allocated_bytes() == old_allocated + + # Now released + with assert_stream_released: + pa.ipc.RecordBatchReader._import_from_c(ptr_stream) diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 44f8499e8346e..3d3e72e616533 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -15,11 +15,13 @@ # specific language governing permissions and limitations # under the License. +from collections import UserList import io import pytest import socket import sys import threading +import weakref import numpy as np @@ -858,3 +860,36 @@ def test_write_empty_ipc_file(): table = reader.read_all() assert len(table) == 0 assert table.schema.equals(schema) + + +def test_py_record_batch_reader(): + def make_schema(): + return pa.schema([('field', pa.int64())]) + + def make_batches(): + schema = make_schema() + batch1 = pa.record_batch([[1, 2, 3]], schema=schema) + batch2 = pa.record_batch([[4, 5]], schema=schema) + return [batch1, batch2] + + # With iterable + batches = UserList(make_batches()) # weakrefable + wr = weakref.ref(batches) + + with pa.ipc.RecordBatchReader.from_batches(make_schema(), + batches) as reader: + batches = None + assert wr() is not None + assert list(reader) == make_batches() + assert wr() is None + + # With iterator + batches = iter(UserList(make_batches())) # weakrefable + wr = weakref.ref(batches) + + with pa.ipc.RecordBatchReader.from_batches(make_schema(), + batches) as reader: + batches = None + assert wr() is not None + assert list(reader) == make_batches() + assert wr() is None