feat(ingest): set pipeline name in system metadata #10190

Merged · 6 commits · Jun 27, 2024
17 changes: 9 additions & 8 deletions metadata-ingestion/docs/sources/datahub/datahub_recipe.yml
@@ -19,14 +19,15 @@ source:
     enabled: true
     ignore_old_state: false
   urn_pattern:
-    deny:
-      # Ignores all datahub metadata where the urn matches the regex
-      - ^denied.urn.*
-    allow:
-      # Ingests all datahub metadata where the urn matches the regex.
-      - ^allowed.urn.*
-  extractor_config:
-    set_system_metadata: false # Replicate system metadata
+      deny:
+        # Ignores all datahub metadata where the urn matches the regex
+        - ^denied.urn.*
+      allow:
+        # Ingests all datahub metadata where the urn matches the regex.
+        - ^allowed.urn.*
+
+flags:
+  set_system_metadata: false # Replicate system metadata

 # Here, we write to a DataHub instance
 # You can also use a different sink, e.g. to write the data to a file instead
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/check_cli.py
@@ -59,10 +59,10 @@ def metadata_file(json_file: str, rewrite: bool, unpack_mces: bool) -> None:
"config": {"filename": json_file},
"extractor": "generic",
"extractor_config": {
"set_system_metadata": False,
"unpack_mces_into_mcps": unpack_mces,
},
},
"flags": {"set_system_metadata": False},
"sink": {
"type": "file",
"config": {"filename": out_file.name},
Expand Down
30 changes: 19 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/api/workunit.py
@@ -2,16 +2,14 @@
 from dataclasses import dataclass
 from typing import Iterable, Optional, Type, TypeVar, Union, overload

-from deprecated import deprecated
-
 from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import WorkUnit
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
     MetadataChangeEvent,
     MetadataChangeProposal,
 )
-from datahub.metadata.schema_classes import UsageAggregationClass, _Aspect
+from datahub.metadata.schema_classes import _Aspect

 logger = logging.getLogger(__name__)
@@ -158,11 +156,21 @@ def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
             for mcpw in mcps_from_mce(self.metadata)
         ]

-
-@deprecated
-@dataclass
-class UsageStatsWorkUnit(WorkUnit):
-    usageStats: UsageAggregationClass
-
-    def get_metadata(self) -> dict:
-        return {"usage": self.usageStats}
+    @classmethod
+    def from_metadata(
+        cls,
+        metadata: Union[
+            MetadataChangeEvent, MetadataChangeProposal, MetadataChangeProposalWrapper
+        ],
+        id: Optional[str] = None,
+    ) -> "MetadataWorkUnit":
+        workunit_id = id or cls.generate_workunit_id(metadata)
+
+        if isinstance(metadata, MetadataChangeEvent):
+            return MetadataWorkUnit(id=workunit_id, mce=metadata)
+        elif isinstance(metadata, (MetadataChangeProposal)):
+            return MetadataWorkUnit(id=workunit_id, mcp_raw=metadata)
+        elif isinstance(metadata, MetadataChangeProposalWrapper):
+            return MetadataWorkUnit(id=workunit_id, mcp=metadata)
+        else:
+            raise ValueError(f"Unexpected metadata type {type(metadata)}")
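
For context, a minimal sketch of how the new factory method might be exercised; the URN and aspect below are illustrative, not part of this diff:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import StatusClass

# from_metadata picks the matching MetadataWorkUnit field (mce / mcp_raw / mcp)
# based on the runtime type of the metadata object.
mcpw = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
    aspect=StatusClass(removed=False),
)
wu = MetadataWorkUnit.from_metadata(mcpw)  # id falls back to generate_workunit_id
wu = MetadataWorkUnit.from_metadata(mcpw, id="explicit-id")  # explicit id wins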
@@ -2,15 +2,13 @@
 from typing import Iterable, Union

 from datahub.configuration.common import ConfigModel
-from datahub.emitter.mce_builder import get_sys_time
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.api.common import RecordEnvelope
 from datahub.ingestion.api.source import Extractor, WorkUnit
-from datahub.ingestion.api.workunit import MetadataWorkUnit, UsageStatsWorkUnit
+from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
     MetadataChangeEvent,
     MetadataChangeProposal,
-    SystemMetadata,
 )

 logger = logging.getLogger(__name__)
@@ -26,10 +24,6 @@ def _try_reformat_with_black(code: str) -> str:


 class WorkUnitRecordExtractorConfig(ConfigModel):
-    set_system_metadata: bool = True
-    set_system_metadata_pipeline_name: bool = (
-        False  # false for now until the models are available in OSS
-    )
     unpack_mces_into_mcps: bool = False

@@ -65,14 +59,6 @@ def get_records(
                     MetadataChangeProposalWrapper,
                 ),
             ):
-                if self.config.set_system_metadata:
-                    workunit.metadata.systemMetadata = SystemMetadata(
-                        lastObserved=get_sys_time(), runId=self.ctx.run_id
-                    )
-                    if self.config.set_system_metadata_pipeline_name:
-                        workunit.metadata.systemMetadata.pipelineName = (
-                            self.ctx.pipeline_name
-                        )
                 if (
                     isinstance(workunit.metadata, MetadataChangeEvent)
                     and len(workunit.metadata.proposedSnapshot.aspects) == 0
@@ -92,11 +78,6 @@
"workunit_id": workunit.id,
},
)
elif isinstance(workunit, UsageStatsWorkUnit):
logger.error(
"Dropping deprecated `UsageStatsWorkUnit`. "
"Emit a `MetadataWorkUnit` with the `datasetUsageStatistics` aspect instead."
)
else:
raise ValueError(f"unknown WorkUnit type {type(workunit)}")

Expand Down
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -39,6 +39,9 @@
 from datahub.ingestion.sink.file import FileSink, FileSinkConfig
 from datahub.ingestion.sink.sink_registry import sink_registry
 from datahub.ingestion.source.source_registry import source_registry
+from datahub.ingestion.transformer.system_metadata_transformer import (
+    SystemMetadataTransformer,
+)
 from datahub.ingestion.transformer.transform_registry import transform_registry
 from datahub.metadata.schema_classes import MetadataChangeProposalClass
 from datahub.telemetry import stats, telemetry
@@ -317,6 +320,9 @@ def _configure_transforms(self) -> None:
f"Transformer type:{transformer_type},{transformer_class} configured"
)

# Add the system metadata transformer at the end of the list.
self.transformers.append(SystemMetadataTransformer(self.ctx))

def _configure_reporting(
self, report_to: Optional[str], no_default_report: bool
) -> None:
Expand Down
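
Because the transformer is appended after all user-configured transformers, user transforms always run before system metadata is stamped. A quick sanity check, as a sketch (pipeline stands in for any configured Pipeline instance):

# The system metadata transformer always sits at the tail of the list.
assert isinstance(pipeline.transformers[-1], SystemMetadataTransformer)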
@@ -62,6 +62,14 @@ class FlagsConfig(ConfigModel):
         ),
     )

+    set_system_metadata: bool = Field(
+        True, description="Set system metadata on entities."
+    )
+    set_system_metadata_pipeline_name: bool = Field(
+        True,
+        description="Set system metadata pipeline name. Requires `set_system_metadata` to be enabled.",
+    )
+

 class PipelineConfig(ConfigModel):
     source: SourceConfig
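
A sketch of how these flags surface in a recipe-style config; the file source and console sink are placeholders (the `filename` key mirrors the check_cli.py snippet above):

from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {"type": "file", "config": {"filename": "./mces.json"}},
        "sink": {"type": "console"},
        "flags": {
            # Both flags default to True; disable to replicate existing
            # system metadata verbatim, e.g. for DataHub-to-DataHub copies.
            "set_system_metadata": False,
        },
    }
)
pipeline.run()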
metadata-ingestion/src/datahub/ingestion/transformer/auto_helper_transformer.py (new file, per the import in pipeline.py above)
@@ -0,0 +1,79 @@
from typing import Callable, Iterable, Optional, Union

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import ControlRecord, PipelineContext, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeEvent,
    MetadataChangeProposal,
)


class AutoHelperTransformer(Transformer):
    """Converts an auto_* source helper into a transformer.

    Important usage note: this assumes that the auto helper is stateless. The converter
    will be called multiple times, once for each batch of records. If the helper
    attempts to maintain state or perform some cleanup at the end of the stream, it
    will not behave correctly.
    """

    def __init__(
        self,
        converter: Callable[[Iterable[MetadataWorkUnit]], Iterable[MetadataWorkUnit]],
    ):
        self.converter = converter

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:
        records = list(record_envelopes)

        normal_records = [r for r in records if not isinstance(r.record, ControlRecord)]
        control_records = [r for r in records if isinstance(r.record, ControlRecord)]

        yield from self._from_workunits(
            self.converter(
                self._into_workunits(normal_records),
            )
        )

        # Pass through control records as-is. Note that this isn't fully correct, since it technically
        # reorders the control records relative to the normal records. This is ok since the only control
        # record we have marks the end of the stream.
        yield from control_records

    @classmethod
    def _into_workunits(
        cls,
        stream: Iterable[
            RecordEnvelope[
                Union[
                    MetadataChangeEvent,
                    MetadataChangeProposal,
                    MetadataChangeProposalWrapper,
                ]
            ]
        ],
    ) -> Iterable[MetadataWorkUnit]:
        for record in stream:
            workunit_id: Optional[str] = record.metadata.get("workunit_id")
            metadata = record.record
            yield MetadataWorkUnit.from_metadata(metadata, id=workunit_id)

    @classmethod
    def _from_workunits(
        cls, stream: Iterable[MetadataWorkUnit]
    ) -> Iterable[RecordEnvelope]:
        for workunit in stream:
            yield RecordEnvelope(
                workunit.metadata,
                {
                    "workunit_id": workunit.id,
                },
            )

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> Transformer:
        raise NotImplementedError(f"{cls.__name__} cannot be created from config")
Contributor comment on lines +13 to +79:

Review of new AutoHelperTransformer class.

The AutoHelperTransformer class is well-implemented with clear separation of responsibilities and good use of list comprehensions for filtering records. However, consider improving the handling of control records to avoid potential issues with record ordering.

-        # Pass through control records as-is. Note that this isn't fully correct, since it technically
-        # reorders the control records relative to the normal records. This is ok since the only control
-        # record we have marks the end of the stream.
+        # TODO: Revisit the handling of control records to ensure correct ordering relative to normal records.
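
To make the pattern concrete, a small sketch of wrapping a stateless helper; auto_noop_example below is hypothetical, not one of the repo's auto_* helpers:

from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.transformer.auto_helper_transformer import (
    AutoHelperTransformer,
)


def auto_noop_example(
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    # Stateless, per-workunit logic only: per the docstring above, a helper
    # that keeps state across batches or needs end-of-stream cleanup would
    # misbehave, since the converter runs once per batch.
    for wu in stream:
        yield wu


transformer = AutoHelperTransformer(auto_noop_example)
# transformer.transform(...) unwraps RecordEnvelopes into workunits, runs the
# helper, re-wraps the results, and passes ControlRecords through unchanged.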
metadata-ingestion/src/datahub/ingestion/transformer/system_metadata_transformer.py (new file, per the import in pipeline.py above)
@@ -0,0 +1,45 @@
import functools
from typing import Iterable

from datahub.emitter.mce_builder import get_sys_time
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.transform import Transformer
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.transformer.auto_helper_transformer import AutoHelperTransformer
from datahub.metadata.schema_classes import SystemMetadataClass


def auto_system_metadata(
    ctx: PipelineContext,
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    if not ctx.pipeline_config:
        raise ValueError("Pipeline config is required for system metadata")
    set_system_metadata = ctx.pipeline_config.flags.set_system_metadata
    set_pipeline_name = ctx.pipeline_config.flags.set_system_metadata_pipeline_name

    for workunit in stream:
        if set_system_metadata:
            workunit.metadata.systemMetadata = SystemMetadataClass(
                lastObserved=get_sys_time(), runId=ctx.run_id
            )
            if set_pipeline_name:
                workunit.metadata.systemMetadata.pipelineName = ctx.pipeline_name

        yield workunit


class SystemMetadataTransformer(Transformer):
    def __init__(self, ctx: PipelineContext):
        self._inner_transformer = AutoHelperTransformer(
            functools.partial(auto_system_metadata, ctx)
        )

    def transform(
        self, record_envelopes: Iterable[RecordEnvelope]
    ) -> Iterable[RecordEnvelope]:
        yield from self._inner_transformer.transform(record_envelopes)

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> Transformer:
        raise NotImplementedError(f"{cls.__name__} cannot be created from config")
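
And a rough end-to-end sketch of the helper's effect; the context construction here is simplified and partly assumed (real runs build the context from the recipe):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.run.pipeline_config import PipelineConfig
from datahub.metadata.schema_classes import StatusClass

config = PipelineConfig.parse_obj(
    {
        "source": {"type": "file", "config": {"filename": "./mces.json"}},
        "sink": {"type": "console"},
    }
)
ctx = PipelineContext(
    run_id="demo-run", pipeline_name="demo_pipeline", pipeline_config=config
)

wu = MetadataWorkUnit.from_metadata(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,example.table,PROD)",
        aspect=StatusClass(removed=False),
    )
)

# With both flags at their defaults, the run id and pipeline name are stamped.
(out,) = auto_system_metadata(ctx, [wu])
assert out.metadata.systemMetadata.runId == "demo-run"
assert out.metadata.systemMetadata.pipelineName == "demo_pipeline"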
@@ -19,7 +19,7 @@ services:
       - AWS_REGION=us-east-1
     ports:
       - 8888:8888
-      - 8080:8080
+      - 28080:8080
       - 10000:10000
       - 10001:10001
   rest:
@@ -13,13 +13,13 @@
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {
"owner": "root",
"created-at": "2024-05-22T14:09:15.234903700Z",
"created-at": "2024-06-27T17:29:32.492204247Z",
"write.format.default": "parquet",
"location": "s3a://warehouse/wh/nyc/another_taxis",
"format-version": "1",
"partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
"snapshot-id": "1706020810864905360",
"manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-1706020810864905360-1-90ad8346-ac1b-4e73-bb30-dfd9b0b0e0dc.avro"
"snapshot-id": "1131595459662979239",
"manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-1131595459662979239-1-0e80739b-774c-4eda-9d96-3a4c70873c32.avro"
},
"tags": []
}
Expand Down Expand Up @@ -150,7 +150,8 @@
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
Expand All @@ -167,7 +168,8 @@
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
},
{
Expand All @@ -183,7 +185,8 @@
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "iceberg-2020_04_14-07_00_00",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "test_pipeline"
}
}
]
Loading
Loading