Skip to content

Commit

Permalink
Clean up logic for dataset.py yaml loader (#10089)
Browse files Browse the repository at this point in the history
  • Loading branch information
eboneil authored Mar 20, 2024
1 parent 5a3df32 commit 87169ba
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 168 deletions.
241 changes: 103 additions & 138 deletions metadata-ingestion/src/datahub/api/entities/dataset/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@
from pydantic import BaseModel, Field, validator
from ruamel.yaml import YAML

from datahub.api.entities.structuredproperties.structuredproperties import (
AllowedTypes,
StructuredProperties,
)
from datahub.api.entities.structuredproperties.structuredproperties import AllowedTypes
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import (
make_data_platform_urn,
Expand All @@ -23,7 +20,7 @@
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
from datahub.ingestion.graph.client import DataHubGraph
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetPropertiesClass,
Expand All @@ -45,7 +42,6 @@
)
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand All @@ -66,8 +62,8 @@ class SchemaFieldSpecification(BaseModel):
created: Optional[dict] = None
lastModified: Optional[dict] = None
recursive: Optional[bool] = None
globalTags: Optional[dict] = None
glossaryTerms: Optional[dict] = None
globalTags: Optional[List[str]] = None
glossaryTerms: Optional[List[str]] = None
isPartOfKey: Optional[bool] = None
isPartitioningKey: Optional[bool] = None
jsonProps: Optional[dict] = None
Expand Down Expand Up @@ -161,7 +157,7 @@ class Dataset(BaseModel):
subtype: Optional[str]
subtypes: Optional[List[str]]
tags: Optional[List[str]] = None
glossaryTerms: Optional[List[str]] = None
glossary_terms: Optional[List[str]] = None
owners: Optional[List[Union[str, Ownership]]] = None
structured_properties: Optional[
Dict[str, Union[str, float, List[Union[str, float]]]]
Expand Down Expand Up @@ -267,6 +263,34 @@ def generate_mcp(
self.urn, field.id # type: ignore[arg-type]
)
assert field_urn.startswith("urn:li:schemaField:")

if field.globalTags:
mcp = MetadataChangeProposalWrapper(
entityUrn=field_urn,
aspect=GlobalTagsClass(
tags=[
TagAssociationClass(tag=make_tag_urn(tag))
for tag in field.globalTags
]
),
)
yield mcp

if field.glossaryTerms:
mcp = MetadataChangeProposalWrapper(
entityUrn=field_urn,
aspect=GlossaryTermsClass(
terms=[
GlossaryTermAssociationClass(
urn=make_term_urn(term)
)
for term in field.glossaryTerms
],
auditStamp=self._mint_auditstamp("yaml"),
),
)
yield mcp

if field.structured_properties:
mcp = MetadataChangeProposalWrapper(
entityUrn=field_urn,
Expand All @@ -284,138 +308,79 @@ def generate_mcp(
)
yield mcp

if self.subtype or self.subtypes:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=SubTypesClass(
typeNames=[
s
for s in [self.subtype] + (self.subtypes or [])
if s
]
),
)
yield mcp

if self.tags:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=GlobalTagsClass(
tags=[
TagAssociationClass(tag=make_tag_urn(tag))
for tag in self.tags
]
),
)
yield mcp

if self.glossaryTerms:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=GlossaryTermsClass(
terms=[
GlossaryTermAssociationClass(
urn=make_term_urn(term)
)
for term in self.glossaryTerms
],
auditStamp=self._mint_auditstamp("yaml"),
),
)
yield mcp

if self.owners:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=OwnershipClass(
owners=[self._mint_owner(o) for o in self.owners]
),
)
yield mcp

if self.structured_properties:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=StructuredPropertiesClass(
properties=[
StructuredPropertyValueAssignmentClass(
propertyUrn=f"urn:li:structuredProperty:{prop_key}",
values=prop_value
if isinstance(prop_value, list)
else [prop_value],
)
for prop_key, prop_value in self.structured_properties.items()
]
),
if self.subtype or self.subtypes:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=SubTypesClass(
typeNames=[s for s in [self.subtype] + (self.subtypes or []) if s]
),
)
yield mcp

if self.tags:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=GlobalTagsClass(
tags=[
TagAssociationClass(tag=make_tag_urn(tag)) for tag in self.tags
]
),
)
yield mcp

if self.glossary_terms:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=GlossaryTermsClass(
terms=[
GlossaryTermAssociationClass(urn=make_term_urn(term))
for term in self.glossary_terms
],
auditStamp=self._mint_auditstamp("yaml"),
),
)
yield mcp

if self.owners:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=OwnershipClass(
owners=[self._mint_owner(o) for o in self.owners]
),
)
yield mcp

if self.structured_properties:
mcp = MetadataChangeProposalWrapper(
entityUrn=self.urn,
aspect=StructuredPropertiesClass(
properties=[
StructuredPropertyValueAssignmentClass(
propertyUrn=f"urn:li:structuredProperty:{prop_key}",
values=prop_value
if isinstance(prop_value, list)
else [prop_value],
)
yield mcp

if self.downstreams:
for downstream in self.downstreams:
patch_builder = DatasetPatchBuilder(downstream)
assert (
self.urn is not None
) # validator should have filled this in
patch_builder.add_upstream_lineage(
UpstreamClass(
dataset=self.urn,
type="COPY",
)
)
for patch_event in patch_builder.build():
yield patch_event

logger.info(f"Created dataset {self.urn}")

@staticmethod
def extract_structured_properties(
structured_properties: Dict[str, Union[str, float, List[str], List[float]]]
) -> List[Tuple[str, Union[str, float]]]:
structured_properties_flattened: List[Tuple[str, Union[str, float]]] = []
for key, value in structured_properties.items():
validated_structured_property = Dataset.validate_structured_property(
key, value
for prop_key, prop_value in self.structured_properties.items()
]
),
)
if validated_structured_property:
structured_properties_flattened.append(validated_structured_property)
structured_properties_flattened = sorted(
structured_properties_flattened, key=lambda x: x[0]
)
return structured_properties_flattened

@staticmethod
def validate_structured_property(
sp_name: str, sp_value: Union[str, float, List[str], List[float]]
) -> Union[Tuple[str, Union[str, float]], None]:
"""
Validate based on:
1. Structured property exists/has been created
2. Structured property value is of the expected type
"""
urn = Urn.make_structured_property_urn(sp_name)
with get_default_graph() as graph:
if graph.exists(urn):
validated_structured_property = StructuredProperties.from_datahub(
graph, urn
)
allowed_type = Urn.get_data_type_from_urn(
validated_structured_property.type
)
try:
if not isinstance(sp_value, list):
return Dataset.validate_type(sp_name, sp_value, allowed_type)
else:
for v in sp_value:
return Dataset.validate_type(sp_name, v, allowed_type)
except ValueError:
logger.warning(
f"Property: {sp_name}, value: {sp_value} should be a {allowed_type}."
yield mcp

if self.downstreams:
for downstream in self.downstreams:
patch_builder = DatasetPatchBuilder(downstream)
assert self.urn is not None # validator should have filled this in
patch_builder.add_upstream_lineage(
UpstreamClass(
dataset=self.urn,
type="COPY",
)
else:
logger.error(
f"Property {sp_name} does not exist and therefore will not be added to dataset. Please create property before trying again."
)
return None
for patch_event in patch_builder.build():
yield patch_event

logger.info(f"Created dataset {self.urn}")

@staticmethod
def validate_type(
Expand Down Expand Up @@ -543,7 +508,7 @@ def from_datahub(cls, graph: DataHubGraph, urn: str) -> "Dataset":
else None,
schema=Dataset._schema_from_schema_metadata(graph, urn),
tags=[tag.tag for tag in tags.tags] if tags else None,
glossaryTerms=[term.urn for term in glossary_terms.terms]
glossary_terms=[term.urn for term in glossary_terms.terms]
if glossary_terms
else None,
owners=yaml_owners,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,36 +405,6 @@ def test_dataset_yaml_loader(ingest_cleanup_data, graph):
] == ["2023-01-01"]


def test_dataset_structured_property_validation(ingest_cleanup_data, graph, caplog):
from datahub.api.entities.dataset.dataset import Dataset

property_name = f"replicationSLA{randint(10, 10000)}"
property_value = 30
value_type = "number"

create_property_definition(
property_name=property_name, graph=graph, value_type=value_type
)

attach_property_to_entity(
dataset_urns[0], property_name, [property_value], graph=graph
)

assert Dataset.validate_structured_property(
f"{default_namespace}.{property_name}", property_value
) == (
f"{default_namespace}.{property_name}",
float(property_value),
)

assert Dataset.validate_structured_property("testName", "testValue") is None

bad_property_value = "2023-09-20"
assert (
Dataset.validate_structured_property(property_name, bad_property_value) is None
)


def test_structured_property_search(ingest_cleanup_data, graph: DataHubGraph, caplog):
def to_es_name(property_name, namespace=default_namespace):
namespace_field = namespace.replace(".", "_")
Expand Down

0 comments on commit 87169ba

Please sign in to comment.