-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion/transformer): create tag if not exist #9076
feat(ingestion/transformer): create tag if not exist #9076
Conversation
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
Outdated
Show resolved
Hide resolved
…lab/datahub-fork into master+tag-transformer
) | ||
|
||
for mcp in mcps: | ||
# I am not sure if we need to update the work_unit_id. @harshal please confirm it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, we don't need to update the work_unit_id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -815,7 +815,7 @@ def test_pattern_dataset_tags_transformation(mock_time): | |||
) | |||
) | |||
|
|||
assert len(outputs) == 3 | |||
assert len(outputs) == 5 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should also be checking the actual contents of the tag creations here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -237,5 +277,10 @@ def transform( | |||
), | |||
metadata=record_metadata, | |||
) | |||
yield from self._handle_end_of_stream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the indentation looks incorrect here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error | ||
continue | ||
|
||
record_metadata = _update_work_unit_id( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this _update_work_unit_id thing is kinda strange - don't think we need it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
self, envelope: RecordEnvelope, entity_urn: str | ||
) -> Iterable[RecordEnvelope]: | ||
|
||
if not isinstance(self, SingleAspectTransformer): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make this work for both types of transformers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -1065,6 +1067,22 @@ def parse_sql_lineage( | |||
default_schema=default_schema, | |||
) | |||
|
|||
def create_tag(self, tag_name: str) -> Dict[Any, Any]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should return just the urn, and not a dict
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
class LegacyMCETransformer(Transformer, metaclass=ABCMeta): | ||
class HandleEndOfStreamTransformer: | ||
def handle_end_of_stream( | ||
self, entity_urn: str |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method should not take in an entity_urn
. It should only be called once after all records have been processed, not once per urn
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
record_metadata = envelope.metadata.copy() | ||
record_metadata.update( | ||
{ | ||
"workunit_id": f"txform-{simple_name}-{self.aspect_name()}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually I was wrong in my earlier comment - I think this code is still required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:) updated the code
) | ||
yield from self._handle_end_of_stream( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be after the self._mark_processed(urn)
call and two level of indentation less
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
…lab/datahub-fork into master+tag-transformer
…lab/datahub-fork into master+tag-transformer
For the dataset create the tag if not present