Skip to content

Commit

Permalink
fix(ingest): add status aspect to dataProcessInstance (datahub-projec…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and aviv-julienjehannet committed Jul 17, 2024
1 parent 718c3dd commit af89594
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 47 deletions.
37 changes: 0 additions & 37 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
List,
Expand Down Expand Up @@ -43,9 +42,6 @@

if TYPE_CHECKING:
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -139,39 +135,6 @@ def auto_status_aspect(
).as_workunit()


def _default_entity_type_fn(wu: MetadataWorkUnit) -> Optional[str]:
    """Derive a work unit's entity type from its URN.

    Args:
        wu: The work unit whose URN is inspected.

    Returns:
        Whatever ``guess_entity_type`` returns for ``wu.get_urn()``.
    """
    return guess_entity_type(wu.get_urn())


def auto_stale_entity_removal(
    stale_entity_removal_handler: "StaleEntityRemovalHandler",
    stream: Iterable[MetadataWorkUnit],
    entity_type_fn: Callable[
        [MetadataWorkUnit], Optional[str]
    ] = _default_entity_type_fn,
) -> Iterable[MetadataWorkUnit]:
    """
    Pass work units through while tracking their URNs for stale-entity removal.

    Every work unit from ``stream`` is yielded unchanged. Along the way:

    - a primary-source work unit has its entity type resolved through
      ``entity_type_fn``; when that type is not ``None`` the (type, urn) pair
      is added to the handler's state;
    - any other work unit has its URN registered with the handler as one
      to skip.

    Once ``stream`` is exhausted, everything produced by the handler's
    ``gen_removed_entity_workunits()`` is yielded as well.

    Args:
        stale_entity_removal_handler: Handler that accumulates the state and
            generates the removal work units.
        stream: Work units to forward.
        entity_type_fn: Maps a work unit to its entity type, or ``None`` when
            the work unit should not be recorded in the state.
    """
    handler = stale_entity_removal_handler

    for workunit in stream:
        wu_urn = workunit.get_urn()

        if not workunit.is_primary_source:
            handler.add_urn_to_skip(wu_urn)
        else:
            resolved_type = entity_type_fn(workunit)
            if resolved_type is not None:
                handler.add_entity_to_state(resolved_type, wu_urn)

        yield workunit

    # The input is exhausted: emit the handler's removal work units.
    yield from handler.gen_removed_entity_workunits()


T = TypeVar("T", bound=MetadataWorkUnit)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.api.source_helpers import auto_stale_entity_removal
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
Expand All @@ -28,6 +27,10 @@

logger: logging.Logger = logging.getLogger(__name__)

# Entity types that stale-entity removal never emits removals for: a URN of
# one of these types that is missing from the current checkpoint is reported
# as non-deletable and skipped.
STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
}


class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
"""
Expand Down Expand Up @@ -59,6 +62,30 @@ def report_last_state_non_deletable_entities(self, urn: str) -> None:
self.last_state_non_deletable_entities.append(urn)


def auto_stale_entity_removal(
    stale_entity_removal_handler: "StaleEntityRemovalHandler",
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    """
    Pass work units through while tracking their URNs for stale-entity removal.

    Every work unit from ``stream`` is yielded unchanged. Along the way:

    - a primary-source work unit has its entity type guessed from its URN;
      when that type is not ``None`` the (type, urn) pair is added to the
      handler's state;
    - any other work unit has its URN registered with the handler as one
      to skip.

    Once ``stream`` is exhausted, everything produced by the handler's
    ``gen_removed_entity_workunits()`` is yielded as well.

    Args:
        stale_entity_removal_handler: Handler that accumulates the state and
            generates the removal work units.
        stream: Work units to forward.
    """
    handler = stale_entity_removal_handler

    for workunit in stream:
        wu_urn = workunit.get_urn()

        if not workunit.is_primary_source:
            handler.add_urn_to_skip(wu_urn)
        else:
            resolved_type = guess_entity_type(wu_urn)
            if resolved_type is not None:
                handler.add_entity_to_state(resolved_type, wu_urn)

        yield workunit

    # The input is exhausted: emit the handler's removal work units.
    yield from handler.gen_removed_entity_workunits()


class StaleEntityRemovalHandler(
StatefulIngestionUsecaseHandlerBase["GenericCheckpointState"]
):
Expand Down Expand Up @@ -285,7 +312,11 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
for urn in last_checkpoint_state.get_urns_not_in(
type="*", other_checkpoint_state=cur_checkpoint_state
):
if not entity_supports_aspect(guess_entity_type(urn), StatusClass):
entity_type = guess_entity_type(urn)
if (
entity_type in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES
or not entity_supports_aspect(entity_type, StatusClass)
):
# If any entity does not support aspect 'status' then skip that entity urn
report.report_last_state_non_deletable_entities(urn)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6010,6 +6010,70 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:36225e795a4597b2376996774a803b0d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:57aa623f096cf3a28af70fe94b713907",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:a42a5b1bee156e45972e12d4156fb7a2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fc6268f0be68fd04c310705b65efd6fe",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:dbt:tag_from_dbt",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
Expand Down Expand Up @@ -186,6 +187,7 @@
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
Expand Down Expand Up @@ -614,5 +616,53 @@
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -624,5 +624,53 @@
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Adapted from https://github.com/big-data-europe/docker-hive.

version: "3"

services:

presto:
image: starburstdata/presto:350-e.15
container_name: "presto"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
---
version: '3.8'
services:
connect:
image: confluentinc/cp-kafka-connect:7.4.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
---
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.2
Expand Down
3 changes: 0 additions & 3 deletions metadata-ingestion/tests/integration/trino/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Adapted from https://github.com/big-data-europe/docker-hive.

version: "3"

services:

testtrino:
image: trinodb/trino:369
container_name: "testtrino"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,21 @@
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
Expand Down
Loading

0 comments on commit af89594

Please sign in to comment.