From 7fc7034b45a3b05c9f82c34c136ab2d05768de8a Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Fri, 9 Aug 2024 02:18:55 +0530 Subject: [PATCH] fix(ingestion/transformer): extend dataset_to_data_product_urns_pattern to support containers --- .../docs/transformer/dataset_transformer.md | 33 ++++++++++++++++++- .../transformer/add_dataset_dataproduct.py | 32 ++++++++++++++++-- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index ac6fefc3095741..03a224bcf7da47 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -1207,20 +1207,51 @@ The config, which we’d append to our ingestion recipe YAML, would look like th | Field | Required | Type | Default | Description | |---------------------------------------|----------|----------------------|-------------|---------------------------------------------------------------------------------------------| | `dataset_to_data_product_urns_pattern`| ✅ | map[regx, urn] | | Dataset Entity urn with regular expression and dataproduct urn apply to matching entity urn.| +| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, the data product will be attached to both the dataset and its container. | -Let’s suppose we’d like to append a series of dataproducts with specific datasets as its assets. To do so, we can use the `pattern_add_dataset_dataproduct` module that’s included in the ingestion framework. This will match the regex pattern to `urn` of the dataset and create the data product entity with given urn and matched datasets as its assets. + +Let’s suppose we’d like to append a series of data products with specific datasets or their containers as assets. To do so, we can use the pattern_add_dataset_dataproduct module that’s included in the ingestion framework. This module matches a regex pattern to the urn of the dataset and creates a data product entity with the given urn, associating the matched datasets as its assets. + +If the is_container field is set to true, the module will not only attach the data product to the matching datasets but will also find and attach the containers associated with those datasets. This means that both the datasets and their containers will be associated with the specified data product. The config, which we’d append to our ingestion recipe YAML, would look like this: +- Add Product to dataset + ```yaml + transformers: + - type: "pattern_add_dataset_dataproduct" + config: + dataset_to_data_product_urns_pattern: + rules: + ".*example1.*": "urn:li:dataProduct:first" + ".*example2.*": "urn:li:dataProduct:second" + ``` +- Add Product to dataset container ```yaml transformers: - type: "pattern_add_dataset_dataproduct" config: + is_container: true dataset_to_data_product_urns_pattern: rules: ".*example1.*": "urn:li:dataProduct:first" ".*example2.*": "urn:li:dataProduct:second" ``` +⚠️ Warning: +When working with two datasets in the same container but with different data products, only one data product can be attached to the container. + +For example: +```yaml +transformers: + - type: "pattern_add_dataset_dataproduct" + config: + is_container: true + dataset_to_data_product_urns_pattern: + rules: + ".*example1.*": "urn:li:dataProduct:first" + ".*example2.*": "urn:li:dataProduct:second" +``` +If example1 and example2 are in the same container, only urn:li:dataProduct:first will be added. However, if they are in separate containers, the system works as expected and assigns the correct data product URNs. ## Add Dataset dataProduct ### Config Details diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py index 45e92628430258..c474e423030e05 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py @@ -11,7 +11,7 @@ from datahub.ingestion.transformer.dataset_transformer import ( DatasetDataproductTransformer, ) -from datahub.metadata.schema_classes import MetadataChangeProposalClass +from datahub.metadata.schema_classes import ContainerClass, MetadataChangeProposalClass from datahub.specific.dataproduct import DataProductPatchBuilder logger = logging.getLogger(__name__) @@ -23,6 +23,8 @@ class AddDatasetDataProductConfig(ConfigModel): _resolve_data_product_fn = pydantic_resolve_key("get_data_product_to_add") + is_container: bool = False + class AddDatasetDataProduct(DatasetDataproductTransformer): """Transformer that adds dataproduct entity for provided dataset as its asset according to a callback function.""" @@ -49,10 +51,11 @@ def handle_end_of_stream( self, ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]: data_products: Dict[str, DataProductPatchBuilder] = {} - + data_products_container: Dict[str, DataProductPatchBuilder] = {} logger.debug("Generating dataproducts") for entity_urn in self.entity_map.keys(): data_product_urn = self.config.get_data_product_to_add(entity_urn) + is_container = self.config.is_container if data_product_urn: if data_product_urn not in data_products: data_products[data_product_urn] = DataProductPatchBuilder( @@ -63,11 +66,34 @@ def handle_end_of_stream( data_product_urn ].add_asset(entity_urn) + if is_container: + assert self.ctx.graph + container_aspect = self.ctx.graph.get_aspect( + entity_urn, aspect_type=ContainerClass + ) + if not container_aspect: + continue + container_urn = container_aspect.container + if data_product_urn not in data_products_container: + container_product = DataProductPatchBuilder( + data_product_urn + ).add_asset(container_urn) + data_products_container[data_product_urn] = container_product + else: + data_products_container[ + data_product_urn + ] = data_products_container[data_product_urn].add_asset( + container_urn + ) + mcps: List[ Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass] ] = [] for data_product in data_products.values(): mcps.extend(list(data_product.build())) + if is_container: + for data_product in data_products_container.values(): + mcps.extend(list(data_product.build())) return mcps @@ -97,6 +123,7 @@ def create( class PatternDatasetDataProductConfig(ConfigModel): dataset_to_data_product_urns_pattern: KeyValuePattern = KeyValuePattern.all() + is_container: bool = False @pydantic.root_validator(pre=True) def validate_pattern_value(cls, values: Dict) -> Dict: @@ -122,6 +149,7 @@ def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext )[0] if dataset_to_data_product.value(dataset_urn) else None, + is_container=config.is_container, ) super().__init__(generic_config, ctx)