sla BatchReaderRaii
pacman82 committed Jan 31, 2024
1 parent 939bad4 commit 14c3376
Showing 1 changed file with 48 additions and 34 deletions.
python/arrow_odbc/reader.py (82 changes: 48 additions & 34 deletions)
@@ -2,7 +2,7 @@
from cffi.api import FFI # type: ignore

from pyarrow.cffi import ffi as arrow_ffi # type: ignore
-from pyarrow import RecordBatch, Schema, Array # type: ignore
+from pyarrow import RecordBatch, Schema, Array  # type: ignore

from arrow_odbc.connect import to_bytes_and_len, connect_to_database # type: ignore

@@ -11,6 +11,7 @@

DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES = 2**29


def _schema_from_handle(handle) -> Schema:
"""
Take a handle to an ArrowOdbcReader and return the associated pyarrow schema
@@ -22,13 +23,14 @@ def _schema_from_handle(handle) -> Schema:
raise_on_error(error)
ptr_schema = int(ffi.cast("uintptr_t", schema_out))
return Schema._import_from_c(ptr_schema)


class _BatchReaderRaii:
"""
Takes ownership of the reader in its various states and makes sure its
resources are freed if the object is deleted.
"""

def __init__(self, handle):
# We take ownership of the corresponding reader written in Rust and keep it alive until
# `self` is deleted.
@@ -39,8 +41,7 @@ def __del__(self):
lib.arrow_odbc_reader_free(self.handle)

def schema(self):
-        lib._schema_from_handle(self.handle)
-
+        return _schema_from_handle(self.handle)

def next_batch(self):
array = arrow_ffi.new("struct ArrowArray *")
@@ -58,11 +59,40 @@ def next_batch(self):
schema_ptr = int(ffi.cast("uintptr_t", schema))
struct_array = Array._import_from_c(array_ptr, schema_ptr)
return RecordBatch.from_struct_array(struct_array)

def into_concurrent(self):
error = lib.arrow_odbc_reader_into_concurrent(self.handle)
raise_on_error(error)

+    def more_results(
+        self,
+        batch_size: int,
+        max_bytes_per_batch: int,
+        max_text_size: int,
+        max_binary_size: int,
+        falliable_allocations: bool = False,
+        schema: Optional[Schema] = None,
+    ) -> bool:
+        ptr_schema = _export_schema_to_c(schema)
+
+        with ffi.new("bool *") as has_more_results_c:
+            error = lib.arrow_odbc_reader_more_results(
+                self.handle,
+                has_more_results_c,
+                batch_size,
+                max_bytes_per_batch,
+                max_text_size,
+                max_binary_size,
+                falliable_allocations,
+                ptr_schema,
+            )
+            # See if we managed to execute the query successfully and return an
+            # error if not
+            raise_on_error(error)
+
+            has_more_results = has_more_results_c[0] != 0
+
+        return has_more_results


class BatchReader:
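For readers unfamiliar with the `_export_to_c` / `_import_from_c` calls used by `_schema_from_handle` and `next_batch` above, the following self-contained sketch shows the Arrow C data interface round trip they rely on. It is illustrative only and not part of this commit; it uses nothing but pyarrow.

import pyarrow as pa
from pyarrow.cffi import ffi as arrow_ffi

schema = pa.schema([("id", pa.int64()), ("name", pa.string())])

# Allocate an empty ArrowSchema struct and pass its address around as an integer.
c_schema = arrow_ffi.new("struct ArrowSchema *")
address = int(arrow_ffi.cast("uintptr_t", c_schema))

schema._export_to_c(address)                      # producer side fills the struct
roundtripped = pa.Schema._import_from_c(address)  # consumer side reads and releases it
assert roundtripped.equals(schema)
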
@@ -76,7 +106,6 @@ def __init__(self, reader: _BatchReaderRaii):
`read_arrow_batches_from_odbc` in order to create instances of
`BatchReader`.
"""

# We take ownership of the corresponding reader written in Rust and keep it alive until
# `self` is deleted.
self.reader = reader
@@ -100,11 +129,11 @@ def __next__(self) -> RecordBatch:
def more_results(
self,
batch_size: int = 65535,
-        max_bytes_per_batch: Optional[int] = DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES,
+        max_bytes_per_batch: int = DEFAULT_FETCH_BUFFER_LIMIT_IN_BYTES,
max_text_size: Optional[int] = None,
max_binary_size: Optional[int] = None,
falliable_allocations: bool = False,
-        schema: Optional[Schema] = None
+        schema: Optional[Schema] = None,
) -> bool:
"""
Move the reader to the next result set returned by the data source.
@@ -183,27 +212,17 @@ def more_results(
if max_binary_size is None:
max_binary_size = 0

-        ptr_schema = _export_schema_to_c(schema)
-
-        with ffi.new("bool *") as has_more_results_c:
-            error = lib.arrow_odbc_reader_more_results(
-                self.reader.handle,
-                has_more_results_c,
-                batch_size,
-                max_bytes_per_batch,
-                max_text_size,
-                max_binary_size,
-                falliable_allocations,
-                ptr_schema,
-            )
-            # See if we managed to execute the query successfully and return an
-            # error if not
-            raise_on_error(error)
-
-            has_more_results = has_more_results_c[0] != 0
+        has_more_results = self.reader.more_results(
+            batch_size=batch_size,
+            max_bytes_per_batch=max_bytes_per_batch,
+            max_text_size=max_text_size,
+            max_binary_size=max_binary_size,
+            falliable_allocations=falliable_allocations,
+            schema=schema,
+        )

# Every result set can have its own schema, so we must update our member
-        self.schema = _schema_from_handle(self.reader.handle)
+        self.schema = self.reader.schema()

return has_more_results
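A short usage sketch of the reworked method above (illustrative only, not part of this diff; the query and connection string are placeholders, and whether a statement batch yields several result sets depends on the driver):

from arrow_odbc import read_arrow_batches_from_odbc

reader = read_arrow_batches_from_odbc(
    query="SELECT * FROM orders; SELECT * FROM customers",
    connection_string="Driver={ODBC Driver 18 for SQL Server};Server=localhost;",
)
while True:
    for batch in reader:  # batches of the current result set
        print(batch.num_rows)
    if not reader.more_results():  # advance; also refreshes reader.schema
        break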

@@ -245,19 +264,14 @@ def fetch_concurrently(self):
# ...
"""
try:
-            error = lib.arrow_odbc_reader_into_concurrent(self.handle)
+            self.reader.into_concurrent()
except:
# Making a reader concurrent will not change its schema, yet if there is an error the
# reader is destroyed and its schema is empty.
# self.schema == self.reader.schema()
# should always be true and so assigning it would never make the code incorrect. Yet we
# only need to do so if it actually changes.
self.schema = self.reader.schema()

-        if error != ffi.NULL:
-            self.schema = _schema_from_handle(self.handle)




def read_arrow_batches_from_odbc(
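For context, `fetch_concurrently` is called once, before iteration, so that fetching the next buffer from the ODBC data source can overlap with converting the current one to Arrow, at the cost of an additional transit buffer. A usage sketch (illustrative only; connection details are placeholders):

from arrow_odbc import read_arrow_batches_from_odbc

reader = read_arrow_batches_from_odbc(
    query="SELECT * FROM big_table",
    connection_string="DSN=MyDataSource;",
)
reader.fetch_concurrently()  # fetch on a dedicated thread from here on
for batch in reader:
    ...  # process each RecordBatch while the next one is being fetched
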
@@ -366,7 +380,7 @@ def read_arrow_batches_from_odbc(
:param driver_returns_memory_garbage_for_indicators: The IBM DB2 Linux ODBC drivers have been
reported to return memory garbage instead of indicators for the string length. Setting this
flag will cause ``arrow-odbc`` to rely on terminating zeroes, instead of indicators. This
prevents ``arrow-odbc`` from disambiguating between empty strings and ``NULL``. As a side
effect of this workaround empty strings might be mapped to NULL. Currently this flag only has
meaning if ``arrow-odbc`` is executed on a non-Windows platform.
:return: A ``BatchReader`` is returned, which implements the iterator protocol and iterates over
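Where this workaround is needed, a call might look like the following sketch (illustrative only; the DSN and query are placeholders, the keyword is the parameter documented above):

from arrow_odbc import read_arrow_batches_from_odbc

# Rely on terminating zeroes instead of length indicators, accepting that empty
# strings may be reported as NULL (IBM DB2 Linux ODBC driver workaround).
reader = read_arrow_batches_from_odbc(
    query="SELECT id, name FROM employees",
    connection_string="DSN=Db2Source;UID=user;PWD=password;",
    driver_returns_memory_garbage_for_indicators=True,
)
for batch in reader:
    print(batch)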