Skip to content

Commit

Permalink
fix(ingest/gc): Adding test and more checks to gc source (#12027)
Browse files Browse the repository at this point in the history
  • Loading branch information
treff7es authored Dec 5, 2024
1 parent 8d15df0 commit 3c388a5
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,22 +208,28 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
dpis = []
start = 0
while True:
job_query_result = self.ctx.graph.execute_graphql(
DATA_PROCESS_INSTANCES_QUERY,
{"dataJobUrn": job_urn, "start": start, "count": batch_size},
)
job_data = job_query_result.get("dataJob")
if not job_data:
raise ValueError(f"Error getting job {job_urn}")

runs_data = job_data.get("runs")
if not runs_data:
raise ValueError(f"Error getting runs for {job_urn}")

runs = runs_data.get("runs")
dpis.extend(runs)
start += batch_size
if len(runs) < batch_size:
try:
job_query_result = self.ctx.graph.execute_graphql(
DATA_PROCESS_INSTANCES_QUERY,
{"dataJobUrn": job_urn, "start": start, "count": batch_size},
)
job_data = job_query_result.get("dataJob")
if not job_data:
logger.error(f"Error getting job {job_urn}")
break

runs_data = job_data.get("runs")
if not runs_data:
logger.error(f"Error getting runs for {job_urn}")
break

runs = runs_data.get("runs")
dpis.extend(runs)
start += batch_size
if len(runs) < batch_size:
break
except Exception as e:
logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
break
return dpis

Expand All @@ -243,8 +249,12 @@ def keep_last_n_dpi(
futures[future] = dpi

for future in as_completed(futures):
deleted_count_last_n += 1
futures[future]["deleted"] = True
try:
future.result()
deleted_count_last_n += 1
futures[future]["deleted"] = True
except Exception as e:
logger.error(f"Exception while deleting DPI: {e}")

if deleted_count_last_n % self.config.batch_size == 0:
logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
Expand Down Expand Up @@ -279,7 +289,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
dpis = self.fetch_dpis(job.urn, self.config.batch_size)
dpis.sort(
key=lambda x: x["created"]["time"]
if x["created"] and x["created"]["time"]
if "created" in x and "time" in x["created"]
else 0,
reverse=True,
)
Expand Down Expand Up @@ -314,15 +324,23 @@ def remove_old_dpis(
if dpi.get("deleted"):
continue

if dpi["created"]["time"] < retention_time * 1000:
if (
"created" not in dpi
or "time" not in dpi["created"]
or dpi["created"]["time"] < retention_time * 1000
):
future = executor.submit(
self.delete_entity, dpi["urn"], "dataprocessInstance"
)
futures[future] = dpi

for future in as_completed(futures):
deleted_count_retention += 1
futures[future]["deleted"] = True
try:
future.result()
deleted_count_retention += 1
futures[future]["deleted"] = True
except Exception as e:
logger.error(f"Exception while deleting DPI: {e}")

if deleted_count_retention % self.config.batch_size == 0:
logger.info(
Expand Down Expand Up @@ -378,8 +396,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
dataFlows[flow.urn] = flow

scroll_id: Optional[str] = None
previous_scroll_id: Optional[str] = None

dataJobs: Dict[str, List[DataJobEntity]] = defaultdict(list)
deleted_jobs: int = 0

while True:
result = self.ctx.graph.execute_graphql(
DATAJOB_QUERY,
Expand Down Expand Up @@ -426,9 +447,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
else:
dataJobs[datajob_entity.flow_urn].append(datajob_entity)

if not scroll_id:
if not scroll_id or previous_scroll_id == scroll_id:
break

previous_scroll_id = scroll_id

logger.info(f"Deleted {deleted_jobs} DataJobs")
# Delete empty dataflows if needed
if self.config.delete_empty_data_flows:
Expand All @@ -443,4 +466,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if deleted_jobs % self.config.batch_size == 0:
logger.info(f"Deleted {deleted_data_flows} DataFlows")
logger.info(f"Deleted {deleted_data_flows} DataFlows")

return []
109 changes: 109 additions & 0 deletions metadata-ingestion/tests/unit/test_gc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import unittest
from datetime import datetime, timezone
from unittest.mock import MagicMock, patch

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.gc.dataprocess_cleanup import (
DataJobEntity,
DataProcessCleanup,
DataProcessCleanupConfig,
DataProcessCleanupReport,
)


class TestDataProcessCleanup(unittest.TestCase):
def setUp(self):
self.ctx = PipelineContext(run_id="test_run")
self.ctx.graph = MagicMock()
self.config = DataProcessCleanupConfig()
self.report = DataProcessCleanupReport()
self.cleanup = DataProcessCleanup(
self.ctx, self.config, self.report, dry_run=True
)

@patch(
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
)
def test_delete_dpi_from_datajobs(self, mock_fetch_dpis):
job = DataJobEntity(
urn="urn:li:dataJob:1",
flow_urn="urn:li:dataFlow:1",
lastIngested=int(datetime.now(timezone.utc).timestamp()),
jobId="job1",
dataPlatformInstance="urn:li:dataPlatformInstance:1",
total_runs=10,
)
mock_fetch_dpis.return_value = [
{
"urn": f"urn:li:dataprocessInstance:{i}",
"created": {
"time": int(datetime.now(timezone.utc).timestamp() + i) * 1000
},
}
for i in range(10)
]
self.cleanup.delete_dpi_from_datajobs(job)
self.assertEqual(5, self.report.num_aspects_removed)

@patch(
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
)
def test_delete_dpi_from_datajobs_without_dpis(self, mock_fetch_dpis):
job = DataJobEntity(
urn="urn:li:dataJob:1",
flow_urn="urn:li:dataFlow:1",
lastIngested=int(datetime.now(timezone.utc).timestamp()),
jobId="job1",
dataPlatformInstance="urn:li:dataPlatformInstance:1",
total_runs=10,
)
mock_fetch_dpis.return_value = []
self.cleanup.delete_dpi_from_datajobs(job)
self.assertEqual(0, self.report.num_aspects_removed)

@patch(
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
)
def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis):
job = DataJobEntity(
urn="urn:li:dataJob:1",
flow_urn="urn:li:dataFlow:1",
lastIngested=int(datetime.now(timezone.utc).timestamp()),
jobId="job1",
dataPlatformInstance="urn:li:dataPlatformInstance:1",
total_runs=10,
)
mock_fetch_dpis.return_value = [
{"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
] + [
{
"urn": "urn:li:dataprocessInstance:11",
"created": {"time": int(datetime.now(timezone.utc).timestamp() * 1000)},
}
]
self.cleanup.delete_dpi_from_datajobs(job)
self.assertEqual(10, self.report.num_aspects_removed)

def test_fetch_dpis(self):
assert self.cleanup.ctx.graph
self.cleanup.ctx.graph = MagicMock()
self.cleanup.ctx.graph.execute_graphql.return_value = {
"dataJob": {
"runs": {
"runs": [
{
"urn": "urn:li:dataprocessInstance:1",
"created": {
"time": int(datetime.now(timezone.utc).timestamp())
},
}
]
}
}
}
dpis = self.cleanup.fetch_dpis("urn:li:dataJob:1", 10)
self.assertEqual(len(dpis), 1)


if __name__ == "__main__":
unittest.main()

0 comments on commit 3c388a5

Please sign in to comment.