diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 137af44df48dce..c54cf22be1d133 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -260,7 +260,7 @@ class UnityCatalogSourceConfig( emit_siblings: bool = pydantic.Field( default=True, - description="Flag to emit sibling.", + description="Whether to emit siblings relation with corresponding delta-lake platform's table. If enabled, this will also ingest the corresponding delta-lake table.", ) delta_lake_options: DeltaLakeDetails = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 6dcbd343df6630..143d8dd0e29490 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -204,16 +204,6 @@ def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig): # Global map of tables, for profiling self.tables: FileBackedDict[Table] = FileBackedDict() - self.emit_siblings = ( - self.config.emit_siblings if self.config.emit_siblings else False - ) - - if self.config.delta_lake_options: - self.delta_lake_platform_instance_name = ( - self.config.delta_lake_options.platform_instance_name - ) - self.delta_lake_env = self.config.delta_lake_options.env - def init_hive_metastore_proxy(self): self.hive_metastore_proxy: Optional[HiveMetastoreProxy] = None if self.config.include_hive_metastore: @@ -1000,7 +990,7 @@ def gen_siblings_workunit( yield MetadataChangeProposalWrapper( entityUrn=source_dataset_urn, aspect=Siblings(primary=True, siblings=[dataset_urn]), - ).as_workunit() + ).as_workunit(is_primary_source=False) def gen_lineage_workunit( self, @@ -1017,4 +1007,4 @@ def gen_lineage_workunit( Upstream(dataset=source_dataset_urn, type=DatasetLineageType.VIEW) ] ), - ).as_workunit(is_primary_source=False) + ).as_workunit() diff --git a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json index e39f25e54bb809..3b8b876c0aca0e 100644 --- a/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json +++ b/metadata-ingestion/tests/integration/unity/unity_catalog_mces_golden.json @@ -5440,6 +5440,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,main.default.quickstart_table_external,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,system.quickstart_schema.quickstart_table_external,PROD)", @@ -5522,6 +5538,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,main.quickstart_schema.quickstart_table_external,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,main.default.quickstart_table_external,PROD)", @@ -5570,6 +5602,22 @@ "lastRunId": "no-run-id-provided" } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.default.quickstart_table_external,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,system.default.quickstart_table_external,PROD)", @@ -5596,7 +5644,39 @@ }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:delta-lake,db-02eec1f70bfe4115445be9fdb1aac6ac-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736/tables/cff27aa1-1c6a-4d78-b713-562c660c2896,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,system.quickstart_schema.quickstart_table_external,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table_external,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1638860400000, + "runId": "unity-catalog-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,system.default.quickstart_table_external,PROD)", "changeType": "UPSERT", "aspectName": "status", "aspect": {