Skip to content

Commit

Permalink
fix(ingestion/unity): updated code to make delta-lake primary sibling
Browse files Browse the repository at this point in the history
created the config to take the values related to the delta-lake emit
  • Loading branch information
dushayntAW committed Mar 7, 2024
1 parent 25f281e commit d9c9c29
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ source:
deny:
- ".*\\.unwanted_schema"

# emit_siblings: true
# delta_lake_options:
# platform_instance_name: 'None'
# env: 'Prod'

# profiling:
# method: "analyze"
# enabled: true
Expand Down
15 changes: 15 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/unity/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class UnityCatalogProfilerConfig(ConfigModel):
)


class DeltaLakeDetails(ConfigModel):
    """Options describing the delta-lake side of a sibling relationship.

    Consumed by the Unity Catalog source config as ``delta_lake_options``;
    the values are only relevant when sibling emission is enabled.
    """

    # Optional platform instance of the delta-lake datasets; None means no
    # platform instance is configured.
    platform_instance_name: Optional[str] = Field(
        default=None,
        description=(
            "Platform instance name of the delta-lake datasets, used when "
            "emitting siblings. Leave unset if the delta-lake datasets have "
            "no platform instance."
        ),
    )
    # NOTE(review): the default is kept as "Prod" for backward compatibility;
    # confirm this casing is accepted wherever the value is turned into a URN.
    env: Optional[str] = Field(
        default="Prod",
        description=(
            "Environment of the delta-lake datasets, used when emitting "
            "siblings."
        ),
    )


class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig):
method: Literal["analyze"] = "analyze"

Expand Down Expand Up @@ -253,6 +258,16 @@ class UnityCatalogSourceConfig(
discriminator="method",
)

emit_siblings: bool = pydantic.Field(
default=True,
description="Flag to emit sibling.",
)

delta_lake_options: DeltaLakeDetails = Field(
default=DeltaLakeDetails(),
description="Details about the delta lake, incase to emit siblings",
)

scheme: str = DATABRICKS

def get_sql_alchemy_url(self, database: Optional[str] = None) -> str:
Expand Down
21 changes: 18 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/unity/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig):
# Global map of tables, for profiling
self.tables: FileBackedDict[Table] = FileBackedDict()

self.emit_siblings = (
self.config.emit_siblings if self.config.emit_siblings else False
)

if self.config.delta_lake_options:
self.delta_lake_platform_instance_name = (
self.config.delta_lake_options.platform_instance_name
)
self.delta_lake_env = self.config.delta_lake_options.env

def init_hive_metastore_proxy(self):
self.hive_metastore_proxy: Optional[HiveMetastoreProxy] = None
if self.config.include_hive_metastore:
Expand Down Expand Up @@ -498,20 +508,25 @@ def process_table(self, table: Table, schema: Schema) -> Iterable[MetadataWorkUn
if table.view_definition:
self.view_definitions[dataset_urn] = (table.ref, table.view_definition)

print(f"TABLE TYPE = {table_props.customProperties.get('table_type')}")
print(f"FORMAT = {table_props.customProperties.get('data_source_format')}")
print(f"EMIT SIBLINGS = {self.emit_siblings}")
# generate sibling and lineage aspects in case of EXTERNAL DELTA TABLE
if (
table_props.customProperties.get("table_type") == "EXTERNAL"
and table_props.customProperties.get("data_source_format") == "DELTA"
and self.emit_siblings
):
storage_location = str(table_props.customProperties.get("storage_location"))
if storage_location.startswith("s3://"):
browse_path = strip_s3_prefix(storage_location)
source_dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
"delta-lake",
browse_path,
self.platform_instance_name,
self.config.env,
)

yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
yield from self.gen_lineage_workunit(dataset_urn, source_dataset_urn)

Expand Down Expand Up @@ -982,12 +997,12 @@ def gen_siblings_workunit(
"""
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=Siblings(primary=False, siblings=[source_dataset_urn]),
aspect=Siblings(primary=True, siblings=[source_dataset_urn]),
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=source_dataset_urn,
aspect=Siblings(primary=True, siblings=[dataset_urn]),
aspect=Siblings(primary=False, siblings=[dataset_urn]),
).as_workunit()

def gen_lineage_workunit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,11 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock):
"include_ownership": True,
"include_hive_metastore": True,
"warehouse_id": "test",
"emit_siblings": True,
"delta_lake_options": {
"platform_instance_name": "None",
"env": "Prod",
},
"profiling": {
"enabled": True,
"method": "analyze",
Expand Down
Loading

0 comments on commit d9c9c29

Please sign in to comment.