Skip to content

Commit

Permalink
fix(ingest): adding platform instance urn to data platform instance a…
Browse files Browse the repository at this point in the history
…spects (#4015)
  • Loading branch information
swaroopjagadish authored Jan 31, 2022
1 parent c27f1f9 commit e29d2cb
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from datahub.configuration.source_common import DatasetSourceConfigBase
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -339,7 +340,9 @@ def _extract_mcps(self, index: str) -> Iterable[MetadataChangeProposalWrapper]:
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=self.source_config.platform_instance,
instance=make_dataplatform_instance_urn(
self.platform, self.source_config.platform_instance
),
),
changeType=ChangeTypeClass.UPSERT,
)
Expand Down
14 changes: 12 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,12 @@ def construct_lineage_workunits(
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataPlatformInstance",
aspect=models.DataPlatformInstanceClass(
platform=builder.make_data_platform_urn(target_platform)
platform=builder.make_data_platform_urn(target_platform),
instance=builder.make_dataplatform_instance_urn(
target_platform, target_platform_instance
)
if target_platform_instance
else None,
),
)

Expand All @@ -995,7 +1000,12 @@ def construct_lineage_workunits(
changeType=models.ChangeTypeClass.UPSERT,
aspectName="dataPlatformInstance",
aspect=models.DataPlatformInstanceClass(
platform=builder.make_data_platform_urn(source_platform)
platform=builder.make_data_platform_urn(source_platform),
instance=builder.make_dataplatform_instance_urn(
source_platform, source_platform_instance
)
if source_platform_instance
else None,
),
)

Expand Down
58 changes: 29 additions & 29 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
Expand Down Expand Up @@ -722,21 +723,32 @@ def loop_tables( # noqa: C901
self.report.report_workunit(wu)
yield wu

# If we are a platform instance based source, emit the instance aspect
if self.config.platform_instance:
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=self.config.platform_instance,
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
if dpi_aspect:
yield dpi_aspect

def get_dataplatform_instance_aspect(
self, dataset_urn: str
) -> Optional[SqlWorkUnit]:
# If we are a platform instance based source, emit the instance aspect
if self.config.platform_instance:
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=make_dataplatform_instance_urn(
self.platform, self.config.platform_instance
),
)
wu = SqlWorkUnit(id=f"{dataset_name}-dataPlatformInstance", mcp=mcp)
self.report.report_workunit(wu)
yield wu
),
)
wu = SqlWorkUnit(id=f"{dataset_urn}-dataPlatformInstance", mcp=mcp)
self.report.report_workunit(wu)
return wu
else:
return None

def get_schema_fields(
self, dataset_name: str, columns: List[dict], pk_constraints: dict = None
Expand Down Expand Up @@ -868,21 +880,9 @@ def loop_views(
self.report.report_workunit(wu)
yield wu

# If we are a platform instance based source, emit the instance aspect
if self.config.platform_instance:
mcp = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="dataPlatformInstance",
aspect=DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=self.config.platform_instance,
),
)
wu = SqlWorkUnit(id=f"{dataset_name}-dataPlatformInstance", mcp=mcp)
self.report.report_workunit(wu)
yield wu
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
if dpi_aspect:
yield dpi_aspect

def _get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler
Expand Down

0 comments on commit e29d2cb

Please sign in to comment.