Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/glue): delta schemas #10299

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cd7f751
feat(ingestion/glue): delta schemas
sgomezvillamor Apr 16, 2024
ac351e9
lint fix
sgomezvillamor Apr 16, 2024
21b3c47
update delta golden files
sgomezvillamor Apr 17, 2024
7fe6f0f
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor Apr 17, 2024
1562daa
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor Apr 19, 2024
cecb5aa
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor Apr 19, 2024
5f53955
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor Apr 22, 2024
bd8a97b
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor Apr 23, 2024
7774c34
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor Apr 24, 2024
1a66f4d
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor Apr 25, 2024
dbceda1
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor Apr 26, 2024
d634b10
refactor: address PR review comments
sgomezvillamor May 10, 2024
8fc4f15
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor May 10, 2024
ee9f466
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor May 13, 2024
f602238
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor May 15, 2024
310102e
Update metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
sgomezvillamor May 16, 2024
da5c3b7
Update metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
sgomezvillamor May 16, 2024
6bec426
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor May 16, 2024
4bbadbd
fix lint
sgomezvillamor May 16, 2024
b8dfc25
Merge branch 'master' into feat-ingestion-glue-delta-schema
sgomezvillamor May 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field as dataclass_field
Expand Down Expand Up @@ -98,6 +99,7 @@
UpstreamClass,
UpstreamLineageClass,
)
from datahub.utilities.delta import delta_type_to_hive_type
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -204,6 +206,8 @@ class GlueSourceReport(StaleEntityRemovalSourceReport):
num_job_script_failed_parsing: int = 0
num_job_without_nodes: int = 0
num_dataset_to_dataset_edges_in_job: int = 0
num_dataset_schema_invalid: int = 0
num_dataset_buggy_delta_schema: int = 0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for reviewers:
I added these two mainly for the testing. I'm ok to rename or remove even

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice. Let's please rename as

num_dataset_schema_invalid -> num_dataset_invalid_delta_schema
num_dataset_buggy_delta_schema -> num_dataset_valid_delta_schema

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in d634b10


def report_table_scanned(self) -> None:
self.tables_scanned += 1
Expand Down Expand Up @@ -1148,9 +1152,35 @@ def get_s3_tags() -> Optional[GlobalTagsClass]:
return new_tags

def get_schema_metadata() -> Optional[SchemaMetadata]:
if not table.get("StorageDescriptor"):
def is_delta_schema(columns: Optional[List[Mapping[str, Any]]]) -> bool:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defining functions within other functions is discouraged as per coding style. Can you please move this function outside ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be refractored to accept both tableParameters and tableStorageDescriptor and then return boolean ? this function can subsume this check as well -> (provider == "delta") and (num_parts > 0)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defining functions within other functions is discouraged as per coding style. Can you please move this function outside ?

While I do agree, I just followed the existing pattern in the code. Note get_owner, get_dataset_properties, get_s3_tags are all defined within _extract_record method.

So, are you suggesting to move is_delta_schema at _extract_record level too or at GlueSource level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As just moved the is_delta_schema at _extra_record level, to keep it aligned with all others existing methods in the codebase.

return (
columns is not None
and (len(columns) == 1)
and (columns[0].get("Name", "") == "col")
and (columns[0].get("Type", "") == "array<string>")
)

# https://github.com/delta-io/delta/pull/2310
provider = table.get("Parameters", {}).get("spark.sql.sources.provider", "")
num_parts = int(
table.get("Parameters", {}).get(
"spark.sql.sources.schema.numParts", "0"
)
)
columns = table.get("StorageDescriptor", {}).get("Columns", [{}])

if (provider == "delta") and (num_parts > 0) and is_delta_schema(columns):
return _get_delta_schema_metadata()

elif table.get("StorageDescriptor"):
return _get_glue_schema_metadata()

else:
return None

def _get_glue_schema_metadata() -> Optional[SchemaMetadata]:
assert table.get("StorageDescriptor")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove this assert as this is already checked earlier.


schema = table["StorageDescriptor"]["Columns"]
fields: List[SchemaField] = []
for field in schema:
Expand Down Expand Up @@ -1183,6 +1213,51 @@ def get_schema_metadata() -> Optional[SchemaMetadata]:
platformSchema=MySqlDDL(tableSchema=""),
)

def _get_delta_schema_metadata() -> Optional[SchemaMetadata]:
assert (
table["Parameters"]["spark.sql.sources.provider"] == "delta"
and int(table["Parameters"]["spark.sql.sources.schema.numParts"]) > 0
)

try:
numParts = int(table["Parameters"]["spark.sql.sources.schema.numParts"])
schema_str = "".join(
[
table["Parameters"][f"spark.sql.sources.schema.part.{i}"]
for i in range(numParts)
]
)
schema_json = json.loads(schema_str)
fields: List[SchemaField] = []
for field in schema_json["fields"]:
field_type = delta_type_to_hive_type(field.get("type", "unknown"))
schema_fields = get_schema_fields_for_hive_column(
hive_column_name=field["name"],
hive_column_type=field_type,
description=field.get("description"),
default_nullable=bool(field.get("nullable", True)),
)
assert schema_fields
fields.extend(schema_fields)

self.report.num_dataset_buggy_delta_schema += 1
return SchemaMetadata(
schemaName=table_name,
version=0,
fields=fields,
platform=f"urn:li:dataPlatform:{self.platform}",
hash="",
platformSchema=MySqlDDL(tableSchema=""),
)

except Exception as e:
self.report_warning(
dataset_urn,
f"Could not parse schema for {table_name} because of {type(e).__name__}: {e}",
)
self.report.num_dataset_schema_invalid += 1
return None

def get_data_platform_instance() -> DataPlatformInstanceClass:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import os
import time
from typing import Any, Dict, Iterable, List
from typing import Dict, Iterable, List
from urllib.parse import urlparse

from deltalake import DeltaTable
Expand Down Expand Up @@ -51,6 +51,7 @@
SchemaFieldClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.delta import delta_type_to_hive_type
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

logging.getLogger("py4j").setLevel(logging.ERROR)
Expand Down Expand Up @@ -126,46 +127,12 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
config = DeltaLakeSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def delta_type_to_hive_type(self, field_type: Any) -> str:
if isinstance(field_type, str):
"""
return the field type
"""
return field_type
else:
if field_type.get("type") == "array":
"""
if array is of complex type, recursively parse the
fields and create the native datatype
"""
return (
"array<"
+ self.delta_type_to_hive_type(field_type.get("elementType"))
+ ">"
)
elif field_type.get("type") == "struct":
parsed_struct = ""
for field in field_type.get("fields"):
"""
if field is of complex type, recursively parse
and create the native datatype
"""
parsed_struct += (
"{0}:{1}".format(
field.get("name"),
self.delta_type_to_hive_type(field.get("type")),
)
+ ","
)
return "struct<" + parsed_struct.rstrip(",") + ">"
return ""

def _parse_datatype(self, raw_field_json_str: str) -> List[SchemaFieldClass]:
raw_field_json = json.loads(raw_field_json_str)

# get the parent field name and type
field_name = raw_field_json.get("name")
field_type = self.delta_type_to_hive_type(raw_field_json.get("type"))
field_type = delta_type_to_hive_type(raw_field_json.get("type"))

return get_schema_fields_for_hive_column(field_name, field_type)

Expand Down
34 changes: 34 additions & 0 deletions metadata-ingestion/src/datahub/utilities/delta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Any


def delta_type_to_hive_type(field_type: Any) -> str:
if isinstance(field_type, str):
"""
return the field type
"""
return field_type
else:
if field_type.get("type") == "array":
"""
if array is of complex type, recursively parse the
fields and create the native datatype
"""
return (
"array<" + delta_type_to_hive_type(field_type.get("elementType")) + ">"
)
elif field_type.get("type") == "struct":
parsed_struct = ""
for field in field_type.get("fields"):
"""
if field is of complex type, recursively parse
and create the native datatype
"""
parsed_struct += (
"{0}:{1}".format(
field.get("name"),
delta_type_to_hive_type(field.get("type")),
)
+ ","
)
return "struct<" + parsed_struct.rstrip(",") + ">"
return ""
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class HiveColumnToAvroConverter:
"bigint": "long",
"varchar": "string",
"char": "string",
"long": "long",
}

_COMPLEX_TYPE = re.compile("^(struct|map|array|uniontype)")
Expand Down
Loading
Loading