diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index b0ee6dd06291a0..ab0efe72407c62 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -1,3 +1,4 @@
+import asyncio
 import copy
 import functools
 import logging
@@ -579,16 +580,11 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
         obsolete_pipelines = set(ingested_dataflow_urns) - set(airflow_flow_urns)
         obsolete_tasks = set(ingested_datajob_urns) - set(airflow_job_urns)
 
-        for obsolete_pipeline in obsolete_pipelines:
-            if self.graph.exists(str(obsolete_pipeline)):
-                self.graph.soft_delete_entity(str(obsolete_pipeline))
+        obsolete_urns = obsolete_pipelines.union(obsolete_tasks)
 
-        logger.debug(f"total pipelines removed = {len(obsolete_pipelines)}")
-
-        for obsolete_task in obsolete_tasks:
-            if self.graph.exists(str(obsolete_task)):
-                self.graph.soft_delete_entity(str(obsolete_task))
+        asyncio.run(self._soft_delete_obsolete_urns(obsolete_urns=obsolete_urns))
 
+        logger.debug(f"total pipelines removed = {len(obsolete_pipelines)}")
         logger.debug(f"total tasks removed = {len(obsolete_tasks)}")
 
     if HAS_AIRFLOW_DAG_LISTENER_API:
@@ -627,3 +623,13 @@ def on_dataset_changed(self, dataset: "Dataset") -> None:
         logger.debug(
             f"DataHub listener got notification about dataset change for {dataset}"
         )
+
+    async def _soft_delete_obsolete_urns(self, obsolete_urns):
+        delete_tasks = [self._delete_obsolete_data(urn) for urn in obsolete_urns]
+        await asyncio.gather(*delete_tasks)
+
+    async def _delete_obsolete_data(self, obsolete_urn):
+        assert self.graph
+
+        if self.graph.exists(str(obsolete_urn)):
+            self.graph.soft_delete_entity(str(obsolete_urn))
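
For context, below is a minimal, self-contained sketch (not part of the diff) of the pattern this change introduces: build one coroutine per obsolete URN and await them all together with asyncio.gather, driven by a single asyncio.run call. FakeGraph and the sample URN are hypothetical stand-ins for self.graph and real DataHub URNs.

import asyncio


class FakeGraph:
    """Hypothetical stand-in for the DataHub graph client (self.graph) used by the listener."""

    def exists(self, urn: str) -> bool:
        # Pretend every entity exists so the sketch always takes the delete path.
        return True

    def soft_delete_entity(self, urn: str) -> None:
        print(f"soft-deleted {urn}")


async def _delete_obsolete_data(graph: FakeGraph, urn: str) -> None:
    # Mirrors the new helper: check existence, then soft-delete.
    if graph.exists(str(urn)):
        graph.soft_delete_entity(str(urn))


async def _soft_delete_obsolete_urns(graph: FakeGraph, urns: set) -> None:
    # One coroutine per obsolete URN, awaited together, as in the diff above.
    delete_tasks = [_delete_obsolete_data(graph, urn) for urn in urns]
    await asyncio.gather(*delete_tasks)


if __name__ == "__main__":
    # Hypothetical URN purely for illustration.
    sample_urns = {"urn:li:dataFlow:(airflow,obsolete_dag,prod)"}
    asyncio.run(_soft_delete_obsolete_urns(FakeGraph(), sample_urns))

Note that asyncio.gather interleaves these coroutines on a single event loop; the exists/soft_delete_entity calls only overlap in time if they themselves yield control (for example, by being async or offloaded to threads).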