Skip to content

Commit

Permalink
fix: implement tags-to-terms transformation based on tags
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Jun 21, 2024
1 parent b3dccb0 commit c417c01
Show file tree
Hide file tree
Showing 5 changed files with 428 additions and 3 deletions.
52 changes: 51 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
| `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)<br/> - [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)<br/> - [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)<br/> - [Extract Ownership from Tags](#extract-ownership-from-tags)<br/> - [Clean suffix prefix from Ownership](#clean-suffix-prefix-from-ownership) |
| `globalTags` | - [Simple Add Dataset globalTags ](#simple-add-dataset-globaltags)<br/> - [Pattern Add Dataset globalTags](#pattern-add-dataset-globaltags)<br/> - [Add Dataset globalTags](#add-dataset-globaltags) |
| `browsePaths` | - [Set Dataset browsePath](#set-dataset-browsepath) |
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms)<br/> - [Tags to Term Mapping](#tags-to-term-mapping) |
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) |
Expand Down Expand Up @@ -668,6 +668,56 @@ We can add glossary terms to datasets based on a regex filter.
".*example1.*": ["urn:li:glossaryTerm:Email", "urn:li:glossaryTerm:Address"]
".*example2.*": ["urn:li:glossaryTerm:PostalCode"]
```

## Tags to Term Mapping
### Config Details

| Field | Required | Type | Default | Description |
|---------------|----------|--------------------|-------------|-------------------------------------------------------------------------------------------------------|
| `tags` | ✅ | List[str] | | List of tag names based on which terms will be created and associated with the dataset. |
| `semantics` | | enum | "OVERWRITE" | Determines whether to OVERWRITE or PATCH the terms associated with the dataset on DataHub GMS. |

<br/>

The `tags_to_term` transformer is designed to map specific tags to glossary terms within DataHub. It takes a configuration of tags should be translated into corresponding glossaryTerm. This transformer can apply these mappings to any tags found either at column level of dataset or dataset top level.

When specifying tags in the configuration, use the tag's simple name rather than the full tag URN.

For example, instead of using the tag URN `urn:li:tag:snowflakedb.snowflakeschema.tag_name:tag_value`, you should specify just the tag name `tag_name` in the mapping configuration

```yaml
transformers:
- type: "tags_to_term"
config:
semantics: OVERWRITE # OVERWRITE is the default behavior
tags:
- "tag_name"
```

`tags_to_term` can be configured in below different way

- Add domains based on tags, however overwrite the domains available for the dataset on DataHub GMS
```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: OVERWRITE # OVERWRITE is default behaviour
tags:
- "example1"
- "example2"
- "example3"
```
- Add domains based on tags, however keep the domains available for the dataset on DataHub GMS
```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: PATCH
domain_mapping:
'example1': "urn:li:domain:engineering"
'example2': "urn:li:domain:hr"
```

## Pattern Add Dataset Schema Field glossaryTerms
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
"domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
"tags_to_term = datahub.ingestion.transformer.tags_to_terms:TagsToTermMapper",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ def entity_types(self) -> List[str]:
return ["dataset"]


class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transform sequentially on each tag."""

def __init__(self):
super().__init__()

def entity_types(self) -> List[str]:
return ["dataset", "container"]


class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "ownership"
Expand Down Expand Up @@ -128,3 +138,8 @@ def aspect_name(self) -> str:
class DatasetUsageStatisticsTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "datasetUsageStatistics"


class TagsToTermTransformer(TagTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "glossaryTerms"
145 changes: 145 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from typing import List, Optional, Set, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import (
TransformerSemantics,
TransformerSemanticsConfigModel,
)
from datahub.emitter.mce_builder import Aspect, make_term_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.transformer.dataset_transformer import TagsToTermTransformer
from datahub.metadata.schema_classes import (
AuditStampClass,
GlobalTagsClass,
GlossaryTermAssociationClass,
GlossaryTermsClass,
SchemaMetadataClass,
)


class TagsToTermMapperConfig(TransformerSemanticsConfigModel):
tags: List[str]


class TagsToTermMapper(TagsToTermTransformer):
"""This transformer maps specified tags to corresponding glossary terms for a dataset."""

def __init__(self, config: TagsToTermMapperConfig, ctx: PipelineContext):
super().__init__()
self.ctx: PipelineContext = ctx
self.config: TagsToTermMapperConfig = config

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "TagsToTermMapper":
config = TagsToTermMapperConfig.parse_obj(config_dict)
return cls(config, ctx)

@staticmethod
def _merge_with_server_glossary_terms(
graph: DataHubGraph,
urn: str,
glossary_terms_aspect: Optional[GlossaryTermsClass],
) -> Optional[GlossaryTermsClass]:
if not glossary_terms_aspect or not glossary_terms_aspect.terms:
# nothing to add, no need to consult server
return None

# Merge the transformed terms with existing server terms.
# The transformed terms takes precedence, which may change the term context.
server_glossary_terms_aspect = graph.get_glossary_terms(entity_urn=urn)
if server_glossary_terms_aspect is not None:
glossary_terms_aspect.terms = list(
{
**{term.urn: term for term in server_glossary_terms_aspect.terms},
**{term.urn: term for term in glossary_terms_aspect.terms},
}.values()
)

return glossary_terms_aspect

@staticmethod
def get_tags_from_global_tags(global_tags: Optional[GlobalTagsClass]) -> Set[str]:
"""Extracts tags urn from GlobalTagsClass."""
if not global_tags or not global_tags.tags:
return set()

return {tag_assoc.tag for tag_assoc in global_tags.tags}

@staticmethod
def get_tags_from_schema_metadata(
schema_metadata: Optional[SchemaMetadataClass],
) -> Set[str]:
"""Extracts globalTags from all fields in SchemaMetadataClass."""
if not schema_metadata or not schema_metadata.fields:
return set()
tags = set()
for field in schema_metadata.fields:
if field.globalTags:
tags.update(
TagsToTermMapper.get_tags_from_global_tags(field.globalTags)
)
return tags

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:

in_glossary_terms: Optional[GlossaryTermsClass] = cast(
Optional[GlossaryTermsClass], aspect
)

assert self.ctx.graph
in_global_tags_aspect: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(
entity_urn
)
in_schema_metadata_aspect: Optional[
SchemaMetadataClass
] = self.ctx.graph.get_schema_metadata(entity_urn)

if in_global_tags_aspect is None and in_schema_metadata_aspect is None:
return cast(Aspect, in_glossary_terms)

global_tags = TagsToTermMapper.get_tags_from_global_tags(in_global_tags_aspect)
schema_metadata_tags = TagsToTermMapper.get_tags_from_schema_metadata(
in_schema_metadata_aspect
)

# Combine tags from both global and schema level
combined_tags = global_tags.union(schema_metadata_tags)

tag_set = set(self.config.tags)
terms_to_add = set()
tags_to_delete = set()

# Check each global tag against the configured tag list and prepare terms
for full_tag in combined_tags:
tag_name = full_tag.split("urn:li:tag:")[-1].split(".")[-1].split(":")[0]
if tag_name in tag_set:
term_urn = make_term_urn(tag_name)
terms_to_add.add(term_urn)
tags_to_delete.add(full_tag) # Full URN for deletion

if not terms_to_add:
return cast(Aspect, in_glossary_terms) # No new terms to add

for tag_urn in tags_to_delete:
self.ctx.graph.hard_delete_entity(urn=tag_urn)

# Initialize the Glossary Terms properly
out_glossary_terms = GlossaryTermsClass(
terms=[GlossaryTermAssociationClass(urn=term) for term in terms_to_add],
auditStamp=AuditStampClass(
time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter"
),
)

if self.config.semantics == TransformerSemantics.PATCH:
patch_glossary_terms: Optional[
GlossaryTermsClass
] = TagsToTermMapper._merge_with_server_glossary_terms(
self.ctx.graph, entity_urn, out_glossary_terms
)
return cast(Optional[Aspect], patch_glossary_terms)
else:
return cast(Aspect, out_glossary_terms)
Loading

0 comments on commit c417c01

Please sign in to comment.