-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion/transformer): create tag if not exist #9076
Changes from all commits
46f40d1
50dbbfd
2d27c4e
605fb51
b00f652
dc5a4f9
b242961
d128f89
069bfb0
fa8ab34
0015219
0aae160
94b135f
b1c4345
29afacb
465da36
7f4a70e
f09195d
31c14f2
d672758
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,30 @@ | |
log = logging.getLogger(__name__) | ||
|
||
|
||
class LegacyMCETransformer(Transformer, metaclass=ABCMeta): | ||
def _update_work_unit_id( | ||
envelope: RecordEnvelope, urn: str, aspect_name: str | ||
) -> Dict[Any, Any]: | ||
structured_urn = Urn.create_from_string(urn) | ||
simple_name = "-".join(structured_urn.get_entity_id()) | ||
record_metadata = envelope.metadata.copy() | ||
record_metadata.update({"workunit_id": f"txform-{simple_name}-{aspect_name}"}) | ||
return record_metadata | ||
|
||
|
||
class HandleEndOfStreamTransformer: | ||
def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]: | ||
return [] | ||
|
||
|
||
class LegacyMCETransformer( | ||
Transformer, HandleEndOfStreamTransformer, metaclass=ABCMeta | ||
): | ||
@abstractmethod | ||
def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass: | ||
pass | ||
|
||
|
||
class SingleAspectTransformer(metaclass=ABCMeta): | ||
class SingleAspectTransformer(HandleEndOfStreamTransformer, metaclass=ABCMeta): | ||
@abstractmethod | ||
def aspect_name(self) -> str: | ||
"""Implement this method to specify a single aspect that the transformer is interested in subscribing to. No default provided.""" | ||
|
@@ -180,6 +197,32 @@ def _transform_or_record_mcpw( | |
self._record_mcp(envelope.record) | ||
return envelope if envelope.record.aspect is not None else None | ||
|
||
def _handle_end_of_stream( | ||
self, envelope: RecordEnvelope | ||
) -> Iterable[RecordEnvelope]: | ||
|
||
if not isinstance(self, SingleAspectTransformer) and not isinstance( | ||
self, LegacyMCETransformer | ||
): | ||
return | ||
|
||
mcps: List[MetadataChangeProposalWrapper] = self.handle_end_of_stream() | ||
|
||
for mcp in mcps: | ||
if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error | ||
continue | ||
|
||
record_metadata = _update_work_unit_id( | ||
envelope=envelope, | ||
aspect_name=mcp.aspect.get_aspect_name(), # type: ignore | ||
urn=mcp.entityUrn, | ||
) | ||
|
||
yield RecordEnvelope( | ||
record=mcp, | ||
metadata=record_metadata, | ||
) | ||
|
||
def transform( | ||
self, record_envelopes: Iterable[RecordEnvelope] | ||
) -> Iterable[RecordEnvelope]: | ||
|
@@ -216,26 +259,32 @@ def transform( | |
else None, | ||
) | ||
if transformed_aspect: | ||
# for end of stream records, we modify the workunit-id | ||
structured_urn = Urn.create_from_string(urn) | ||
simple_name = "-".join(structured_urn.get_entity_id()) | ||
record_metadata = envelope.metadata.copy() | ||
record_metadata.update( | ||
{ | ||
"workunit_id": f"txform-{simple_name}-{self.aspect_name()}" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. actually I was wrong in my earlier comment - I think this code is still required There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. :) updated the code |
||
} | ||
) | ||
yield RecordEnvelope( | ||
record=MetadataChangeProposalWrapper( | ||
|
||
mcp: MetadataChangeProposalWrapper = ( | ||
MetadataChangeProposalWrapper( | ||
entityUrn=urn, | ||
entityType=structured_urn.get_type(), | ||
systemMetadata=last_seen_mcp.systemMetadata | ||
if last_seen_mcp | ||
else last_seen_mce_system_metadata, | ||
aspectName=self.aspect_name(), | ||
aspect=transformed_aspect, | ||
), | ||
) | ||
) | ||
|
||
record_metadata = _update_work_unit_id( | ||
envelope=envelope, | ||
aspect_name=mcp.aspect.get_aspect_name(), # type: ignore | ||
urn=mcp.entityUrn, | ||
) | ||
|
||
yield RecordEnvelope( | ||
record=mcp, | ||
metadata=record_metadata, | ||
) | ||
|
||
self._mark_processed(urn) | ||
yield from self._handle_end_of_stream(envelope=envelope) | ||
|
||
yield envelope |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -813,13 +813,25 @@ def test_simple_dataset_tags_transformation(mock_time): | |
] | ||
) | ||
) | ||
assert len(outputs) == 3 | ||
|
||
assert len(outputs) == 5 | ||
siddiquebagwan-gslab marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
# Check that tags were added. | ||
tags_aspect = outputs[1].record.aspect | ||
assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation") | ||
assert tags_aspect | ||
assert len(tags_aspect.tags) == 2 | ||
assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation") | ||
|
||
# Check new tag entity should be there | ||
assert outputs[2].record.aspectName == "tagKey" | ||
assert outputs[2].record.aspect.name == "NeedsDocumentation" | ||
assert outputs[2].record.entityUrn == builder.make_tag_urn("NeedsDocumentation") | ||
|
||
assert outputs[3].record.aspectName == "tagKey" | ||
assert outputs[3].record.aspect.name == "Legacy" | ||
assert outputs[3].record.entityUrn == builder.make_tag_urn("Legacy") | ||
hsheth2 marked this conversation as resolved.
Show resolved
Hide resolved
siddiquebagwan-gslab marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
assert isinstance(outputs[4].record, EndOfStream) | ||
|
||
|
||
def dummy_tag_resolver_method(dataset_snapshot): | ||
|
@@ -853,7 +865,7 @@ def test_pattern_dataset_tags_transformation(mock_time): | |
) | ||
) | ||
|
||
assert len(outputs) == 3 | ||
assert len(outputs) == 5 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should also be checking the actual contents of the tag creations here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
tags_aspect = outputs[1].record.aspect | ||
assert tags_aspect | ||
assert len(tags_aspect.tags) == 2 | ||
|
@@ -1363,7 +1375,7 @@ def test_mcp_add_tags_missing(mock_time): | |
] | ||
input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={})) | ||
outputs = list(transformer.transform(input_stream)) | ||
assert len(outputs) == 3 | ||
assert len(outputs) == 5 | ||
assert outputs[0].record == dataset_mcp | ||
# Check that tags were added, this will be the second result | ||
tags_aspect = outputs[1].record.aspect | ||
|
@@ -1395,13 +1407,23 @@ def test_mcp_add_tags_existing(mock_time): | |
] | ||
input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={})) | ||
outputs = list(transformer.transform(input_stream)) | ||
assert len(outputs) == 2 | ||
|
||
assert len(outputs) == 4 | ||
|
||
# Check that tags were added, this will be the second result | ||
tags_aspect = outputs[0].record.aspect | ||
assert tags_aspect | ||
assert len(tags_aspect.tags) == 3 | ||
assert tags_aspect.tags[0].tag == builder.make_tag_urn("Test") | ||
assert tags_aspect.tags[1].tag == builder.make_tag_urn("NeedsDocumentation") | ||
assert tags_aspect.tags[2].tag == builder.make_tag_urn("Legacy") | ||
|
||
# Check tag entities got added | ||
assert outputs[1].record.entityType == "tag" | ||
assert outputs[1].record.entityUrn == builder.make_tag_urn("NeedsDocumentation") | ||
assert outputs[2].record.entityType == "tag" | ||
assert outputs[2].record.entityUrn == builder.make_tag_urn("Legacy") | ||
|
||
assert isinstance(outputs[-1].record, EndOfStream) | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this _update_work_unit_id thing is kinda strange - don't think we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done