Create centralised way to publish production versions of data to Azure #123
Next problem: from the CLI, the way to run all partitions of a job is to do:
Not sure how to get the sensors to run. I got them to run by doing:

Oh, if that wasn't complicated enough: I don't know if it is possible and/or makes more sense to trigger the asset materialisations using Python code. If this is the case, we could in theory parse the DAG, inspect each asset to check whether it's partitioned or not, and then run it accordingly. This seems like something that would be really useful upstream, but I think it is also a lot of work.
Code to run a job without the issue above!!!!!!

```python
# import the top level defs thing from popgetter
from . import defs

import time

from dagster import DagsterInstance, DynamicPartitionsDefinition, materialize

job_name = "job"  # Replace with whatever
job = defs.get_job_def(job_name)

# Required for persisting outputs in $DAGSTER_HOME/storage
instance = DagsterInstance.get()

dependency_list = job._graph_def._dependencies
all_assets = {
    node_handle.name: definition
    for node_handle, definition in job._asset_layer.assets_defs_by_node_handle.items()
}


def find_materialisable_asset_names(dep_list, done_asset_names) -> set[str]:
    """Return the names of assets whose upstream dependencies have all been materialised."""
    materialisable_asset_names = set()
    for asset, dep_dict in dep_list.items():
        if asset.name in done_asset_names:
            continue
        if all(dep.node in done_asset_names for dep in dep_dict.values()):
            materialisable_asset_names.add(asset.name)
    return materialisable_asset_names


print("--------------------")

materialised_asset_names = set()
while len(materialised_asset_names) < len(all_assets):
    time.sleep(0.5)
    asset_names_to_materialise = find_materialisable_asset_names(
        dependency_list, materialised_asset_names
    )
    if len(asset_names_to_materialise) == 0:
        print("No more assets to materialise")
        break

    asset_name_to_materialise = asset_names_to_materialise.pop()
    asset_to_materialise = all_assets.get(asset_name_to_materialise)
    print(f"Materialising: {asset_name_to_materialise}")

    partitions_def = asset_to_materialise.partitions_def

    # https://docs.dagster.io/_apidocs/execution#dagster.materialize -- note
    # that the `assets` keyword argument needs to include upstream assets as
    # well. We use `selection` to specify the asset that is actually being
    # materialised.
    if partitions_def is None:
        # Unpartitioned
        materialize(
            assets=[
                asset_to_materialise,
                *(all_assets.get(k) for k in materialised_asset_names),
            ],
            selection=[asset_to_materialise],
            instance=instance,
        )
        materialised_asset_names.add(asset_name_to_materialise)
    else:
        # Partitioned
        if type(partitions_def) != DynamicPartitionsDefinition:
            raise NotImplementedError("Non-dynamic partitions not implemented yet")
        partition_names = instance.get_dynamic_partitions(partitions_def.name)
        for partition in partition_names:
            materialize(
                assets=[
                    asset_to_materialise,
                    *(all_assets.get(k) for k in materialised_asset_names),
                ],
                selection=[asset_to_materialise],
                partition_key=partition,
                instance=instance,
            )
        materialised_asset_names.add(asset_name_to_materialise)
```
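Note: since the script uses a relative import (from . import defs), I think it has to be run as a module from inside the popgetter package, and DagsterInstance.get() expects DAGSTER_HOME to be set so that the materialisation records are actually persisted.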
Progress!!!!!!!

Run locally with:

Or use the Dockerfile:

Yet to figure out:
Right now, our production data is in the Azure blob storage container
https://popgetter.blob.core.windows.net/popgetter-dagster-test/test_2
and one of us will populate this by setting ENV=prod and running all the Dagster pipelines locally :)

I think it's useful to have a single, centralised way to generate all production data and upload it to another Azure blob storage container (that has a less testy name :-)). There are several benefits of this:

- Generating the countries.txt file cleanly: the CLI uses this file to determine which countries are present, as it cannot traverse the Azure directory structure. Right now the file is being generated manually, which can easily lead to inconsistencies between what it says and the actual data that is there (a rough sketch of the idea is below).

I can throw together a quick Dockerfile for this and maybe investigate running this on GitHub Actions / Azure!
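To illustrate the countries.txt point, here is a rough sketch only (not popgetter's actual API): if the publishing step knows which country pipelines it actually ran, it could write the file itself so it can never drift from the uploaded data. The function name, country identifiers and output path below are all made-up placeholders.

```python
# Rough sketch: write countries.txt from the set of countries the publish run
# actually produced, instead of editing the file by hand.
# `published_countries` is a hypothetical placeholder for whatever the pipeline reports.
def write_countries_file(published_countries: list[str], path: str = "countries.txt") -> None:
    # One country identifier per line, sorted so the file is deterministic.
    with open(path, "w") as f:
        f.write("\n".join(sorted(published_countries)) + "\n")


write_countries_file(["bel", "uk-ni"])  # assumed identifiers, replace with the real ones
```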
GHA has usage limits (https://docs.github.com/en/actions/learn-github-actions/usage-limits-billing-and-administration); in particular, "each job in a workflow can run for up to 6 hours of execution time" so it is not a deployment method that will scale well if we have many countries to run. I think for what we have now (BE + NI) it is still workable.
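One way to work within the 6-hour-per-job limit would be to split the production run so that each country gets its own CI job. A minimal sketch of the entry point, assuming a hypothetical --country flag and run_country_pipeline helper (both are placeholders I'm making up here, not existing popgetter code):

```python
# Hypothetical sketch: run one country's pipeline per invocation, so a CI workflow
# can fan out one job per country and each job stays well under the 6-hour limit.
import argparse


def run_country_pipeline(country_id: str) -> None:
    # Placeholder: materialise all assets for this country (e.g. the loop above,
    # restricted to that country's assets) and upload the results to Azure.
    print(f"Running pipeline for {country_id}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--country", required=True, help="Country identifier, e.g. 'bel'")
    args = parser.parse_args()
    run_country_pipeline(args.country)
```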