Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
Respect the ingestion limit if ingest_records is called multiple times (
Browse files Browse the repository at this point in the history
#804)

* Respect the ingestion limit if ingest_records is called multiple times

* Return early instead of just halting ingestion

Slightly more readable, and prevents the record commiting and error handling
from re-running for no reason each time.
  • Loading branch information
stacimc authored Oct 19, 2022
1 parent 9ff501e commit de9fc4d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ def __init__(self, conf: dict = None, date: str = None):
if self.limit:
self.batch_limit = min(self.batch_limit, self.limit)

# Keep track of number of records ingested
self.record_count = 0

# Initialize the DelayedRequester and all necessary Media Stores.
self.delayed_requester = DelayedRequester(
delay=self.delay, headers=self.headers
Expand Down Expand Up @@ -147,9 +150,14 @@ def ingest_records(self, **kwargs) -> None:
**kwargs: Optional arguments to be passed to `get_next_query_params`.
"""
should_continue = True
record_count = 0
query_params = None

# If an ingestion limit has been set and we have already ingested records
# in excess of the limit, exit early. This may happen if `ingest_records`
# is called more than once.
if self.limit and self.record_count >= self.limit:
return

logger.info(f"Begin ingestion for {self.__class__.__name__}")

while should_continue:
Expand All @@ -163,8 +171,8 @@ def ingest_records(self, **kwargs) -> None:
batch, should_continue = self.get_batch(query_params)

if batch and len(batch) > 0:
record_count += self.process_batch(batch)
logger.info(f"{record_count} records ingested so far.")
self.record_count += self.process_batch(batch)
logger.info(f"{self.record_count} records ingested so far.")
else:
logger.info("Batch complete.")
should_continue = False
Expand Down Expand Up @@ -195,7 +203,7 @@ def ingest_records(self, **kwargs) -> None:
self.commit_records()
raise error from ingestion_error

if self.limit and record_count >= self.limit:
if self.limit and self.record_count >= self.limit:
logger.info(f"Ingestion limit of {self.limit} has been reached.")
should_continue = False

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,45 @@ def test_ingest_records_with_skip_ingestion_errors(
assert get_batch_mock.call_count == expected_call_count


def test_ingest_records_exits_immediately_if_limit_already_reached():
# A child class may override `ingest_records` to run multiple times.
# Once the ingestion limit has been reached, subsequent calls to
# `ingest_records` should immediately return without attempting to
# ingest new data.
with patch(
"providers.provider_api_scripts.provider_data_ingester.Variable"
) as MockVariable:
# Mock the calls to Variable.get to set an ingestion limit
MockVariable.get.side_effect = [5]

ingester = MockProviderDataIngester()

# Mock batch processing to reach the ingestion limit
with (
patch.object(ingester, "get_batch") as get_batch_mock,
patch.object(
ingester, "process_batch", return_value=5
) as process_batch_mock,
):
get_batch_mock.side_effect = [
(EXPECTED_BATCH_DATA, True), # First batch
(EXPECTED_BATCH_DATA, True), # Second batch
]

ingester.ingest_records()

# get_batch was only called once, and then the limit was reached
assert get_batch_mock.call_count == 1
assert process_batch_mock.call_count == 1

# Ingest records again
ingester.ingest_records()

# get_batch was not called any additional times
assert get_batch_mock.call_count == 1
assert process_batch_mock.call_count == 1


def test_commit_commits_all_stores():
with (
patch.object(audio_store, "commit") as audio_store_mock,
Expand Down

0 comments on commit de9fc4d

Please sign in to comment.