Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): add status aspect to dataProcessInstance #10757

Merged
merged 5 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 0 additions & 37 deletions metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterable,
List,
Expand Down Expand Up @@ -43,9 +42,6 @@

if TYPE_CHECKING:
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -139,39 +135,6 @@ def auto_status_aspect(
).as_workunit()


def _default_entity_type_fn(wu: MetadataWorkUnit) -> Optional[str]:
urn = wu.get_urn()
entity_type = guess_entity_type(urn)
return entity_type


def auto_stale_entity_removal(
stale_entity_removal_handler: "StaleEntityRemovalHandler",
stream: Iterable[MetadataWorkUnit],
entity_type_fn: Callable[
[MetadataWorkUnit], Optional[str]
] = _default_entity_type_fn,
) -> Iterable[MetadataWorkUnit]:
"""
Record all entities that are found, and emit removals for any that disappeared in this run.
"""

for wu in stream:
urn = wu.get_urn()

if wu.is_primary_source:
entity_type = entity_type_fn(wu)
if entity_type is not None:
stale_entity_removal_handler.add_entity_to_state(entity_type, urn)
else:
stale_entity_removal_handler.add_urn_to_skip(urn)

yield wu

# Clean up stale entities.
yield from stale_entity_removal_handler.gen_removed_entity_workunits()


T = TypeVar("T", bound=MetadataWorkUnit)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from datahub.emitter.mcp_builder import entity_supports_aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
from datahub.ingestion.api.source_helpers import auto_stale_entity_removal
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
Expand All @@ -28,6 +27,10 @@

logger: logging.Logger = logging.getLogger(__name__)

STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
}


class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
"""
Expand Down Expand Up @@ -59,6 +62,30 @@ def report_last_state_non_deletable_entities(self, urn: str) -> None:
self.last_state_non_deletable_entities.append(urn)


def auto_stale_entity_removal(
stale_entity_removal_handler: "StaleEntityRemovalHandler",
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""
Record all entities that are found, and emit removals for any that disappeared in this run.
"""

for wu in stream:
urn = wu.get_urn()

if wu.is_primary_source:
entity_type = guess_entity_type(urn)
if entity_type is not None:
stale_entity_removal_handler.add_entity_to_state(entity_type, urn)
else:
stale_entity_removal_handler.add_urn_to_skip(urn)

yield wu

# Clean up stale entities.
yield from stale_entity_removal_handler.gen_removed_entity_workunits()


class StaleEntityRemovalHandler(
StatefulIngestionUsecaseHandlerBase["GenericCheckpointState"]
):
Expand Down Expand Up @@ -285,7 +312,11 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
for urn in last_checkpoint_state.get_urns_not_in(
type="*", other_checkpoint_state=cur_checkpoint_state
):
if not entity_supports_aspect(guess_entity_type(urn), StatusClass):
entity_type = guess_entity_type(urn)
if (
entity_type in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES
or not entity_supports_aspect(entity_type, StatusClass)
):
# If any entity does not support aspect 'status' then skip that entity urn
report.report_last_state_non_deletable_entities(urn)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6010,6 +6010,70 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:36225e795a4597b2376996774a803b0d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:57aa623f096cf3a28af70fe94b713907",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:a42a5b1bee156e45972e12d4156fb7a2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:fc6268f0be68fd04c310705b65efd6fe",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1643871600000,
"runId": "dbt-model-performance",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "tag",
"entityUrn": "urn:li:tag:dbt:tag_from_dbt",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
Expand Down Expand Up @@ -186,6 +187,7 @@
"aspect": {
"json": {
"owners": [],
"ownerTypes": {},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:fivetran"
Expand Down Expand Up @@ -614,5 +616,53 @@
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -624,5 +624,53 @@
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Adapted from https://github.com/big-data-europe/docker-hive.

version: "3"

services:

presto:
image: starburstdata/presto:350-e.15
container_name: "presto"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
---
version: '3.8'
services:
connect:
image: confluentinc/cp-kafka-connect:7.4.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
---
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.2
Expand Down
3 changes: 0 additions & 3 deletions metadata-ingestion/tests/integration/trino/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# Adapted from https://github.com/big-data-europe/docker-hive.

version: "3"

services:

testtrino:
image: trinodb/trino:369
container_name: "testtrino"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,21 @@
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
Expand Down
Loading
Loading