-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest/transformer): tags to terms transformer #10758
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -10,7 +10,7 @@ The below table shows transformer which can transform aspects of entity [Dataset | |||||||||
| `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)<br/> - [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)<br/> - [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)<br/> - [Extract Ownership from Tags](#extract-ownership-from-tags)<br/> - [Clean suffix prefix from Ownership](#clean-suffix-prefix-from-ownership) | | ||||||||||
| `globalTags` | - [Simple Add Dataset globalTags ](#simple-add-dataset-globaltags)<br/> - [Pattern Add Dataset globalTags](#pattern-add-dataset-globaltags)<br/> - [Add Dataset globalTags](#add-dataset-globaltags) | | ||||||||||
| `browsePaths` | - [Set Dataset browsePath](#set-dataset-browsepath) | | ||||||||||
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) | | ||||||||||
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms)<br/> - [Tags to Term Mapping](#tags-to-term-mapping) | | ||||||||||
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) | | ||||||||||
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) | | ||||||||||
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) | | ||||||||||
|
@@ -668,6 +668,56 @@ We can add glossary terms to datasets based on a regex filter. | |||||||||
".*example1.*": ["urn:li:glossaryTerm:Email", "urn:li:glossaryTerm:Address"] | ||||||||||
".*example2.*": ["urn:li:glossaryTerm:PostalCode"] | ||||||||||
``` | ||||||||||
|
||||||||||
## Tags to Term Mapping | ||||||||||
### Config Details | ||||||||||
|
||||||||||
| Field | Required | Type | Default | Description | | ||||||||||
|---------------|----------|--------------------|-------------|-------------------------------------------------------------------------------------------------------| | ||||||||||
| `tags` | ✅ | List[str] | | List of tag names based on which terms will be created and associated with the dataset. | | ||||||||||
| `semantics` | | enum | "OVERWRITE" | Determines whether to OVERWRITE or PATCH the terms associated with the dataset on DataHub GMS. | | ||||||||||
|
||||||||||
<br/> | ||||||||||
|
||||||||||
The `tags_to_term` transformer is designed to map specific tags to glossary terms within DataHub. It takes a configuration of tags should be translated into corresponding glossaryTerm. This transformer can apply these mappings to any tags found either at column level of dataset or dataset top level. | ||||||||||
sagar-salvi-apptware marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
When specifying tags in the configuration, use the tag's simple name rather than the full tag URN. | ||||||||||
sagar-salvi-apptware marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
For example, instead of using the tag URN `urn:li:tag:snowflakedb.snowflakeschema.tag_name:tag_value`, you should specify just the tag name `tag_name` in the mapping configuration | ||||||||||
sagar-salvi-apptware marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
```yaml | ||||||||||
transformers: | ||||||||||
- type: "tags_to_term" | ||||||||||
config: | ||||||||||
semantics: OVERWRITE # OVERWRITE is the default behavior | ||||||||||
tags: | ||||||||||
- "tag_name" | ||||||||||
sagar-salvi-apptware marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
``` | ||||||||||
|
||||||||||
`tags_to_term` can be configured in below different way | ||||||||||
sagar-salvi-apptware marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
- Add domains based on tags, however overwrite the domains available for the dataset on DataHub GMS | ||||||||||
sagar-salvi-apptware marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
```yaml | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add blank lines around fenced code blocks. Fenced code blocks should be surrounded by blank lines for better readability. 700a701
+
709a710
+
ToolsMarkdownlint
|
||||||||||
transformers: | ||||||||||
- type: "domain_mapping_based_on_tags" | ||||||||||
config: | ||||||||||
semantics: OVERWRITE # OVERWRITE is default behaviour | ||||||||||
tags: | ||||||||||
- "example1" | ||||||||||
- "example2" | ||||||||||
- "example3" | ||||||||||
``` | ||||||||||
- Add domains based on tags, however keep the domains available for the dataset on DataHub GMS | ||||||||||
sagar-salvi-apptware marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
```yaml | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add blank lines around fenced code blocks. Fenced code blocks should be surrounded by blank lines for better readability. 711a712
+
720a721
+ Committable suggestion
Suggested change
ToolsMarkdownlint
|
||||||||||
transformers: | ||||||||||
- type: "domain_mapping_based_on_tags" | ||||||||||
config: | ||||||||||
semantics: PATCH | ||||||||||
domain_mapping: | ||||||||||
'example1': "urn:li:domain:engineering" | ||||||||||
'example2': "urn:li:domain:hr" | ||||||||||
``` | ||||||||||
|
||||||||||
## Pattern Add Dataset Schema Field glossaryTerms | ||||||||||
### Config Details | ||||||||||
| Field | Required | Type | Default | Description | | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
from typing import List, Optional, Set, cast | ||
|
||
import datahub.emitter.mce_builder as builder | ||
from datahub.configuration.common import ( | ||
TransformerSemantics, | ||
TransformerSemanticsConfigModel, | ||
) | ||
from datahub.emitter.mce_builder import Aspect, make_term_urn | ||
from datahub.ingestion.api.common import PipelineContext | ||
from datahub.ingestion.graph.client import DataHubGraph | ||
from datahub.ingestion.transformer.dataset_transformer import TagsToTermTransformer | ||
from datahub.metadata.schema_classes import ( | ||
AuditStampClass, | ||
GlobalTagsClass, | ||
GlossaryTermAssociationClass, | ||
GlossaryTermsClass, | ||
SchemaMetadataClass, | ||
) | ||
|
||
|
||
class TagsToTermMapperConfig(TransformerSemanticsConfigModel): | ||
tags: List[str] | ||
|
||
|
||
class TagsToTermMapper(TagsToTermTransformer): | ||
"""This transformer maps specified tags to corresponding glossary terms for a dataset.""" | ||
|
||
def __init__(self, config: TagsToTermMapperConfig, ctx: PipelineContext): | ||
super().__init__() | ||
self.ctx: PipelineContext = ctx | ||
self.config: TagsToTermMapperConfig = config | ||
|
||
@classmethod | ||
def create(cls, config_dict: dict, ctx: PipelineContext) -> "TagsToTermMapper": | ||
config = TagsToTermMapperConfig.parse_obj(config_dict) | ||
return cls(config, ctx) | ||
|
||
@staticmethod | ||
def _merge_with_server_glossary_terms( | ||
graph: DataHubGraph, | ||
urn: str, | ||
glossary_terms_aspect: Optional[GlossaryTermsClass], | ||
) -> Optional[GlossaryTermsClass]: | ||
if not glossary_terms_aspect or not glossary_terms_aspect.terms: | ||
# nothing to add, no need to consult server | ||
return None | ||
|
||
# Merge the transformed terms with existing server terms. | ||
# The transformed terms takes precedence, which may change the term context. | ||
server_glossary_terms_aspect = graph.get_glossary_terms(entity_urn=urn) | ||
if server_glossary_terms_aspect is not None: | ||
glossary_terms_aspect.terms = list( | ||
{ | ||
**{term.urn: term for term in server_glossary_terms_aspect.terms}, | ||
**{term.urn: term for term in glossary_terms_aspect.terms}, | ||
}.values() | ||
) | ||
|
||
return glossary_terms_aspect | ||
|
||
@staticmethod | ||
def get_tags_from_global_tags(global_tags: Optional[GlobalTagsClass]) -> Set[str]: | ||
"""Extracts tags urn from GlobalTagsClass.""" | ||
if not global_tags or not global_tags.tags: | ||
return set() | ||
|
||
return {tag_assoc.tag for tag_assoc in global_tags.tags} | ||
|
||
@staticmethod | ||
def get_tags_from_schema_metadata( | ||
schema_metadata: Optional[SchemaMetadataClass], | ||
) -> Set[str]: | ||
"""Extracts globalTags from all fields in SchemaMetadataClass.""" | ||
if not schema_metadata or not schema_metadata.fields: | ||
return set() | ||
tags = set() | ||
for field in schema_metadata.fields: | ||
if field.globalTags: | ||
tags.update( | ||
TagsToTermMapper.get_tags_from_global_tags(field.globalTags) | ||
) | ||
return tags | ||
|
||
def transform_aspect( | ||
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] | ||
) -> Optional[Aspect]: | ||
|
||
in_glossary_terms: Optional[GlossaryTermsClass] = cast( | ||
Optional[GlossaryTermsClass], aspect | ||
) | ||
|
||
assert self.ctx.graph | ||
in_global_tags_aspect: Optional[GlobalTagsClass] = self.ctx.graph.get_tags( | ||
entity_urn | ||
) | ||
in_schema_metadata_aspect: Optional[ | ||
SchemaMetadataClass | ||
] = self.ctx.graph.get_schema_metadata(entity_urn) | ||
|
||
if in_global_tags_aspect is None and in_schema_metadata_aspect is None: | ||
return cast(Aspect, in_glossary_terms) | ||
|
||
global_tags = TagsToTermMapper.get_tags_from_global_tags(in_global_tags_aspect) | ||
schema_metadata_tags = TagsToTermMapper.get_tags_from_schema_metadata( | ||
in_schema_metadata_aspect | ||
) | ||
|
||
# Combine tags from both global and schema level | ||
combined_tags = global_tags.union(schema_metadata_tags) | ||
|
||
tag_set = set(self.config.tags) | ||
terms_to_add = set() | ||
tags_to_delete = set() | ||
|
||
# Check each global tag against the configured tag list and prepare terms | ||
for full_tag in combined_tags: | ||
tag_name = full_tag.split("urn:li:tag:")[-1].split(".")[-1].split(":")[0] | ||
if tag_name in tag_set: | ||
term_urn = make_term_urn(tag_name) | ||
terms_to_add.add(term_urn) | ||
tags_to_delete.add(full_tag) # Full URN for deletion | ||
|
||
if not terms_to_add: | ||
return cast(Aspect, in_glossary_terms) # No new terms to add | ||
|
||
for tag_urn in tags_to_delete: | ||
self.ctx.graph.remove_tag(tag_urn=tag_urn, resource_urn=entity_urn) | ||
|
||
# Initialize the Glossary Terms properly | ||
out_glossary_terms = GlossaryTermsClass( | ||
terms=[GlossaryTermAssociationClass(urn=term) for term in terms_to_add], | ||
auditStamp=AuditStampClass( | ||
time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter" | ||
), | ||
) | ||
|
||
if self.config.semantics == TransformerSemantics.PATCH: | ||
patch_glossary_terms: Optional[ | ||
GlossaryTermsClass | ||
] = TagsToTermMapper._merge_with_server_glossary_terms( | ||
self.ctx.graph, entity_urn, out_glossary_terms | ||
) | ||
return cast(Optional[Aspect], patch_glossary_terms) | ||
else: | ||
return cast(Aspect, out_glossary_terms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add blank lines around headings.
Headings should be surrounded by blank lines for better readability.
Committable suggestion
Tools
Markdownlint