feat(ingest/gc): add index truncation logic #10099

Merged (3 commits) on Apr 2, 2024
135 changes: 133 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -1,10 +1,13 @@
+import datetime
 import logging
+import re
+import time
 from dataclasses import dataclass
-from typing import Iterable
+from typing import Dict, Iterable
 
 from pydantic import Field
 
-from datahub.configuration.common import ConfigModel
+from datahub.configuration.common import ConfigModel, OperationalError
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.api.decorators import (
     SupportStatus,
@@ -15,12 +18,30 @@
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 
+logger = logging.getLogger(__name__)
+
 
 class DataHubGcSourceConfig(ConfigModel):
     cleanup_expired_tokens: bool = Field(
         default=True,
         description="Whether to clean up expired tokens or not",
     )
+    truncate_indices: bool = Field(
+        default=True,
+        description="Whether to truncate Elasticsearch indices that can be safely truncated",
+    )
+    truncate_index_older_than_days: int = Field(
+        default=30,
+        description="Indices older than this number of days will be truncated",
+    )
+    truncation_watch_until: int = Field(
+        default=10000,
+        description="Keep watching truncation progress until fewer than this many documents remain to be deleted",
+    )
+    truncation_sleep_between_seconds: int = Field(
+        default=30,
+        description="Seconds to sleep between polls while watching truncation progress.",
+    )
 
 
 @dataclass
@@ -51,8 +72,118 @@ def get_workunits_internal(
     ) -> Iterable[MetadataWorkUnit]:
         if self.config.cleanup_expired_tokens:
             self.revoke_expired_tokens()
+        if self.config.truncate_indices:
+            self.truncate_indices()
         yield from []
 
+    def truncate_indices(self) -> None:
+        self._truncate_timeseries_helper(aspect_name="operation", entity_type="dataset")
+        self._truncate_timeseries_helper(
+            aspect_name="datasetusagestatistics", entity_type="dataset"
+        )
+        self._truncate_timeseries_helper(
+            aspect_name="chartUsageStatistics", entity_type="chart"
+        )
+        self._truncate_timeseries_helper(
+            aspect_name="dashboardUsageStatistics", entity_type="dashboard"
+        )
+
+    # Each aspect is truncated in two phases: first a real truncation request
+    # (watch=False), then a series of dry runs (watch=True) that only monitor
+    # how many documents are still pending deletion.
+    def _truncate_timeseries_helper(self, aspect_name: str, entity_type: str) -> None:
+        self._truncate_timeseries_with_watch_optional(
+            aspect_name=aspect_name, entity_type=entity_type, watch=False
+        )
+        self._truncate_timeseries_with_watch_optional(
+            aspect_name=aspect_name, entity_type=entity_type, watch=True
+        )
+
+    def _truncate_timeseries_with_watch_optional(
+        self, aspect_name: str, entity_type: str, watch: bool
+    ) -> None:
+        graph = self.graph
+        assert graph is not None
+        if watch:
+            to_delete = 1
+            while to_delete > 0:
+                # dry_run=watch is True here, so this call only reports progress.
+                response = self.truncate_timeseries_util(
+                    aspect=aspect_name,
+                    dry_run=watch,
+                    days_ago=self.config.truncate_index_older_than_days,
+                    entity_type=entity_type,
+                )
+                val = response.get("value", "")
+                if "This was a dry run" not in val or "out of" not in val:
+                    return
+                prev_to_delete = to_delete
+                to_delete, total = re.findall(r"\d+", val)[:2]
+                to_delete = int(to_delete)
+                if to_delete <= 0:
+                    logger.info("Nothing to delete.")
+                    return
+                logger.info(f"to_delete {to_delete} / {total}")
+                if to_delete == prev_to_delete:
+                    logger.info("Seems to be stuck. Ending the loop.")
+                    break
+                elif to_delete < self.config.truncation_watch_until:
+                    logger.info("Truncation is small enough; not going to watch it.")
+                    return
+                else:
+                    time.sleep(self.config.truncation_sleep_between_seconds)
+        else:
+            self.truncate_timeseries_util(
+                aspect=aspect_name,
+                dry_run=watch,
+                days_ago=self.config.truncate_index_older_than_days,
+                entity_type=entity_type,
+            )
+
+    def x_days_ago_millis(self, days: int) -> int:
+        x_days_ago_datetime = datetime.datetime.now(
+            datetime.timezone.utc
+        ) - datetime.timedelta(days=days)
+        return int(x_days_ago_datetime.timestamp() * 1000)
+
+    def truncate_timeseries_util(
+        self,
+        aspect: str,
+        days_ago: int,
+        dry_run: bool = True,
+        entity_type: str = "dataset",
+    ) -> Dict:
+        graph = self.graph
+        assert graph is not None
+
+        gms_url = graph._gms_server
+        if not dry_run:
+            logger.info(
+                f"Going to truncate timeseries for {aspect} for {gms_url} older than {days_ago} days"
+            )
+        days_ago_millis = self.x_days_ago_millis(days_ago)
+        url = f"{gms_url}/operations?action=truncateTimeseriesAspect"
+        try:
+            response = graph._post_generic(
+                url=url,
+                payload_dict={
+                    "entityType": entity_type,
+                    "aspect": aspect,
+                    "endTimeMillis": days_ago_millis,
+                    "dryRun": dry_run,
+                },
+            )
+        except OperationalError:
+            # Fall back to a delete-by-query if the regular truncation call fails.
+            response = graph._post_generic(
+                url=url,
+                payload_dict={
+                    "entityType": entity_type,
+                    "aspect": aspect,
+                    "endTimeMillis": days_ago_millis,
+                    "dryRun": dry_run,
+                    "forceDeleteByQuery": True,
+                },
+            )
+        return response
+
     def revoke_expired_tokens(self) -> None:
         total = 1
         while total > 0:
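
The new knobs can be sanity-checked by instantiating the config class directly. A minimal sketch, using the defaults shown in the diff above (the import path follows from the file being changed):

from datahub.ingestion.source.gc.datahub_gc import DataHubGcSourceConfig

config = DataHubGcSourceConfig(
    cleanup_expired_tokens=True,
    truncate_indices=True,
    truncate_index_older_than_days=30,
    truncation_watch_until=10000,
    truncation_sleep_between_seconds=30,
)
print(config.truncate_index_older_than_days)  # 30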
Expand Down
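The watch loop works by re-running the truncation as a dry run and pulling the two counts out of the response's "value" string. A standalone sketch of that parsing; the sample message below is an assumption about the server's wording, not text taken from this PR:

import re

val = "This was a dry run. 123456 out of 7890123 documents would be deleted."
if "This was a dry run" in val and "out of" in val:
    to_delete, total = re.findall(r"\d+", val)[:2]
    print(int(to_delete), int(total))  # 123456 7890123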
Loading
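Under the hood the source simply POSTs to the GMS operations endpoint via graph._post_generic. A rough standalone equivalent with plain requests, where the GMS URL is a placeholder and any required auth headers are omitted:

import datetime

import requests

GMS = "http://localhost:8080"  # placeholder GMS base URL

# Mirror x_days_ago_millis(): 30 days ago as epoch milliseconds.
end_time_millis = int(
    (
        datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=30)
    ).timestamp()
    * 1000
)

response = requests.post(
    f"{GMS}/operations?action=truncateTimeseriesAspect",
    json={
        "entityType": "dataset",
        "aspect": "datasetusagestatistics",
        "endTimeMillis": end_time_millis,
        "dryRun": True,  # dry run first to see how many documents would be deleted
    },
)
print(response.json().get("value", ""))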