Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion/transformer): create tag if not exist #9076

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
46f40d1
add tag if not exist
siddiquebagwan-gslab Oct 23, 2023
50dbbfd
Merge branch 'master' into master+tag-transformer
siddiquebagwan-gslab Oct 23, 2023
2d27c4e
Merge branch 'master' into master+tag-transformer
siddiquebagwan-gslab Oct 25, 2023
605fb51
Merge branch 'master' into master+tag-transformer
siddiquebagwan-gslab Oct 31, 2023
b00f652
Merge branch 'master+tag-transformer' of github.com:siddiquebagwan-gs…
siddiquebagwan-gslab Oct 31, 2023
dc5a4f9
address review comments
siddiquebagwan-gslab Nov 1, 2023
b242961
Merge branch 'master' into master+tag-transformer
siddiquebagwan-gslab Nov 15, 2023
d128f89
address review comments
siddiquebagwan-gslab Nov 15, 2023
069bfb0
remove unused function
siddiquebagwan-gslab Nov 15, 2023
fa8ab34
return urn
siddiquebagwan-gslab Nov 15, 2023
0015219
Merge branch 'master' into master+tag-transformer
siddiquebagwan-gslab Dec 12, 2023
0aae160
review comments
siddiquebagwan-gslab Dec 12, 2023
94b135f
Merge branch 'master' into master+tag-transformer
siddiquebagwan-gslab Dec 13, 2023
b1c4345
lint fix
siddiquebagwan-gslab Dec 13, 2023
29afacb
Merge branch 'master+tag-transformer' of github.com:siddiquebagwan-gs…
siddiquebagwan-gslab Dec 13, 2023
465da36
Merge branch 'master' into master+tag-transformer
siddiquebagwan-gslab Dec 14, 2023
7f4a70e
review comments
siddiquebagwan-gslab Dec 14, 2023
f09195d
Merge branch 'master' into master+tag-transformer
siddiquebagwan-gslab Dec 14, 2023
31c14f2
python3.7 lint fix
siddiquebagwan-gslab Dec 14, 2023
d672758
Merge branch 'master+tag-transformer' of github.com:siddiquebagwan-gs…
siddiquebagwan-gslab Dec 14, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,9 +787,11 @@ def get_aspect_counts(self, aspect: str, urn_like: Optional[str] = None) -> int:

def execute_graphql(self, query: str, variables: Optional[Dict] = None) -> Dict:
url = f"{self.config.server}/api/graphql"

body: Dict = {
"query": query,
}

if variables:
body["variables"] = variables

Expand Down Expand Up @@ -1065,6 +1067,28 @@ def parse_sql_lineage(
default_schema=default_schema,
)

def create_tag(self, tag_name: str) -> str:
graph_query: str = """
mutation($tag_detail: CreateTagInput!) {
createTag(input: $tag_detail)
}
"""

variables = {
"tag_detail": {
"name": tag_name,
"id": tag_name,
},
}

res = self.execute_graphql(
query=graph_query,
variables=variables,
)

# return urn
return res["createTag"]

def close(self) -> None:
self._make_schema_resolver.cache_clear()
super().close()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import logging
from typing import Callable, List, Optional, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import (
KeyValuePattern,
TransformerSemanticsConfigModel,
)
from datahub.configuration.import_resolver import pydantic_resolve_key
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass
from datahub.metadata.schema_classes import (
GlobalTagsClass,
TagAssociationClass,
TagKeyClass,
)
from datahub.utilities.urns.tag_urn import TagUrn

logger = logging.getLogger(__name__)


class AddDatasetTagsConfig(TransformerSemanticsConfigModel):
Expand All @@ -22,11 +32,13 @@ class AddDatasetTags(DatasetTagsTransformer):

ctx: PipelineContext
config: AddDatasetTagsConfig
processed_tags: List[TagAssociationClass]

def __init__(self, config: AddDatasetTagsConfig, ctx: PipelineContext):
super().__init__()
self.ctx = ctx
self.config = config
self.processed_tags = []

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags":
Expand All @@ -45,11 +57,38 @@ def transform_aspect(
tags_to_add = self.config.get_tags_to_add(entity_urn)
if tags_to_add is not None:
out_global_tags_aspect.tags.extend(tags_to_add)
self.processed_tags.extend(
tags_to_add
) # Keep track of tags added so that we can create them in handle_end_of_stream

return self.get_result_semantics(
self.config, self.ctx.graph, entity_urn, out_global_tags_aspect
)

def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]:

mcps: List[MetadataChangeProposalWrapper] = []

logger.debug("Generating tags")

for tag_association in self.processed_tags:
ids: List[str] = TagUrn.create_from_string(
tag_association.tag
).get_entity_id()

assert len(ids) == 1, "Invalid Tag Urn"

tag_name: str = ids[0]

mcps.append(
MetadataChangeProposalWrapper(
entityUrn=builder.make_tag_urn(tag=tag_name),
aspect=TagKeyClass(name=tag_name),
)
)

return mcps


class SimpleDatasetTagConfig(TransformerSemanticsConfigModel):
tag_urns: List[str]
Expand Down Expand Up @@ -82,6 +121,7 @@ class PatternAddDatasetTags(AddDatasetTags):
"""Transformer that adds a specified set of tags to each dataset."""

def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext):
config.tag_pattern.all
tag_pattern = config.tag_pattern
generic_config = AddDatasetTagsConfig(
get_tags_to_add=lambda _: [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,30 @@
log = logging.getLogger(__name__)


class LegacyMCETransformer(Transformer, metaclass=ABCMeta):
def _update_work_unit_id(
envelope: RecordEnvelope, urn: str, aspect_name: str
) -> dict[Any, Any]:
structured_urn = Urn.create_from_string(urn)
simple_name = "-".join(structured_urn.get_entity_id())
record_metadata = envelope.metadata.copy()
record_metadata.update({"workunit_id": f"txform-{simple_name}-{aspect_name}"})
return record_metadata


class HandleEndOfStreamTransformer:
def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]:
return []


class LegacyMCETransformer(
Transformer, HandleEndOfStreamTransformer, metaclass=ABCMeta
):
@abstractmethod
def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass:
pass


class SingleAspectTransformer(metaclass=ABCMeta):
class SingleAspectTransformer(HandleEndOfStreamTransformer, metaclass=ABCMeta):
@abstractmethod
def aspect_name(self) -> str:
"""Implement this method to specify a single aspect that the transformer is interested in subscribing to. No default provided."""
Expand Down Expand Up @@ -180,6 +197,32 @@ def _transform_or_record_mcpw(
self._record_mcp(envelope.record)
return envelope if envelope.record.aspect is not None else None

def _handle_end_of_stream(
self, envelope: RecordEnvelope
) -> Iterable[RecordEnvelope]:

if not isinstance(self, SingleAspectTransformer) and not isinstance(
self, LegacyMCETransformer
):
return

mcps: List[MetadataChangeProposalWrapper] = self.handle_end_of_stream()

for mcp in mcps:
if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error
continue

record_metadata = _update_work_unit_id(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this _update_work_unit_id thing is kinda strange - don't think we need it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

envelope=envelope,
aspect_name=mcp.aspect.get_aspect_name(), # type: ignore
urn=mcp.entityUrn,
)

yield RecordEnvelope(
record=mcp,
metadata=record_metadata,
)

def transform(
self, record_envelopes: Iterable[RecordEnvelope]
) -> Iterable[RecordEnvelope]:
Expand Down Expand Up @@ -216,26 +259,32 @@ def transform(
else None,
)
if transformed_aspect:
# for end of stream records, we modify the workunit-id
structured_urn = Urn.create_from_string(urn)
simple_name = "-".join(structured_urn.get_entity_id())
record_metadata = envelope.metadata.copy()
record_metadata.update(
{
"workunit_id": f"txform-{simple_name}-{self.aspect_name()}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually I was wrong in my earlier comment - I think this code is still required

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) updated the code

}
)
yield RecordEnvelope(
record=MetadataChangeProposalWrapper(

mcp: MetadataChangeProposalWrapper = (
MetadataChangeProposalWrapper(
entityUrn=urn,
entityType=structured_urn.get_type(),
systemMetadata=last_seen_mcp.systemMetadata
if last_seen_mcp
else last_seen_mce_system_metadata,
aspectName=self.aspect_name(),
aspect=transformed_aspect,
),
)
)

record_metadata = _update_work_unit_id(
envelope=envelope,
aspect_name=mcp.aspect.get_aspect_name(), # type: ignore
urn=mcp.entityUrn,
)

yield RecordEnvelope(
record=mcp,
metadata=record_metadata,
)

self._mark_processed(urn)
yield from self._handle_end_of_stream(envelope=envelope)

yield envelope
23 changes: 19 additions & 4 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,14 +775,19 @@ def test_simple_dataset_tags_transformation(mock_time):
]
)
)
assert len(outputs) == 3

assert len(outputs) == 5
siddiquebagwan-gslab marked this conversation as resolved.
Show resolved Hide resolved

# Check that tags were added.
tags_aspect = outputs[1].record.aspect
assert tags_aspect
assert len(tags_aspect.tags) == 2
assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation")

# Check new tag entity should be there
assert outputs[2].record.entityUrn == builder.make_tag_urn("NeedsDocumentation")
assert outputs[3].record.entityUrn == builder.make_tag_urn("Legacy")
hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
siddiquebagwan-gslab marked this conversation as resolved.
Show resolved Hide resolved


def dummy_tag_resolver_method(dataset_snapshot):
return []
Expand Down Expand Up @@ -815,7 +820,7 @@ def test_pattern_dataset_tags_transformation(mock_time):
)
)

assert len(outputs) == 3
assert len(outputs) == 5
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also be checking the actual contents of the tag creations here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

tags_aspect = outputs[1].record.aspect
assert tags_aspect
assert len(tags_aspect.tags) == 2
Expand Down Expand Up @@ -1324,7 +1329,7 @@ def test_mcp_add_tags_missing(mock_time):
]
input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={}))
outputs = list(transformer.transform(input_stream))
assert len(outputs) == 3
assert len(outputs) == 5
assert outputs[0].record == dataset_mcp
# Check that tags were added, this will be the second result
tags_aspect = outputs[1].record.aspect
Expand Down Expand Up @@ -1356,13 +1361,23 @@ def test_mcp_add_tags_existing(mock_time):
]
input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={}))
outputs = list(transformer.transform(input_stream))
assert len(outputs) == 2

assert len(outputs) == 4

# Check that tags were added, this will be the second result
tags_aspect = outputs[0].record.aspect
assert tags_aspect
assert len(tags_aspect.tags) == 3
assert tags_aspect.tags[0].tag == builder.make_tag_urn("Test")
assert tags_aspect.tags[1].tag == builder.make_tag_urn("NeedsDocumentation")
assert tags_aspect.tags[2].tag == builder.make_tag_urn("Legacy")

# Check tag entities got added
assert outputs[1].record.entityType == "tag"
assert outputs[1].record.entityUrn == builder.make_tag_urn("NeedsDocumentation")
assert outputs[2].record.entityType == "tag"
assert outputs[2].record.entityUrn == builder.make_tag_urn("Legacy")

assert isinstance(outputs[-1].record, EndOfStream)


Expand Down
Loading