Skip to content

Commit

Permalink
fix(ingestion/airflow-plugin): remove obsolete Airflow pipelines and tasks from DataHub
Browse files Browse the repository at this point in the history
  • Loading branch information
dushayntAW committed May 23, 2024
1 parent 0d36c63 commit 49c689c
Showing 1 changed file with 14 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import copy
import functools
import logging
Expand Down Expand Up @@ -579,16 +580,11 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
obsolete_pipelines = set(ingested_dataflow_urns) - set(airflow_flow_urns)
obsolete_tasks = set(ingested_datajob_urns) - set(airflow_job_urns)

for obsolete_pipeline in obsolete_pipelines:
if self.graph.exists(str(obsolete_pipeline)):
self.graph.soft_delete_entity(str(obsolete_pipeline))
obsolete_urns = obsolete_pipelines.union(obsolete_tasks)

logger.debug(f"total pipelines removed = {len(obsolete_pipelines)}")

for obsolete_task in obsolete_tasks:
if self.graph.exists(str(obsolete_task)):
self.graph.soft_delete_entity(str(obsolete_task))
asyncio.run(self._soft_delete_obsolete_urns(obsolete_urns=obsolete_urns))

logger.debug(f"total pipelines removed = {len(obsolete_pipelines)}")
logger.debug(f"total tasks removed = {len(obsolete_tasks)}")

if HAS_AIRFLOW_DAG_LISTENER_API:
Expand Down Expand Up @@ -627,3 +623,13 @@ def on_dataset_changed(self, dataset: "Dataset") -> None:
logger.debug(
f"DataHub listener got notification about dataset change for {dataset}"
)

async def _soft_delete_obsolete_urns(self, obsolete_urns):
delete_tasks = [self._delete_obsolete_data(urn) for urn in obsolete_urns]
await asyncio.gather(*delete_tasks)

async def _delete_obsolete_data(self, obsolete_urn):
assert self.graph

if self.graph.exists(str(obsolete_urn)):
self.graph.soft_delete_entity(str(obsolete_urn))

0 comments on commit 49c689c

Please sign in to comment.