This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Update Flickr to use new time delineated ingester class #995

Merged: 8 commits merged on Feb 23, 2023
openverse_catalog/dags/providers/provider_api_scripts/finnish_museums.py
@@ -15,13 +15,14 @@
regular ingestion.
"""
import logging
from datetime import datetime, timedelta, timezone
from itertools import chain

from airflow.exceptions import AirflowException
from common import constants
from common.licenses import get_license_info
from common.loader import provider_details as prov
from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester
from providers.provider_api_scripts.time_delineated_provider_data_ingester import (
TimeDelineatedProviderDataIngester,
)


logging.basicConfig(
@@ -36,7 +37,7 @@
SUB_PROVIDERS = prov.FINNISH_SUB_PROVIDERS


class FinnishMuseumsDataIngester(ProviderDataIngester):
class FinnishMuseumsDataIngester(TimeDelineatedProviderDataIngester):
providers = {"image": PROVIDER}
endpoint = "https://api.finna.fi/api/v1/search"
batch_limit = 100
@@ -48,153 +49,15 @@ class FinnishMuseumsDataIngester(ProviderDataIngester):
"0/SATMUSEO/",
"0/SA-kuva/",
]

def __init__(self, *args, **kwargs):
"""
Note: this DAG runs ingestion separately for each configured `building`. When a
building has many records for the ingestion date, the DAG further splits up
ingestion into time slices. Each run of the `ingest_records` function for
a particular (building, time slice) pair is an "iteration" of the DAG.

For logging and alerting purposes, we maintain several instance variables
that help track each iteration.
"""
# A flag that is True only when we are processing the first batch of data in
# a new iteration.
self.new_iteration = True
# Used to keep track of the number of records we've fetched from the API so
# far in this iteration.
self.fetched_count = 0

super().__init__(*args, **kwargs)

@staticmethod
def format_ts(timestamp):
return timestamp.isoformat().replace("+00:00", "Z")
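
For reference, a quick illustration (not part of this diff) of the timestamp format the helper above produces for the Finna API; the input date is arbitrary:

```python
from datetime import datetime, timezone

# Illustrative only: format_ts turns a timezone-aware UTC datetime into a
# "Z"-suffixed ISO 8601 string.
FinnishMuseumsDataIngester.format_ts(datetime(2023, 2, 23, tzinfo=timezone.utc))
# -> "2023-02-23T00:00:00Z"
```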

@staticmethod
def _get_timestamp_query_params_list(
start_date: datetime, end_date: datetime, number_of_divisions: int
) -> list[tuple[datetime, datetime]]:
"""
Given a start_date and end_date, returns a list of timestamp pairs that
divides the time interval between them into equal portions, with the
number of portions determined by `number_of_divisions`.

Required Arguments:
start_date: datetime to be considered start of the interval
end_date: datetime to be considered end of the interval
number_of_divisions: int number of portions to divide the interval into.
"""
seconds_in_time_slice = (end_date - start_date).total_seconds()
portion = int(seconds_in_time_slice / number_of_divisions)
# Double check that the division resulted in even portions
if seconds_in_time_slice % number_of_divisions:
raise ValueError(
f"The time slice from {start_date} to {end_date} cannot be divided "
f"evenly into {number_of_divisions} parts!"
)

# Generate the start/end timestamps for each 'slice' of the interval
return [
(
start_date + timedelta(seconds=i * portion),
start_date + timedelta(seconds=(i + 1) * portion),
)
for i in range(number_of_divisions)
]
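
To make the slicing above concrete, here is a minimal sketch (not part of this diff) of what the removed helper returns when a UTC day is divided into four equal portions; the date is arbitrary:

```python
from datetime import datetime, timedelta, timezone

# Illustrative only: split an arbitrary UTC day into 4 equal 6-hour slices.
start = datetime(2023, 2, 23, tzinfo=timezone.utc)
end = start + timedelta(days=1)
slices = FinnishMuseumsDataIngester._get_timestamp_query_params_list(start, end, 4)
# slices[0] -> (2023-02-23 00:00+00:00, 2023-02-23 06:00+00:00)
# slices[3] -> (2023-02-23 18:00+00:00, 2023-02-24 00:00+00:00)

# A division count that does not split the interval evenly (e.g. 7 for an
# 86,400-second day) raises the ValueError above.
```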

def _get_record_count(self, start: datetime, end: datetime, building: str) -> int:
"""
Get the number of records returned by the Finnish Museums API for a
particular building, during a given time interval.
"""
query_params = self.get_next_query_params(
prev_query_params=None, building=building, start_ts=start, end_ts=end
)
response_json = self.get_response_json(query_params)

if response_json:
return response_json.get("resultCount", 0)
return 0

def _get_timestamp_pairs(self, building: str):
"""
Determine a set of timestamp pairs per building.
The Finnish Museums API can behave unexpectedly when querying large datasets,
resulting in large numbers of duplicates and eventual DAG timeouts
(see https://github.com/WordPress/openverse-catalog/pull/879 for more
details). To avoid this, when we detect that a time period contains a large
number of records we split it up into multiple smaller time periods and
run ingestion separately for each.

Most runs of the DAG will have a very small (or zero) number of records, so in
order to determine how many time intervals we need, we first check the record
count and split into the smallest number of intervals possible.

If the building has no/few records, this results in ONE extra request.
If the building has a large amount of data, this results in TWENTY FIVE extra
requests (one for the full day, and then one for each hour).
"""
pairs_list: list[tuple[datetime, datetime]] = []
# Get UTC timestamps for the start and end of the ingestion date
start_ts = datetime.strptime(self.date, "%Y-%m-%d").replace(tzinfo=timezone.utc)
end_ts = start_ts + timedelta(days=1)

record_count = self._get_record_count(start_ts, end_ts, building)
if record_count == 0:
logger.info(f"No data for {building}. Continuing.")
return pairs_list
if record_count < 10_000:
# This building only has a small amount of data. We can ingest all of the
# data for it in a single run.
return self._get_timestamp_query_params_list(start_ts, end_ts, 1)

# If we got here, this building has a large amount of data. Since data is often
# not distributed evenly across the day, we'll break down each hour of the day
# separately. We add timestamp_pairs to the list only for hours that actually
# contain data. Hours that contain more data get divided into a larger number of
# portions.
hour_slices = self._get_timestamp_query_params_list(start_ts, end_ts, 24)
for (start_hour, end_hour) in hour_slices:
# Get the number of records in this hour interval
record_count = self._get_record_count(start_hour, end_hour, building)
if record_count == 0:
# No records for this hour, don't bother ingesting for this time period.
logger.info(f"No data detected for {start_hour}. Continuing.")
continue
if record_count < 10_000:
# This hour doesn't have much data, ingest it in one chunk
pairs_list.append((start_hour, end_hour))
continue
# If we got this far, this hour has a lot of data. It is split into 12 five-minute
# intervals if it has fewer than 100k records, or 20 three-minute intervals if more.
num_divisions = 12 if record_count < 100_000 else 20
minute_slices = self._get_timestamp_query_params_list(
start_hour, end_hour, num_divisions
)
pairs_list.extend(minute_slices)

return pairs_list
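
As a worked illustration of the branching above, the per-hour decision reduces to the following sketch (illustrative counts only, not part of this diff):

```python
def divisions_for_hour(record_count: int) -> int | None:
    """Illustrative only: mirrors the per-hour branching in _get_timestamp_pairs."""
    if record_count == 0:
        return None  # hour is skipped entirely
    if record_count < 10_000:
        return 1     # ingest the hour as a single (start, end) pair
    return 12 if record_count < 100_000 else 20


# An hour with 60,000 records is ingested in 12 five-minute slices, while an
# hour with 120,000 records is ingested in 20 three-minute slices.
assert divisions_for_hour(0) is None
assert divisions_for_hour(8_000) == 1
assert divisions_for_hour(60_000) == 12
assert divisions_for_hour(120_000) == 20
```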
max_records = 10_000
Contributor review comment: The simplification of this file is awesome ✨

division_threshold = 100_000
min_divisions = 12
max_divisions = 20
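
The four class attributes above presumably let TimeDelineatedProviderDataIngester reproduce the splitting that the removed _get_timestamp_pairs implemented by hand. The base class is not part of this diff, so the following sketch of how the thresholds might be consumed is an assumption for illustration only:

```python
# Hypothetical sketch; the real base-class logic may differ.
def choose_divisions(
    record_count: int,
    max_records: int = 10_000,
    division_threshold: int = 100_000,
    min_divisions: int = 12,
    max_divisions: int = 20,
) -> int:
    if record_count < max_records:        # small enough to ingest in one chunk
        return 1
    if record_count < division_threshold:
        return min_divisions
    return max_divisions
```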

def ingest_records(self, **kwargs):
for building in self.buildings:
logger.info(f"Obtaining images of building {building}")

# Get the timestamp pairs. If there are no records for this building,
# it will be an empty list.
timestamp_pairs = self._get_timestamp_pairs(building)

# Run ingestion for each timestamp pair
for start_ts, end_ts in timestamp_pairs:
# Reset counts before we start
self.new_iteration = True
self.fetched_count = 0

logger.info(f"Ingesting data for start: {start_ts}, end: {end_ts}")
super().ingest_records(
building=building, start_ts=start_ts, end_ts=end_ts
)
super().ingest_records(building=building)

def get_next_query_params(self, prev_query_params, **kwargs):
if not prev_query_params:
@@ -222,8 +85,13 @@ def get_next_query_params(self, prev_query_params, **kwargs):
}
return {**prev_query_params, "page": prev_query_params["page"] + 1}

def get_record_count_from_response(self, response_json):
if response_json:
return response_json.get("resultCount", 0)
return 0

def get_media_type(self, record):
return "image"
return constants.IMAGE

def get_batch_data(self, response_json):
if (
@@ -234,26 +102,6 @@ def get_batch_data(self, response_json):
):
return None

total_count = response_json.get("resultCount")
# Update the number of records we have pulled for this iteration.
# Note that this tracks the number of records pulled from the API, not
# the number actually written to TSV (which may be larger or smaller
# as some records are discarded or have additional related images).
self.fetched_count += len(response_json.get("records"))

if self.new_iteration:
# This is the first batch of a new iteration.
logger.info(f"Detected {total_count} total records.")
self.new_iteration = False

# Detect a bug when the API continues serving pages of data in excess of
# the stated resultCount.
if self.fetched_count > total_count:
raise AirflowException(
f"Expected {total_count} records, but {self.fetched_count} have"
" been fetched. Consider reducing the ingestion interval."
)

return response_json["records"]

def get_record_data(self, data):
82 changes: 40 additions & 42 deletions openverse_catalog/dags/providers/provider_api_scripts/flickr.py
@@ -12,15 +12,18 @@

import argparse
import logging
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta

import lxml.html as html
from airflow.exceptions import AirflowException
from airflow.models import Variable
from common import constants
from common.licenses import get_license_info
from common.loader import provider_details as prov
from common.loader.provider_details import ImageCategory
from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester
from providers.provider_api_scripts.time_delineated_provider_data_ingester import (
TimeDelineatedProviderDataIngester,
)


logger = logging.getLogger(__name__)
@@ -37,7 +40,7 @@
}


class FlickrDataIngester(ProviderDataIngester):
class FlickrDataIngester(TimeDelineatedProviderDataIngester):
provider_string = prov.FLICKR_DEFAULT_PROVIDER
sub_providers = prov.FLICKR_SUB_PROVIDERS
photo_url_base = prov.FLICKR_PHOTO_URL_BASE
@@ -47,6 +50,16 @@ class FlickrDataIngester(ProviderDataIngester):
batch_limit = 500
retries = 5

max_records = 4_000
division_threshold = 20_000
min_divisions = 12
max_divisions = 60
# When we pull more records than the API reports, we don't want to raise an error
# and halt ingestion. Instead, this DAG adds its own separate handling to cut off
# ingestion when max_records is reached, and continue to the next time interval. See
# https://github.com/WordPress/openverse-catalog/pull/995
should_raise_error = False
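
Since TimeDelineatedProviderDataIngester itself is not shown in this diff, the following is a hedged sketch of how should_raise_error is presumably consulted when more records are fetched than the API reported (the same check the removed Finnish Museums code performed inline); the function name and exact behavior are assumptions for illustration:

```python
import logging

from airflow.exceptions import AirflowException

logger = logging.getLogger(__name__)


def check_fetched_count(fetched: int, reported: int, should_raise_error: bool) -> None:
    """Hypothetical sketch only; the real base-class check may differ."""
    if fetched <= reported:
        return
    message = f"Expected {reported} records, but {fetched} have been fetched."
    if should_raise_error:
        raise AirflowException(message)
    # Flickr sets should_raise_error = False, so over-fetching is only logged
    # and ingestion continues for the current time interval.
    logger.warning(message)
```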

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

@@ -60,48 +73,11 @@ def __init__(self, *args, **kwargs):
# to hitting rate limits.
self.requests_count = 0

@staticmethod
def _derive_timestamp_pair_list(date):
"""
Create a list of start/end timestamps for equal portions of the day.

Ingestion will be run separately for each of these time divisions.
This is necessary because requesting data for too long a period may cause
unexpected behavior from the API:
https://github.com/WordPress/openverse-catalog/issues/26.
"""
seconds_in_a_day = 86400
number_of_divisions = 48 # Half-hour increments
portion = int(seconds_in_a_day / number_of_divisions)
utc_date = datetime.strptime(date, "%Y-%m-%d").replace(tzinfo=timezone.utc)

def _ts_string(d):
return str(int(d.timestamp()))

# Generate the start/end timestamps for each half-hour 'slice' of the day
pair_list = [
(
_ts_string(utc_date + timedelta(seconds=i * portion)),
_ts_string(utc_date + timedelta(seconds=(i + 1) * portion)),
)
for i in range(number_of_divisions)
]
return pair_list
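
For illustration (not part of this diff), the removed helper produced 48 pairs of unix-timestamp strings per date; the date below is arbitrary:

```python
# Illustrative only: the first and last of the 48 half-hour slices for 2023-02-23.
pairs = FlickrDataIngester._derive_timestamp_pair_list("2023-02-23")
assert len(pairs) == 48
assert pairs[0] == ("1677110400", "1677112200")   # 00:00-00:30 UTC
assert pairs[-1] == ("1677195000", "1677196800")  # 23:30-24:00 UTC
```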

def ingest_records(self, **kwargs):
# Build a list of start/end timestamps demarcating portions of the day
# for which to ingest data.
timestamp_pairs = self._derive_timestamp_pair_list(self.date)

for start_ts, end_ts in timestamp_pairs:
logger.info(f"Ingesting data for start: {start_ts}, end: {end_ts}")
super().ingest_records(start_timestamp=start_ts, end_timestamp=end_ts)

def get_next_query_params(self, prev_query_params, **kwargs):
if not prev_query_params:
# Initial request, return default params
start_timestamp = kwargs.get("start_timestamp")
end_timestamp = kwargs.get("end_timestamp")
start_timestamp = kwargs.get("start_ts")
end_timestamp = kwargs.get("end_ts")

return {
"min_upload_date": start_timestamp,
@@ -151,6 +127,12 @@ def get_batch_data(self, response_json):
return None
return response_json.get("photos", {}).get("photo")

def get_record_count_from_response(self, response_json) -> int:
if response_json:
count = response_json.get("photos", {}).get("total", 0)
return int(count)
return 0

def get_record_data(self, data):
if (license_info := self._get_license_info(data)) is None:
return None
@@ -272,6 +254,22 @@ def _get_category(image_data):
return ImageCategory.PHOTOGRAPH
return None

def get_should_continue(self, response_json):
# Call the parent method in order to update the fetched_count
should_continue = super().get_should_continue(response_json)

# Return early if more than the max_records have been ingested.
# This could happen if we did not break the ingestion down into
# small enough divisions.
if self.fetched_count > self.max_records:
raise AirflowException(
f"{self.fetched_count} records retrieved, but there is a"
f" limit of {self.max_records}. Consider increasing the"
" number of divisions."
)

return should_continue


def main(date):
ingester = FlickrDataIngester(date=date)