From 46f40d1667dca920937505b43fdbf0d625c1cee4 Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Mon, 23 Oct 2023 22:56:13 +0530 Subject: [PATCH 1/9] add tag if not exist --- .../src/datahub/ingestion/graph/client.py | 18 +++++++++++++ .../ingestion/transformer/add_dataset_tags.py | 27 +++++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index ccff677c3a471..93b9494af8a71 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -787,9 +787,11 @@ def get_aspect_counts(self, aspect: str, urn_like: Optional[str] = None) -> int: def execute_graphql(self, query: str, variables: Optional[Dict] = None) -> Dict: url = f"{self.config.server}/api/graphql" + body: Dict = { "query": query, } + if variables: body["variables"] = variables @@ -1065,6 +1067,22 @@ def parse_sql_lineage( default_schema=default_schema, ) + def create_tag(self, tag_name: str) -> Dict[Any, Any]: + graph_query: str = """ + mutation($tag_detail: CreateTagInput!) { + createTag(input: $tag_detail) + } + """ + + variables = { + "tag_detail": {"name": tag_name}, + } + + return self.execute_graphql( + query=graph_query, + variables=variables, + ) + def close(self) -> None: self._make_schema_resolver.cache_clear() super().close() diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index 5a276ad899c48..958faedc82831 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -1,3 +1,4 @@ +import logging from typing import Callable, List, Optional, cast from datahub.configuration.common import ( @@ -9,6 +10,9 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass +from datahub.utilities.urns.tag_urn import TagUrn + +logger = logging.getLogger(__name__) class AddDatasetTagsConfig(TransformerSemanticsConfigModel): @@ -33,6 +37,26 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags": config = AddDatasetTagsConfig.parse_obj(config_dict) return cls(config, ctx) + def create_tag_if_not_exist( + self, tag_associations: List[TagAssociationClass] + ) -> None: + if self.ctx.graph is None: + logger.debug("graph instance is None. Skip tag creation") + return # graph instance in not available + + for tag_association in tag_associations: + if self.ctx.graph.exists(tag_association.tag): + continue + + ids: List[str] = TagUrn.create_from_string( + tag_association.tag + ).get_entity_id() + + assert len(ids) == 1, "Invalid Tag Urn" + + response: dict = self.ctx.graph.create_tag(tag_name=ids[0]) + logger.debug(f"Tag creation response: {response}") + def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: @@ -45,6 +69,9 @@ def transform_aspect( tags_to_add = self.config.get_tags_to_add(entity_urn) if tags_to_add is not None: out_global_tags_aspect.tags.extend(tags_to_add) + self.create_tag_if_not_exist( + tag_associations=out_global_tags_aspect.tags, + ) return self.get_result_semantics( self.config, self.ctx.graph, entity_urn, out_global_tags_aspect From dc5a4f9c25f41f34ca98718ecb8023948648f615 Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Wed, 1 Nov 2023 10:07:43 +0530 Subject: [PATCH 2/9] address review comments --- .../ingestion/transformer/add_dataset_tags.py | 61 +++++++++++-------- .../ingestion/transformer/base_transformer.py | 59 +++++++++++++++--- .../tests/unit/test_transform_dataset.py | 6 +- 3 files changed, 92 insertions(+), 34 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index 958faedc82831..dd1c6d5d2c086 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -1,15 +1,21 @@ import logging from typing import Callable, List, Optional, cast +import datahub.emitter.mce_builder as builder from datahub.configuration.common import ( KeyValuePattern, TransformerSemanticsConfigModel, ) from datahub.configuration.import_resolver import pydantic_resolve_key from datahub.emitter.mce_builder import Aspect +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer -from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass +from datahub.metadata.schema_classes import ( + GlobalTagsClass, + TagAssociationClass, + TagKeyClass, +) from datahub.utilities.urns.tag_urn import TagUrn logger = logging.getLogger(__name__) @@ -37,26 +43,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags": config = AddDatasetTagsConfig.parse_obj(config_dict) return cls(config, ctx) - def create_tag_if_not_exist( - self, tag_associations: List[TagAssociationClass] - ) -> None: - if self.ctx.graph is None: - logger.debug("graph instance is None. Skip tag creation") - return # graph instance in not available - - for tag_association in tag_associations: - if self.ctx.graph.exists(tag_association.tag): - continue - - ids: List[str] = TagUrn.create_from_string( - tag_association.tag - ).get_entity_id() - - assert len(ids) == 1, "Invalid Tag Urn" - - response: dict = self.ctx.graph.create_tag(tag_name=ids[0]) - logger.debug(f"Tag creation response: {response}") - def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] ) -> Optional[Aspect]: @@ -69,14 +55,41 @@ def transform_aspect( tags_to_add = self.config.get_tags_to_add(entity_urn) if tags_to_add is not None: out_global_tags_aspect.tags.extend(tags_to_add) - self.create_tag_if_not_exist( - tag_associations=out_global_tags_aspect.tags, - ) return self.get_result_semantics( self.config, self.ctx.graph, entity_urn, out_global_tags_aspect ) + def handle_end_of_stream( + self, entity_urn: str + ) -> List[MetadataChangeProposalWrapper]: + tags_to_add: List[TagAssociationClass] = self.config.get_tags_to_add(entity_urn) + + mcps: List[MetadataChangeProposalWrapper] = [] + + if tags_to_add is None: + return mcps + + logger.debug("Generating tags") + + for tag_association in tags_to_add: + ids: List[str] = TagUrn.create_from_string( + tag_association.tag + ).get_entity_id() + + assert len(ids) == 1, "Invalid Tag Urn" + + tag_name: str = ids[0] + + mcps.append( + MetadataChangeProposalWrapper( + entityUrn=builder.make_tag_urn(tag=tag_name), + aspect=TagKeyClass(name=tag_name), + ) + ) + + return mcps + class SimpleDatasetTagConfig(TransformerSemanticsConfigModel): tag_urns: List[str] diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index e0d6ae720c9a1..f9a75c088ad76 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -17,6 +17,16 @@ log = logging.getLogger(__name__) +def _update_work_unit_id( + envelope: RecordEnvelope, urn: str, aspect_name: str +) -> dict[Any, Any]: + structured_urn = Urn.create_from_string(urn) + simple_name = "-".join(structured_urn.get_entity_id()) + record_metadata = envelope.metadata.copy() + record_metadata.update({"workunit_id": f"txform-{simple_name}-{aspect_name}"}) + return record_metadata + + class LegacyMCETransformer(Transformer, metaclass=ABCMeta): @abstractmethod def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass: @@ -40,6 +50,11 @@ def transform_aspect( """ pass + def handle_end_of_stream( + self, entity_urn: str + ) -> List[MetadataChangeProposalWrapper]: + return [] + class BaseTransformer(Transformer, metaclass=ABCMeta): """Transformer that offers common functionality that most transformers need""" @@ -180,6 +195,33 @@ def _transform_or_record_mcpw( self._record_mcp(envelope.record) return envelope if envelope.record.aspect is not None else None + def _handle_end_of_stream( + self, envelope: RecordEnvelope, entity_urn: str + ) -> Iterable[RecordEnvelope]: + + if not isinstance(self, SingleAspectTransformer): + return + + mcps: List[MetadataChangeProposalWrapper] = self.handle_end_of_stream( + entity_urn + ) + + for mcp in mcps: + # I am not sure if we need to update the work_unit_id. @harshal please confirm it + if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error + continue + + record_metadata = _update_work_unit_id( + envelope=envelope, + aspect_name=mcp.aspect.get_aspect_name(), # type: ignore + urn=mcp.entityUrn, + ) + + yield RecordEnvelope( + record=mcp, + metadata=record_metadata, + ) + def transform( self, record_envelopes: Iterable[RecordEnvelope] ) -> Iterable[RecordEnvelope]: @@ -217,14 +259,12 @@ def transform( ) if transformed_aspect: # for end of stream records, we modify the workunit-id - structured_urn = Urn.create_from_string(urn) - simple_name = "-".join(structured_urn.get_entity_id()) - record_metadata = envelope.metadata.copy() - record_metadata.update( - { - "workunit_id": f"txform-{simple_name}-{self.aspect_name()}" - } + record_metadata = _update_work_unit_id( + envelope=envelope, + aspect_name=self.aspect_name(), + urn=urn, ) + structured_urn = Urn.create_from_string(urn) yield RecordEnvelope( record=MetadataChangeProposalWrapper( entityUrn=urn, @@ -237,5 +277,10 @@ def transform( ), metadata=record_metadata, ) + yield from self._handle_end_of_stream( + envelope=envelope, entity_urn=urn + ) + self._mark_processed(urn) + yield envelope diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index bc95451620d22..71c9ed4a9aaea 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -775,7 +775,7 @@ def test_simple_dataset_tags_transformation(mock_time): ] ) ) - assert len(outputs) == 3 + assert len(outputs) == 5 # Check that tags were added. tags_aspect = outputs[1].record.aspect @@ -815,7 +815,7 @@ def test_pattern_dataset_tags_transformation(mock_time): ) ) - assert len(outputs) == 3 + assert len(outputs) == 5 tags_aspect = outputs[1].record.aspect assert tags_aspect assert len(tags_aspect.tags) == 2 @@ -1324,7 +1324,7 @@ def test_mcp_add_tags_missing(mock_time): ] input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={})) outputs = list(transformer.transform(input_stream)) - assert len(outputs) == 3 + assert len(outputs) == 5 assert outputs[0].record == dataset_mcp # Check that tags were added, this will be the second result tags_aspect = outputs[1].record.aspect From d128f89f0cbb44a682d19982d0fdf9abdcba09dc Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Wed, 15 Nov 2023 19:15:34 +0530 Subject: [PATCH 3/9] address review comments --- .../ingestion/transformer/base_transformer.py | 37 +++++++++---------- .../tests/unit/test_transform_dataset.py | 5 +++ 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index f9a75c088ad76..e819ac7ec23ba 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -27,13 +27,22 @@ def _update_work_unit_id( return record_metadata -class LegacyMCETransformer(Transformer, metaclass=ABCMeta): +class HandleEndOfStreamTransformer: + def handle_end_of_stream( + self, entity_urn: str + ) -> List[MetadataChangeProposalWrapper]: + return [] + + +class LegacyMCETransformer( + Transformer, HandleEndOfStreamTransformer, metaclass=ABCMeta +): @abstractmethod def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventClass: pass -class SingleAspectTransformer(metaclass=ABCMeta): +class SingleAspectTransformer(HandleEndOfStreamTransformer, metaclass=ABCMeta): @abstractmethod def aspect_name(self) -> str: """Implement this method to specify a single aspect that the transformer is interested in subscribing to. No default provided.""" @@ -50,11 +59,6 @@ def transform_aspect( """ pass - def handle_end_of_stream( - self, entity_urn: str - ) -> List[MetadataChangeProposalWrapper]: - return [] - class BaseTransformer(Transformer, metaclass=ABCMeta): """Transformer that offers common functionality that most transformers need""" @@ -199,7 +203,9 @@ def _handle_end_of_stream( self, envelope: RecordEnvelope, entity_urn: str ) -> Iterable[RecordEnvelope]: - if not isinstance(self, SingleAspectTransformer): + if not isinstance(self, SingleAspectTransformer) and not isinstance( + self, LegacyMCETransformer + ): return mcps: List[MetadataChangeProposalWrapper] = self.handle_end_of_stream( @@ -207,19 +213,12 @@ def _handle_end_of_stream( ) for mcp in mcps: - # I am not sure if we need to update the work_unit_id. @harshal please confirm it if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error continue - record_metadata = _update_work_unit_id( - envelope=envelope, - aspect_name=mcp.aspect.get_aspect_name(), # type: ignore - urn=mcp.entityUrn, - ) - yield RecordEnvelope( record=mcp, - metadata=record_metadata, + metadata=envelope.metadata, ) def transform( @@ -277,9 +276,9 @@ def transform( ), metadata=record_metadata, ) - yield from self._handle_end_of_stream( - envelope=envelope, entity_urn=urn - ) + yield from self._handle_end_of_stream( + envelope=envelope, entity_urn=urn + ) self._mark_processed(urn) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 71c9ed4a9aaea..ece1b3422c54d 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -775,6 +775,7 @@ def test_simple_dataset_tags_transformation(mock_time): ] ) ) + assert len(outputs) == 5 # Check that tags were added. @@ -783,6 +784,10 @@ def test_simple_dataset_tags_transformation(mock_time): assert len(tags_aspect.tags) == 2 assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation") + # Check new tag entity should be there + assert outputs[2].record.entityUrn == builder.make_tag_urn("NeedsDocumentation") + assert outputs[3].record.entityUrn == builder.make_tag_urn("Legacy") + def dummy_tag_resolver_method(dataset_snapshot): return [] From 069bfb014d2b0b77c80f97070adb5ab84ace4edd Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Wed, 15 Nov 2023 19:26:21 +0530 Subject: [PATCH 4/9] remove unused function --- .../ingestion/transformer/base_transformer.py | 18 +----------------- 1 file changed, 1 insertion(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index e819ac7ec23ba..396087c6fe008 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -17,16 +17,6 @@ log = logging.getLogger(__name__) -def _update_work_unit_id( - envelope: RecordEnvelope, urn: str, aspect_name: str -) -> dict[Any, Any]: - structured_urn = Urn.create_from_string(urn) - simple_name = "-".join(structured_urn.get_entity_id()) - record_metadata = envelope.metadata.copy() - record_metadata.update({"workunit_id": f"txform-{simple_name}-{aspect_name}"}) - return record_metadata - - class HandleEndOfStreamTransformer: def handle_end_of_stream( self, entity_urn: str @@ -257,12 +247,6 @@ def transform( else None, ) if transformed_aspect: - # for end of stream records, we modify the workunit-id - record_metadata = _update_work_unit_id( - envelope=envelope, - aspect_name=self.aspect_name(), - urn=urn, - ) structured_urn = Urn.create_from_string(urn) yield RecordEnvelope( record=MetadataChangeProposalWrapper( @@ -274,7 +258,7 @@ def transform( aspectName=self.aspect_name(), aspect=transformed_aspect, ), - metadata=record_metadata, + metadata=envelope.metadata, ) yield from self._handle_end_of_stream( envelope=envelope, entity_urn=urn From fa8ab34b28a5212c95dfb0450dec8038a2d17456 Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Wed, 15 Nov 2023 23:00:08 +0530 Subject: [PATCH 5/9] return urn --- .../src/datahub/ingestion/graph/client.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index ca819bc81bc4c..5c24b06dde999 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -1067,7 +1067,7 @@ def parse_sql_lineage( default_schema=default_schema, ) - def create_tag(self, tag_name: str) -> Dict[Any, Any]: + def create_tag(self, tag_name: str) -> str: graph_query: str = """ mutation($tag_detail: CreateTagInput!) { createTag(input: $tag_detail) @@ -1075,14 +1075,20 @@ def create_tag(self, tag_name: str) -> Dict[Any, Any]: """ variables = { - "tag_detail": {"name": tag_name}, + "tag_detail": { + "name": tag_name, + "id": tag_name, + }, } - return self.execute_graphql( + res = self.execute_graphql( query=graph_query, variables=variables, ) + # return urn + return res["createTag"] + def close(self) -> None: self._make_schema_resolver.cache_clear() super().close() From 0aae16075149705466c50811cbd34b7e87a50f6f Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Tue, 12 Dec 2023 23:04:22 +0530 Subject: [PATCH 6/9] review comments --- .../ingestion/transformer/add_dataset_tags.py | 16 +++--- .../ingestion/transformer/base_transformer.py | 51 +++++++++++++------ .../tests/unit/test_transform_dataset.py | 13 ++++- 3 files changed, 56 insertions(+), 24 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py index dd1c6d5d2c086..72a8c226e491e 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py @@ -32,11 +32,13 @@ class AddDatasetTags(DatasetTagsTransformer): ctx: PipelineContext config: AddDatasetTagsConfig + processed_tags: List[TagAssociationClass] def __init__(self, config: AddDatasetTagsConfig, ctx: PipelineContext): super().__init__() self.ctx = ctx self.config = config + self.processed_tags = [] @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags": @@ -55,24 +57,21 @@ def transform_aspect( tags_to_add = self.config.get_tags_to_add(entity_urn) if tags_to_add is not None: out_global_tags_aspect.tags.extend(tags_to_add) + self.processed_tags.extend( + tags_to_add + ) # Keep track of tags added so that we can create them in handle_end_of_stream return self.get_result_semantics( self.config, self.ctx.graph, entity_urn, out_global_tags_aspect ) - def handle_end_of_stream( - self, entity_urn: str - ) -> List[MetadataChangeProposalWrapper]: - tags_to_add: List[TagAssociationClass] = self.config.get_tags_to_add(entity_urn) + def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]: mcps: List[MetadataChangeProposalWrapper] = [] - if tags_to_add is None: - return mcps - logger.debug("Generating tags") - for tag_association in tags_to_add: + for tag_association in self.processed_tags: ids: List[str] = TagUrn.create_from_string( tag_association.tag ).get_entity_id() @@ -122,6 +121,7 @@ class PatternAddDatasetTags(AddDatasetTags): """Transformer that adds a specified set of tags to each dataset.""" def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext): + config.tag_pattern.all tag_pattern = config.tag_pattern generic_config = AddDatasetTagsConfig( get_tags_to_add=lambda _: [ diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index 396087c6fe008..4b9fd3c867bf0 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -17,10 +17,18 @@ log = logging.getLogger(__name__) +def _update_work_unit_id( + envelope: RecordEnvelope, urn: str, aspect_name: str +) -> dict[Any, Any]: + structured_urn = Urn.create_from_string(urn) + simple_name = "-".join(structured_urn.get_entity_id()) + record_metadata = envelope.metadata.copy() + record_metadata.update({"workunit_id": f"txform-{simple_name}-{aspect_name}"}) + return record_metadata + + class HandleEndOfStreamTransformer: - def handle_end_of_stream( - self, entity_urn: str - ) -> List[MetadataChangeProposalWrapper]: + def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]: return [] @@ -190,7 +198,7 @@ def _transform_or_record_mcpw( return envelope if envelope.record.aspect is not None else None def _handle_end_of_stream( - self, envelope: RecordEnvelope, entity_urn: str + self, envelope: RecordEnvelope ) -> Iterable[RecordEnvelope]: if not isinstance(self, SingleAspectTransformer) and not isinstance( @@ -198,17 +206,21 @@ def _handle_end_of_stream( ): return - mcps: List[MetadataChangeProposalWrapper] = self.handle_end_of_stream( - entity_urn - ) + mcps: List[MetadataChangeProposalWrapper] = self.handle_end_of_stream() for mcp in mcps: if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error continue + record_metadata = _update_work_unit_id( + envelope=envelope, + aspect_name=mcp.aspect.get_aspect_name(), # type: ignore + urn=mcp.entityUrn, + ) + yield RecordEnvelope( record=mcp, - metadata=envelope.metadata, + metadata=record_metadata, ) def transform( @@ -248,8 +260,9 @@ def transform( ) if transformed_aspect: structured_urn = Urn.create_from_string(urn) - yield RecordEnvelope( - record=MetadataChangeProposalWrapper( + + mcp: MetadataChangeProposalWrapper = ( + MetadataChangeProposalWrapper( entityUrn=urn, entityType=structured_urn.get_type(), systemMetadata=last_seen_mcp.systemMetadata @@ -257,13 +270,21 @@ def transform( else last_seen_mce_system_metadata, aspectName=self.aspect_name(), aspect=transformed_aspect, - ), - metadata=envelope.metadata, + ) + ) + + record_metadata = _update_work_unit_id( + envelope=envelope, + aspect_name=mcp.aspect.get_aspect_name(), # type: ignore + urn=mcp.entityUrn, + ) + + yield RecordEnvelope( + record=mcp, + metadata=record_metadata, ) - yield from self._handle_end_of_stream( - envelope=envelope, entity_urn=urn - ) self._mark_processed(urn) + yield from self._handle_end_of_stream(envelope=envelope) yield envelope diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index ece1b3422c54d..1d627e3871540 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -1361,13 +1361,24 @@ def test_mcp_add_tags_existing(mock_time): ] input_stream.append(RecordEnvelope(record=EndOfStream(), metadata={})) outputs = list(transformer.transform(input_stream)) - assert len(outputs) == 2 + + assert len(outputs) == 4 + # Check that tags were added, this will be the second result tags_aspect = outputs[0].record.aspect assert tags_aspect assert len(tags_aspect.tags) == 3 assert tags_aspect.tags[0].tag == builder.make_tag_urn("Test") assert tags_aspect.tags[1].tag == builder.make_tag_urn("NeedsDocumentation") + assert tags_aspect.tags[2].tag == builder.make_tag_urn("Legacy") + + # Check tag entities got added + outputs[1].record.entityType == "tag" + outputs[1].record.entityUrn == builder.make_tag_urn("NeedsDocumentation") + outputs[2].record.entityType == "tag" + outputs[2].record.entityUrn == builder.make_tag_urn("Legacy") + + assert isinstance(outputs[-1].record, EndOfStream) From b1c434590cbebed2b6e44dfdfee24f29419e094b Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Wed, 13 Dec 2023 12:46:18 +0530 Subject: [PATCH 7/9] lint fix --- .../tests/unit/test_transform_dataset.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 1d627e3871540..94da1844e5f86 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -1363,7 +1363,7 @@ def test_mcp_add_tags_existing(mock_time): outputs = list(transformer.transform(input_stream)) assert len(outputs) == 4 - + # Check that tags were added, this will be the second result tags_aspect = outputs[0].record.aspect assert tags_aspect @@ -1372,12 +1372,11 @@ def test_mcp_add_tags_existing(mock_time): assert tags_aspect.tags[1].tag == builder.make_tag_urn("NeedsDocumentation") assert tags_aspect.tags[2].tag == builder.make_tag_urn("Legacy") - # Check tag entities got added - outputs[1].record.entityType == "tag" - outputs[1].record.entityUrn == builder.make_tag_urn("NeedsDocumentation") - outputs[2].record.entityType == "tag" - outputs[2].record.entityUrn == builder.make_tag_urn("Legacy") - + # Check tag entities got added + assert outputs[1].record.entityType == "tag" + assert outputs[1].record.entityUrn == builder.make_tag_urn("NeedsDocumentation") + assert outputs[2].record.entityType == "tag" + assert outputs[2].record.entityUrn == builder.make_tag_urn("Legacy") assert isinstance(outputs[-1].record, EndOfStream) From 7f4a70e5c34ddb18fa7339cc1c6b9af45922ddb4 Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Thu, 14 Dec 2023 15:03:02 +0530 Subject: [PATCH 8/9] review comments --- metadata-ingestion/tests/unit/test_transform_dataset.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index b6ec9e4cca1b9..546549dcf37a4 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -818,14 +818,21 @@ def test_simple_dataset_tags_transformation(mock_time): # Check that tags were added. tags_aspect = outputs[1].record.aspect + assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation") assert tags_aspect assert len(tags_aspect.tags) == 2 - assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation") # Check new tag entity should be there + assert outputs[2].record.aspectName == "tagKey" + assert outputs[2].record.aspect.name == "NeedsDocumentation" assert outputs[2].record.entityUrn == builder.make_tag_urn("NeedsDocumentation") + + assert outputs[3].record.aspectName == "tagKey" + assert outputs[3].record.aspect.name == "Legacy" assert outputs[3].record.entityUrn == builder.make_tag_urn("Legacy") + assert isinstance(outputs[4].record, EndOfStream) + def dummy_tag_resolver_method(dataset_snapshot): return [] From 31c14f2b39fc2c8d867075a7a04150f75468624b Mon Sep 17 00:00:00 2001 From: siddiquebagwan-gslab Date: Thu, 14 Dec 2023 16:56:17 +0530 Subject: [PATCH 9/9] python3.7 lint fix --- .../src/datahub/ingestion/transformer/base_transformer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index 4b9fd3c867bf0..8b6f42dcfba4b 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -19,7 +19,7 @@ def _update_work_unit_id( envelope: RecordEnvelope, urn: str, aspect_name: str -) -> dict[Any, Any]: +) -> Dict[Any, Any]: structured_urn = Urn.create_from_string(urn) simple_name = "-".join(structured_urn.get_entity_id()) record_metadata = envelope.metadata.copy()