Skip to content

Commit

Permalink
fix(ingest/unity-catalog) upstream lineage for hive_metastore externa…
Browse files Browse the repository at this point in the history
…l table with s3 location (#10546)
  • Loading branch information
dushayntAW authored May 23, 2024
1 parent 94af249 commit 92780e6
Show file tree
Hide file tree
Showing 3 changed files with 743 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
create_dataset_props_patch_builder,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
from datahub.ingestion.source.aws.s3_util import (
make_s3_urn_for_lineage,
strip_s3_prefix,
Expand Down Expand Up @@ -512,14 +513,16 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
if table.view_definition:
self.view_definitions[dataset_urn] = (table.ref, table.view_definition)

# generate sibling and lineage aspects in case of EXTERNAL DELTA TABLE
if (
table_props.customProperties.get("table_type") == "EXTERNAL"
table_props.customProperties.get("table_type")
in {"EXTERNAL", "HIVE_EXTERNAL_TABLE"}
and table_props.customProperties.get("data_source_format") == "DELTA"
and self.config.emit_siblings
):
storage_location = str(table_props.customProperties.get("storage_location"))
if storage_location.startswith("s3://"):
if any(
storage_location.startswith(prefix) for prefix in s3_util.S3_PREFIXES
):
browse_path = strip_s3_prefix(storage_location)
source_dataset_urn = make_dataset_urn_with_platform_instance(
"delta-lake",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,35 @@ def mock_hive_sql(query):
"",
),
]
elif query == "DESCRIBE EXTENDED `bronze_kambi`.`external_metastore`":
return [
("betStatusId", "bigint", None),
("channelId", "bigint", None),
(
"combination",
"struct<combinationRef:bigint,currentOdds:double,eachWay:boolean,liveBetting:boolean,odds:double,outcomes:array<struct<betOfferTypeId:bigint,criterionId:bigint,criterionName:string,currentOdds:double,eventGroupId:bigint,eventGroupPath:array<struct<id:bigint,name:string>>,eventId:bigint,eventName:string,eventStartDate:string,live:boolean,odds:double,outcomeIds:array<bigint>,outcomeLabel:string,sportId:string,status:string,voidReason:string>>,payout:double,rewardExtraPayout:double,stake:double>",
None,
),
("", "", ""),
("# Detailed Table Information", "", ""),
("Catalog", "hive_metastore", ""),
("Database", "bronze_kambi", ""),
("Table", "external_metastore", ""),
("Created Time", "Wed Jun 22 05:14:56 UTC 2022", ""),
("Last Access", "UNKNOWN", ""),
("Created By", "Spark 3.2.1", ""),
("Statistics", "1024 bytes, 3 rows", ""),
("Type", "EXTERNAL", ""),
("Location", "s3://external_metastore/", ""),
("Provider", "delta", ""),
("Owner", "root", ""),
("Is_managed_location", "true", ""),
(
"Table Properties",
"[delta.autoOptimize.autoCompact=true,delta.autoOptimize.optimizeWrite=true,delta.minReaderVersion=1,delta.minWriterVersion=2]",
"",
),
]
elif query == "DESCRIBE EXTENDED `bronze_kambi`.`view1`":
return [
("betStatusId", "bigint", None),
Expand Down Expand Up @@ -384,6 +413,7 @@ def mock_hive_sql(query):
elif query == "SHOW TABLES FROM `bronze_kambi`":
return [
TableEntry("bronze_kambi", "bet", False),
TableEntry("bronze_kambi", "external_metastore", False),
TableEntry("bronze_kambi", "delta_error_table", False),
TableEntry("bronze_kambi", "view1", False),
]
Expand Down
Loading

0 comments on commit 92780e6

Please sign in to comment.