Update reingestion workflows to load and report data #618
Conversation
This is in draft pending the completion of the v1.3.0 milestone - once that's complete this can be rebased and undrafted!
This is looking very interesting. I need to get the reingestion process clear in my mind, so I'll come back to review this tomorrow with more time.
On another note, I think you can fill all those - [ ] from the checklist 😄 Those are great instructions and a PR description, very appreciated!
This is so impressive! It's a really significant change too, but will give us a lot more control and clarity around these reingestion workflows. I have a number of comments on a few different aspects, notably:
- The S3 prefix we use once uploaded
- The nomenclature used for certain variables/concepts
I'm so excited to be able to turn this on soon, great work!!
```python
def _add_counts(self, a, b):
    return (a or 0) + (b or 0)

def __add__(self, other):
```
So smart!! This is awesome 🤩
```
@@ -104,10 +149,19 @@ def report_completion(
    # Collect data into a single message
    message = f"""
*Provider*: `{provider_name}`
*DAG*: `{dag_id}`
```
I kind of preferred using provider name so we didn't have to see `_workflow` over and over 😅 Is there potentially a different way of showing that it's a reingestion workflow DAG without having to use `dag_id` directly?
Previously we were already using dag_id, and just chopping `_workflow` off (i.e. `dag_id.replace("_workflow", "")`). We could continue doing so, but then for reingestion flows we'd get `wikimedia_reingestion` as the "provider", which isn't quite correct.

I can think of other things to do, but they all seem like overkill to me. We could look for `reingestion` in the `dag_id`, but we'd still need to get the name of the actual provider. That would mean making a mapping somewhere, or else passing the information all the way through from the ingester class. I guess we could chop off `_workflow` if `reingestion` isn't in the `dag_id`, and then report "provider" for regular flows and "dag_id" for reingestion 🤔
Perhaps on the `ProviderWorkflow` dataclass we could specify `provider` here instead of `dag_id`?

```python
dag_id: str = ""
```

Then we could also pass in the workflow type (regular/reingestion) to the reporting function, which could be used to craft the message. On the other hand though, that all sounds like a lot of work, so maybe DAG ID is just fine 😅
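For what it's worth, a minimal sketch of that idea, assuming a dataclass roughly like the existing `ProviderWorkflow` (the `provider` and `is_reingestion` fields here are hypothetical additions, not the actual definition):

```python
from dataclasses import dataclass


@dataclass
class ProviderWorkflow:
    dag_id: str = ""
    # Hypothetical additions for reporting purposes:
    provider: str = ""            # human-readable provider name for the Slack message
    is_reingestion: bool = False  # lets the report label the workflow type
```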
"\n_Duration is the sum of the duration for each data pull task." | ||
" It does not include loading time and does not account for data" | ||
" pulls that may happen concurrently." |
One thought on a potential alternate way to do this down the line: we could average the durations across each aggregate. E.g. have average durations for the daily spread, weekly spread, and monthly spread; that way we could get a sense of how long each type of aggregate is taking on average!
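A rough sketch of what that could look like (the level names and numbers are illustrative only): average the recorded durations within each aggregate level rather than only summing them.

```python
from statistics import mean

# Hypothetical per-task durations (seconds), grouped by aggregate level
durations_by_level = {
    "daily": [120.0, 95.5, 110.2],
    "weekly": [300.1, 280.7, 310.9],
    "monthly": [900.3, 875.0],
}

# One average per level, instead of a single summed duration
average_durations = {
    level: mean(values) for level, values in durations_by_level.items()
}
print(average_durations)
```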
I like this idea! Probably overkill but it would be interesting to have some kind of summary of outliers. For Wikimedia for instance the current large-batch issue could make for some wild outliers 😮
Oh definitely 😰 Yea not necessary now but a potential option down the line if we find we want more distinction!
```
@@ -87,6 +95,34 @@ def _make_report_completion_contents_data(media_type: str):
    ]


def _make_report_completion_contents_list_data(media_type: str):
```
These test cases are great!
This is incredible, and such a significant change! I was able to run this locally and it worked great. I have one more note on the reporting, but otherwise this is good to go 🚀
This looks awesome 👏
Avoids collisions on TSV filenames when reingestion runs for two different dates at the same time.
- Removes the old implementation, which appended the ingestion date to the actual filename
- Instead, partitions by the reingestion date
Okay, last bit of feedback addressed. The biggest thing was related to this comment thread, but I'm just going to flag this at the top level for visibility, cc @AetherUnbound: the TSV filenames no longer append the ingestion date; instead, they are partitioned by the reingestion date.
Note that for reingestion, it's partitioned by year and not by month. That is because I have temporarily set the reingestion flows to run weekly instead of daily. When they get turned back on to the daily schedule, the further partitioning will be added automatically. Sorry that took a while to get to; the templating gave me a ton of grief!
I love all these changes! As difficult as that templating was to figure out, I really like the new prefix 💯
Agreed, it was definitely worth it! Much cleaner and more organized. Thanks everyone for the reviews :)
Fixes
Fixes WordPress/openverse#1502 by @stacimc
Description
This PR:
- Updates the reingestion workflows to load and report data
- Temporarily changes the reingestion schedule from `@daily` to `@weekly`, so that we can get a sense of how long they take when we roll them out and adjust more easily. I expect we may need to tweak the configurations.

A reingestion workflow will look like this:
Don't panic 😄 To make it easier to see what's going on, I've altered the workflow configuration for Wikimedia here to generate a smaller number of reingestion days. You can see that each node in the graph represents an ingestion workflow for a particular day in the past. The suffix for the taskgroup is the day_shift (so `ingest_data_day_shift_32` is ingesting data for 32 days in the past).

Here's how I altered the configuration to get that graph:
The `daily_list_length`, `weekly_list_length`, and so on tell the DAG factory how many taskgroups to generate at each level. So if you have `daily_list_length = 3`, taskgroups for the day_shifts `1`, `2`, and `3` will be generated at that level. If you have `weekly_list_length = 3`, you'll generate 3 new taskgroups for day_shifts counting by 7, starting at the last day_shift in the previous level (3) -- so `10`, `17`, `24`.
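To make the expansion concrete, here's a small illustrative sketch (not the actual factory code) of how those list lengths translate into day shifts:

```python
def calculate_day_shifts(daily_list_length: int = 3, weekly_list_length: int = 3) -> list[int]:
    # Illustrative only: expand list lengths into day shifts, continuing each
    # level from the last shift of the previous level.
    shifts = []
    last = 0
    for _ in range(daily_list_length):   # daily level: count by 1
        last += 1
        shifts.append(last)
    for _ in range(weekly_list_length):  # weekly level: count by 7
        last += 7
        shifts.append(last)
    return shifts


print(calculate_day_shifts())  # [1, 2, 3, 10, 17, 24]
```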
Between each level is a 'gather' task which acts as a gate. All the ingestion tasks in partition 0 must complete before `gather_partition_0` can run; only then are partition 1 ingestion tasks processed. There is one partition for each of the day shift multipliers (i.e. partition 0 is today, partition 1 is daily shifts (if any are configured), then weekly shifts, and so on).
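As a rough illustration of that gating (a minimal standalone sketch, not the code in `provider_dag_factory.py`; the DAG, task, and group names here are hypothetical):

```python
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id="reingestion_gating_sketch",
    start_date=pendulum.datetime(2022, 9, 1),
    schedule_interval=None,
):
    # Day shifts per partition: today, then daily shifts, then weekly shifts
    partitions = [[0], [1, 2, 3], [10, 17, 24]]
    previous_gate = None
    for i, day_shifts in enumerate(partitions):
        with TaskGroup(group_id=f"partition_{i}") as group:
            for shift in day_shifts:
                # Stand-in for the real ingest/load task group for this day shift
                EmptyOperator(task_id=f"ingest_data_day_shift_{shift}")
        gate = EmptyOperator(task_id=f"gather_partition_{i}")
        if previous_gate is not None:
            previous_gate >> group  # partition i waits on the previous gather task
        group >> gate               # gather runs only after every task in the partition
        previous_gate = gate
```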
When all of the tasks are completed, a single `report_load_completion` task reports a summary of the sum of all the pull_data durations, as well as the sum of all the record_counts (total number upserted, total duplicates, etc.).
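For a sense of how those sums can be produced from per-task results, here's a simplified sketch in the spirit of the `_add_counts` / `__add__` snippet quoted earlier (the `RecordCounts` class and its fields are hypothetical, not the PR's actual types):

```python
from dataclasses import dataclass
from functools import reduce
from typing import Optional


@dataclass
class RecordCounts:
    upserted: Optional[int] = None
    duplicates: Optional[int] = None

    def _add_counts(self, a, b):
        # Treat None (e.g. a skipped task) as zero
        return (a or 0) + (b or 0)

    def __add__(self, other):
        return RecordCounts(
            upserted=self._add_counts(self.upserted, other.upserted),
            duplicates=self._add_counts(self.duplicates, other.duplicates),
        )


per_task_counts = [RecordCounts(100, 5), RecordCounts(None, 2), RecordCounts(40, None)]
print(reduce(lambda a, b: a + b, per_task_counts))
# RecordCounts(upserted=140, duplicates=7)
```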
REVIEW GUIDE

This is a big PR, but many of the changes are refactoring of types or the addition of tests/docstrings. I am happy to find a way to split this if necessary for review, though. Here's my review guide by file:
- `DAGs.md`: auto-generated docs for the reingestion workflows
- `common/loader/reporting.py`: updates to the reporting tool to aggregate data for multiple ingestion tasks
- `common/loader/storage/*` and `provider_data_ingester.py`: all changes to these files are done to append the ingestion date as a suffix to the TSV filenames for dated DAGs. This is necessary because reingestion tasks can run concurrently, so we can get collisions on TSV filenames when just using the execution date.
- `provider_dag_factory.py`: the bulk of the work. This refactors the existing factories to split out ingestion and reporting tasks, and uses this to create new reingestion workflows.
- `provider_*workflows.py` and `provider_*_workflow_factory.py`: minor cleanup; renames reingestion DAGs to use the word "reingestion" for clarity
- `tests/*`: new and updated tests

NOTES
The regular provider flows for Europeana and Flickr are not yet enabled. Their reingestion workflows should NOT be turned on until they have been refactored, and the 'regular' provider workflow is confirmed to be working.
The TSV filenames for all regular provider workflows remain the same. For reingestion workflows, they are further partitioned by the reingestion date (the day for which data is being processed), in order to prevent collisions when reingestion for multiple days runs at once. An example TSV filename for `wikimedia_reingestion_workflow` is: `wikimedia_reingestion/year=2020/reingestion=2022-07-22/wikimedia_image_v001_20220922230222.tsv`.
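A hypothetical helper (not the repo's actual code) showing how such a partitioned key could be assembled from the reingestion date:

```python
from datetime import date


def reingestion_tsv_key(provider: str, reingestion_date: date, filename: str) -> str:
    # Illustrative only: partition by reingestion year, then by the full date
    return (
        f"{provider}_reingestion/year={reingestion_date.year}/"
        f"reingestion={reingestion_date.isoformat()}/{filename}"
    )


print(
    reingestion_tsv_key(
        "wikimedia", date(2022, 7, 22), "wikimedia_image_v001_20220922230222.tsv"
    )
)
# wikimedia_reingestion/year=2022/reingestion=2022-07-22/wikimedia_image_v001_20220922230222.tsv
```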
Testing Instructions

Run the `wikimedia_reingestion_workflow`. To make this quicker, you can try setting the `ingestion_limit` Airflow variable to something like `250` so that ingestion for each day completes early (see the sketch below for one way to set it).

Check the `report_load_completion` content for the Wikimedia reingestion flow, as well as a 'normal' DAG, to make sure they look correct with & without aggregation.
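One way to set that variable (among others, such as the Airflow UI's Admin → Variables page), assuming you can run Python inside the Airflow environment:

```python
from airflow.models import Variable

# Cap each day's ingestion so the reingestion DAG finishes quickly while testing
Variable.set("ingestion_limit", 250)
```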
Checklist

- [ ] My pull request has a descriptive title (not a vague title like `Update index.md`).
- [ ] My pull request targets the default branch of the repository (`main`) or a parent feature branch.

Developer Certificate of Origin