Add dayshift to tsv filenames for reingestion workflows #969
Conversation
Drafting to look at extended test failures.
Oh this is rad, I totally forgot that generate_tsv_filenames was something we could get rid of now that we're fully converted to the ProviderDataIngester derivatives!
This looks good, but when I run this locally both the Wikimedia commons DAG and the Cleveland one gave me XComs that had a suffix value:
[2023-01-31, 01:30:16 UTC] {factory_utils.py:44} INFO - Image store location: /var/workflow_output/wikimedia_image_v001_20230131013016_0.tsv
[2023-01-31, 01:30:16 UTC] {factory_utils.py:44} INFO - Audio store location: /var/workflow_output/wikimedia_audio_audio_v001_20230131013016_0.tsv
and
[2023-01-31, 01:34:33 UTC] {media.py:183} INFO - Output path: /var/workflow_output/clevelandmuseum_image_v001_20230131013433_None.tsv
[2023-01-31, 01:34:33 UTC] {factory_utils.py:44} INFO - Image store location: /var/workflow_output/clevelandmuseum_image_v001_20230131013433_None.tsv
Do you know why that's happening? 😮
    datetime.now().strftime("%Y%m%d%H%M%S"),
    tsv_suffix,
]
output_file = ("_").join(filter(None, path_components)) + ".tsv"
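For context, the surrounding logic can be sketched as a standalone function (the function name, signature, and defaults here are illustrative, not the PR's exact code):

```python
from datetime import datetime, timezone
from typing import Optional

def build_tsv_filename(provider: str, media_type: str,
                       version: str = "v001",
                       tsv_suffix: Optional[str] = None) -> str:
    """Join the non-None components with underscores, so the optional
    suffix (e.g. a reingestion day_shift) only appears when set."""
    path_components = [
        provider,
        media_type,
        version,
        datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S"),
        tsv_suffix,
    ]
    # filter(None, ...) drops falsy components, including a None suffix.
    return "_".join(filter(None, path_components)) + ".tsv"

# Standard ingestion: no suffix in the filename.
print(build_tsv_filename("clevelandmuseum", "image"))

# Reingestion with day shift 4: "_4" lands just before the extension.
print(build_tsv_filename("wikimedia", "image", tsv_suffix="4"))
```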
Hehe it's a little face 😁 ("_")

(As a real note though, I think the parens can be removed:)

- output_file = ("_").join(filter(None, path_components)) + ".tsv"
+ output_file = "_".join(filter(None, path_components)) + ".tsv"

Awesome use of filter!
In Python, I am much more comfortable with list comprehensions than filter. This would be clearer for me, but this is not a blocking comment:

output_file = "_".join([c for c in path_components if c is not None]) + ".tsv"

or, if 0 is not a valid value:

output_file = "_".join([c for c in path_components if c]) + ".tsv"
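To make the difference concrete, here is a small sketch (with hypothetical component values, not the PR's actual data) showing how filter(None, ...) and the explicit `is not None` comprehension treat falsy-but-present values like an empty string:

```python
# Hypothetical path components; the last slot is the optional suffix.
path_components = ["wikimedia", "image", "v001", "20230131013016", None]

# filter(None, ...) drops every falsy element: None and "" alike.
with_filter = "_".join(filter(None, path_components))

# The comprehension drops only None, keeping other falsy values.
with_comprehension = "_".join(c for c in path_components if c is not None)

print(with_filter)         # wikimedia_image_v001_20230131013016
print(with_comprehension)  # wikimedia_image_v001_20230131013016

# The behaviors diverge when a component is falsy but not None, e.g. "":
path_components[-1] = ""
print("_".join(filter(None, path_components)))                # "" dropped
print("_".join(c for c in path_components if c is not None))  # trailing "_" kept
```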
@@ -160,7 +166,7 @@ def _init_media_stores(self) -> dict[str, MediaStore]:

 for media_type, provider in self.providers.items():
     StoreClass = get_media_store_class(media_type)
-    media_stores[media_type] = StoreClass(provider)
+    media_stores[media_type] = StoreClass(provider, tsv_suffix=str(day_shift))
Oh, it's this line 😅 This is converting None and 0 to 'None' and '0', which don't get caught by the filter! So they're getting appended to the filename anyway.
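A minimal reproduction of the bug (provider and media-type names here are illustrative):

```python
day_shift = None  # non-reingestion DAGs have no day shift

# Bug: str() turns None into the truthy string 'None', which survives
# filter(None, ...) and ends up in the filename.
bad = "_".join(filter(None, ["provider", "image", str(day_shift)]))
print(bad)  # provider_image_None

# One possible fix: only stringify when a shift is actually present.
tsv_suffix = str(day_shift) if day_shift is not None else None
good = "_".join(filter(None, ["provider", "image", tsv_suffix]))
print(good)  # provider_image
```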
Based on the high urgency of this PR, the following reviewers are being gently reminded to review this PR: @obulat. Excluding weekend days, this PR was updated 2 day(s) ago. PRs labelled with high urgency are expected to be reviewed within 2 weekday(s). @stacimc, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.
I am getting the same issue as @AetherUnbound, in load_from_s3:
Loading image/cleveland_museum/year=2023/clevelandmuseum_image_v001_20230207060449_None.tsv from S3 Bucket openverse-storage into provider_data_image_cleveland_museum_20230101T000000_0
Can we also add a test for these cases?
Force-pushed from 7c2dd2d to 04cfd65
Sorry for the delay getting back to this one. Rebased and fixed the identified issue 👍
I was able to verify, both with standard ingestion & reingestion, that this appropriately appends the day shift when necessary! I have some other comments/suggestions above, but nothing to block a merge 😄
Fixes
Fixes WordPress/openverse#1417 by @stacimc
Description

This adds the day_shift as a suffix to the tsv filenames for reingestion workflows, in order to prevent a race condition where multiple reingestion days running at the same time would attempt to write to a file with the same name. @AetherUnbound did an excellent investigation and write-up of the bug in this comment.

This PR also removes the generate_tsv_filenames task while we're at it. This was a separate task that runs before the pull_data step and outputs the tsv filenames to XComs, so they can be grabbed later by the copy_to_s3 task. For scripts using the ProviderDataIngester base class, it does this by initializing the ingester class to grab the file names. The pull_data task would then re-initialize the ingester class and override its generated filenames with the previously generated ones. Since all the provider scripts are using the ingester class now, this should no longer be necessary.

Testing Instructions
General things to look for when testing a DAG:
- Set the INGESTION_LIMIT Airflow variable to a very small number (<= 100) to speed up ingestion.
- Check the pull_data task and verify that you see logs reporting the tsv file paths for each media type the DAG supports.
- Check the copy_to_s3 task and verify that it is accessing the same file path reported in the pull_data task.

Some DAGs to test:
- cleveland_museum_workflow. You should see tsv filenames that do NOT have a day_shift appended to the end. Example pull_data log:
- wikimedia_reingestion_workflow. Check the logs for a few different reingestion days and make sure that the appropriate day_shift is appended to the filenames. Example pull_data log for the ingest_data_day_shift_4 TaskGroup:

Checklist
- I added or updated documentation (e.g. Update index.md).
- My branch is based off of the default branch (main) or a parent feature branch.
- Tests pass with no errors.
Developer Certificate of Origin