
fix(ingest/gc): Additional dataprocess cleanup fixes #12049

Merged · 4 commits · Dec 7, 2024
@@ -207,6 +207,9 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
assert self.ctx.graph
dpis = []
start = 0
# This GraphQL endpoint doesn't support scrolling, so paging past 10k DPIs causes performance issues on ES.
# Therefore, we cap the number of DPIs fetched at 9000.
max_item = 9000
while True:
try:
job_query_result = self.ctx.graph.execute_graphql(
@@ -226,10 +229,12 @@
runs = runs_data.get("runs")
dpis.extend(runs)
start += batch_size
if len(runs) < batch_size:
if len(runs) < batch_size or start >= max_item:
Collaborator:
we probably should move to using scroll instead of search

Contributor Author:
Unfortunately there is no scroll endpoint for this :(

break
except Exception as e:
logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
self.report.failure(
f"Exception while fetching DPIs for job {job_urn}:", exc=e
)
break
return dpis
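Since there is no scroll endpoint for these runs, the change above falls back to offset pagination with a hard cap. A minimal standalone sketch of that pattern, assuming a generic page-fetching callable (fetch_capped and run_query are illustrative names, not part of this PR):

from typing import Any, Callable, Dict, List

def fetch_capped(
    run_query: Callable[[int, int], List[Dict[str, Any]]],
    batch_size: int,
    max_items: int = 9000,  # stays below ES's default 10k from+size window
) -> List[Dict[str, Any]]:
    # Offset pagination with a hard cap, mirroring fetch_dpis above.
    results: List[Dict[str, Any]] = []
    start = 0
    while True:
        runs = run_query(start, batch_size)
        results.extend(runs)
        start += batch_size
        # Stop on a short page (no more data) or once the cap is reached.
        if len(runs) < batch_size or start >= max_items:
            break
    return results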

@@ -254,8 +259,9 @@ def keep_last_n_dpi(
deleted_count_last_n += 1
futures[future]["deleted"] = True
except Exception as e:
logger.error(f"Exception while deleting DPI: {e}")

self.report.report_failure(
f"Exception while deleting DPI: {e}", exc=e
)
if deleted_count_last_n % self.config.batch_size == 0:
logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
if self.config.delay:
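Both deletion loops (keep_last_n_dpi here and remove_old_dpis below) follow the same throttled pattern: submit deletes to an executor, count successful completions, log every batch_size deletions, and sleep when a delay is configured. A self-contained sketch under those assumptions — delete_urn, max_workers, and the literal defaults are illustrative stand-ins, not the source's values:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Iterable

def delete_batch(
    urns: Iterable[str],
    delete_urn: Callable[[str], None],  # stand-in for the real delete call
    batch_size: int = 100,
    delay: float = 0.5,
) -> int:
    deleted = 0
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(delete_urn, urn): urn for urn in urns}
        for future in as_completed(futures):
            try:
                future.result()
                deleted += 1
                # Throttle: pause after every full batch to spare the backend.
                if delay and deleted % batch_size == 0:
                    time.sleep(delay)
            except Exception as e:
                print(f"Exception while deleting DPI {futures[future]}: {e}")
    return deleted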
@@ -289,7 +295,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
dpis = self.fetch_dpis(job.urn, self.config.batch_size)
dpis.sort(
key=lambda x: x["created"]["time"]
if "created" in x and "time" in x["created"]
if x.get("created") and x["created"].get("time")
else 0,
reverse=True,
)
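Why the lambda change matters: with the old membership checks, a DPI of {"created": None} raises TypeError (membership tests against None are invalid), and {"created": {"time": None}} produces a None sort key, which fails when compared to integers. The chained .get() calls route both cases to the 0 fallback. A small sketch using the same shapes as the new unit tests below:

dpis = [
    {"urn": "urn:li:dataprocessInstance:1", "created": {"time": 1700000000000}},
    {"urn": "urn:li:dataprocessInstance:2", "created": {"time": None}},  # null time
    {"urn": "urn:li:dataprocessInstance:3", "created": None},            # null created
    {"urn": "urn:li:dataprocessInstance:4"},                             # missing key
]

dpis.sort(
    key=lambda x: x["created"]["time"]
    if x.get("created") and x["created"].get("time")
    else 0,
    reverse=True,
)
# All three degenerate cases sort with key 0; the old check
# ("created" in x and "time" in x["created"]) raises TypeError
# on the {"created": None} entry.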
@@ -325,8 +331,8 @@ def remove_old_dpis(
continue

if (
"created" not in dpi
or "time" not in dpi["created"]
not dpi.get("created")
or not dpi["created"].get("time")
or dpi["created"]["time"] < retention_time * 1000
):
future = executor.submit(
@@ -340,7 +346,7 @@
deleted_count_retention += 1
futures[future]["deleted"] = True
except Exception as e:
logger.error(f"Exception while deleting DPI: {e}")
self.report.report_failure(f"Exception while deleting DPI: {e}", exc=e)

if deleted_count_retention % self.config.batch_size == 0:
logger.info(
@@ -351,9 +357,12 @@
logger.info(f"Sleeping for {self.config.delay} seconds")
time.sleep(self.config.delay)

logger.info(
f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention"
)
if deleted_count_retention > 0:
logger.info(
f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention"
)
else:
logger.debug(f"No DPIs to delete from {job.urn} due to retention")

def get_data_flows(self) -> Iterable[DataFlowEntity]:
assert self.ctx.graph
48 changes: 48 additions & 0 deletions metadata-ingestion/tests/unit/test_gc.py
@@ -84,6 +84,54 @@ def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis):
self.cleanup.delete_dpi_from_datajobs(job)
self.assertEqual(10, self.report.num_aspects_removed)

@patch(
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
)
def test_delete_dpi_from_datajobs_without_dpi_null_created_time(
self, mock_fetch_dpis
):
job = DataJobEntity(
urn="urn:li:dataJob:1",
flow_urn="urn:li:dataFlow:1",
lastIngested=int(datetime.now(timezone.utc).timestamp()),
jobId="job1",
dataPlatformInstance="urn:li:dataPlatformInstance:1",
total_runs=10,
)
mock_fetch_dpis.return_value = [
{"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
] + [
{
"urn": "urn:li:dataprocessInstance:11",
"created": {"time": None},
}
]
self.cleanup.delete_dpi_from_datajobs(job)
self.assertEqual(11, self.report.num_aspects_removed)

@patch(
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
)
def test_delete_dpi_from_datajobs_without_dpi_without_time(self, mock_fetch_dpis):
job = DataJobEntity(
urn="urn:li:dataJob:1",
flow_urn="urn:li:dataFlow:1",
lastIngested=int(datetime.now(timezone.utc).timestamp()),
jobId="job1",
dataPlatformInstance="urn:li:dataPlatformInstance:1",
total_runs=10,
)
mock_fetch_dpis.return_value = [
{"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
] + [
{
"urn": "urn:li:dataprocessInstance:11",
"created": None,
}
]
self.cleanup.delete_dpi_from_datajobs(job)
self.assertEqual(11, self.report.num_aspects_removed)

def test_fetch_dpis(self):
assert self.cleanup.ctx.graph
self.cleanup.ctx.graph = MagicMock()