fix(ingestion/transformer): extend dataset_to_data_product_urns_pattern to support containers
sagar-salvi-apptware committed Aug 8, 2024
1 parent 3d9a954 commit 4ce35a9
Showing 2 changed files with 48 additions and 3 deletions.
19 changes: 18 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -1207,15 +1207,32 @@ The config, which we’d append to our ingestion recipe YAML, would look like this:
| Field | Required | Type | Default | Description |
|---------------------------------------|----------|----------------------|-------------|---------------------------------------------------------------------------------------------|
| `dataset_to_data_product_urns_pattern`| ✅ | map[regex, urn] | | Map of regex patterns matched against dataset entity urns to data product urns; each data product is applied to every entity urn that matches its pattern. |
| `is_container` | | bool | `false` | Whether to also attach the data product to the dataset's container. If `true`, the data product will be attached to both the dataset and its container. |


Let’s suppose we’d like to attach a series of data products to specific datasets or their containers as assets. To do so, we can use the `pattern_add_dataset_dataproduct` module that’s included in the ingestion framework. This module matches each regex pattern against the urn of the dataset and adds the matching datasets as assets of the data product with the given urn (see the sketch after the examples below for the matching semantics).

If the `is_container` field is set to `true`, the module will not only attach the data product to the matching datasets but will also look up and attach the containers associated with those datasets. This means that both the datasets and their containers will be associated with the specified data product.

The config, which we’d append to our ingestion recipe YAML, would look like this:

- Add Product to dataset
```yaml
transformers:
  - type: "pattern_add_dataset_dataproduct"
    config:
      dataset_to_data_product_urns_pattern:
        rules:
          ".*example1.*": "urn:li:dataProduct:first"
          ".*example2.*": "urn:li:dataProduct:second"
```
- Add Product to dataset container
```yaml
transformers:
  - type: "pattern_add_dataset_dataproduct"
    config:
      is_container: true
      dataset_to_data_product_urns_pattern:
        rules:
          ".*example1.*": "urn:li:dataProduct:first"
          ".*example2.*": "urn:li:dataProduct:second"
```
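
To make the matching semantics concrete, here is a minimal, self-contained sketch of how a rules map like the ones above can resolve a dataset urn to a data product urn. This is an illustration only; it is not the framework's actual `KeyValuePattern` implementation, and the `resolve_data_product` helper is hypothetical:

```python
import re
from typing import Dict, Optional

def resolve_data_product(dataset_urn: str, rules: Dict[str, str]) -> Optional[str]:
    """Return the data product urn of the first rule whose regex matches the dataset urn."""
    for pattern, data_product_urn in rules.items():
        if re.match(pattern, dataset_urn):
            return data_product_urn
    return None

rules = {
    ".*example1.*": "urn:li:dataProduct:first",
    ".*example2.*": "urn:li:dataProduct:second",
}

dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,db.example1_table,PROD)"
print(resolve_data_product(dataset_urn, rules))  # urn:li:dataProduct:first
```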
32 changes: 30 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
@@ -11,7 +11,7 @@
from datahub.ingestion.transformer.dataset_transformer import (
    DatasetDataproductTransformer,
)
from datahub.metadata.schema_classes import MetadataChangeProposalClass
from datahub.metadata.schema_classes import ContainerClass, MetadataChangeProposalClass
from datahub.specific.dataproduct import DataProductPatchBuilder

logger = logging.getLogger(__name__)
@@ -23,6 +23,8 @@ class AddDatasetDataProductConfig(ConfigModel):

    _resolve_data_product_fn = pydantic_resolve_key("get_data_product_to_add")

    is_container: bool = False


class AddDatasetDataProduct(DatasetDataproductTransformer):
"""Transformer that adds dataproduct entity for provided dataset as its asset according to a callback function."""
@@ -49,10 +51,11 @@ def handle_end_of_stream(
        self,
    ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
        data_products: Dict[str, DataProductPatchBuilder] = {}

        data_products_container: Dict[str, DataProductPatchBuilder] = {}
        logger.debug("Generating dataproducts")
        for entity_urn in self.entity_map.keys():
            data_product_urn = self.config.get_data_product_to_add(entity_urn)
            is_container = self.config.is_container
            if data_product_urn:
                if data_product_urn not in data_products:
                    data_products[data_product_urn] = DataProductPatchBuilder(
@@ -63,11 +66,34 @@
                        data_product_urn
                    ].add_asset(entity_urn)

                if is_container:
                    assert self.ctx.graph
                    container_aspect = self.ctx.graph.get_aspect(
                        entity_urn, aspect_type=ContainerClass
                    )
                    if not container_aspect:
                        continue
                    container_urn = container_aspect.container
                    if data_product_urn not in data_products_container:
                        container_product = DataProductPatchBuilder(
                            data_product_urn
                        ).add_asset(container_urn)
                        data_products_container[data_product_urn] = container_product
                    else:
                        data_products_container[
                            data_product_urn
                        ] = data_products_container[data_product_urn].add_asset(
                            container_urn
                        )

        mcps: List[
            Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
        ] = []
        for data_product in data_products.values():
            mcps.extend(list(data_product.build()))
        if is_container:
            for data_product in data_products_container.values():
                mcps.extend(list(data_product.build()))
        return mcps
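
For context, the new `is_container` branch above asks the DataHub graph for the dataset's `container` aspect and, when one exists, also adds that container urn to the data product. A minimal standalone sketch of that lookup, assuming a reachable DataHub instance at `http://localhost:8080` and a hypothetical dataset urn:

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import ContainerClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Hypothetical dataset urn; substitute one that exists in your instance.
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,db.example1_table,PROD)"

# Fetch the dataset's container aspect; returns None if the dataset has no container.
container_aspect = graph.get_aspect(dataset_urn, aspect_type=ContainerClass)
if container_aspect is not None:
    print("container urn:", container_aspect.container)
```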


@@ -97,6 +123,7 @@ def create(

class PatternDatasetDataProductConfig(ConfigModel):
    dataset_to_data_product_urns_pattern: KeyValuePattern = KeyValuePattern.all()
    is_container: bool = False

    @pydantic.root_validator(pre=True)
    def validate_pattern_value(cls, values: Dict) -> Dict:
@@ -122,6 +149,7 @@ def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext
            )[0]
            if dataset_to_data_product.value(dataset_urn)
            else None,
            is_container=config.is_container,
        )
        super().__init__(generic_config, ctx)

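The patch-builder pattern used throughout this change can also be exercised on its own. A minimal sketch using the `DataProductPatchBuilder` imported above, with hypothetical urns; `build()` yields MetadataChangeProposals that add assets to the data product without rewriting its other fields:

```python
from datahub.specific.dataproduct import DataProductPatchBuilder

# Hypothetical urns, mirroring what the transformer produces when
# is_container is enabled: one dataset asset plus its container.
patch = (
    DataProductPatchBuilder("urn:li:dataProduct:first")
    .add_asset("urn:li:dataset:(urn:li:dataPlatform:hive,db.example1_table,PROD)")
    .add_asset("urn:li:container:0fd8c27bee35c325b6f8a36e08ab7a85")
)

for mcp in patch.build():
    print(mcp.entityUrn, mcp.aspectName)
```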
