fix(ingestion/transformer): extend dataset_to_data_product_urns_pattern to support containers #11124

Merged
33 changes: 32 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -1207,20 +1207,51 @@
| Field | Required | Type | Default | Description |
|---------------------------------------|----------|----------------------|-------------|---------------------------------------------------------------------------------------------|
| `dataset_to_data_product_urns_pattern`| ✅ | map[regex, urn] | | Map of dataset URN regex patterns to data product URNs. Each data product URN is applied to the datasets whose URN matches the corresponding pattern. |
| `is_container` | | bool | `false` | Whether to also attach the data product to the dataset's container. If `true`, the data product is attached to both the dataset and its container. |

Let’s suppose we’d like to append a series of data products with specific datasets or their containers as assets. To do so, we can use the `pattern_add_dataset_dataproduct` module that’s included in the ingestion framework. This module matches a regex pattern against each dataset’s URN and creates a data product entity with the given URN, associating the matched datasets as its assets.

If the `is_container` field is set to `true`, the module will not only attach the data product to the matching datasets but will also look up and attach the containers associated with those datasets. This means that both the datasets and their containers become assets of the specified data product.

The config, which we’d append to our ingestion recipe YAML, would look like this:

- Add Product to dataset
```yaml
transformers:
- type: "pattern_add_dataset_dataproduct"
config:
dataset_to_data_product_urns_pattern:
rules:
".*example1.*": "urn:li:dataProduct:first"
".*example2.*": "urn:li:dataProduct:second"
```
- Add Product to dataset container
```yaml
transformers:
- type: "pattern_add_dataset_dataproduct"
config:
is_container: true
dataset_to_data_product_urns_pattern:
rules:
".*example1.*": "urn:li:dataProduct:first"
".*example2.*": "urn:li:dataProduct:second"
```
⚠️ Warning:
When two datasets in the same container match different data products, only one of the data products can be attached to the container.

For example:
```yaml
transformers:
- type: "pattern_add_dataset_dataproduct"
config:
is_container: true
dataset_to_data_product_urns_pattern:
rules:
".*example1.*": "urn:li:dataProduct:first"
".*example2.*": "urn:li:dataProduct:second"
```
If `example1` and `example2` are in the same container, only `urn:li:dataProduct:first` will be added, since a container can be attached to only one data product. If the datasets are in separate containers, the system works as expected and assigns the correct data product URN to each container.
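
To make the collision concrete, here is a small self-contained Python sketch (not part of the transformer; the dataset and container URNs are hypothetical, and rule matching is approximated with `re.match`). It shows both rules claiming the same container even though only one attachment can survive:

```python
import re

# Hypothetical rules and container assignments, mirroring the recipe above.
rules = {
    ".*example1.*": "urn:li:dataProduct:first",
    ".*example2.*": "urn:li:dataProduct:second",
}
dataset_to_container = {
    "urn:li:dataset:(urn:li:dataPlatform:hive,db.example1,PROD)": "urn:li:container:shared",
    "urn:li:dataset:(urn:li:dataPlatform:hive,db.example2,PROD)": "urn:li:container:shared",
}

claims = {}
for dataset_urn, container_urn in dataset_to_container.items():
    for pattern, product_urn in rules.items():
        if re.match(pattern, dataset_urn):
            # Every matching rule claims the dataset's container, but a
            # container can belong to only one data product, so at most
            # one of these claims can take effect.
            claims.setdefault(container_urn, []).append(product_urn)

print(claims)
# {'urn:li:container:shared': ['urn:li:dataProduct:first', 'urn:li:dataProduct:second']}
```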

## Add Dataset dataProduct
### Config Details
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py

@@ -11,7 +11,7 @@
from datahub.ingestion.transformer.dataset_transformer import (
    DatasetDataproductTransformer,
)
from datahub.metadata.schema_classes import ContainerClass, MetadataChangeProposalClass
from datahub.specific.dataproduct import DataProductPatchBuilder

logger = logging.getLogger(__name__)
@@ -23,6 +23,8 @@ class AddDatasetDataProductConfig(ConfigModel):

    _resolve_data_product_fn = pydantic_resolve_key("get_data_product_to_add")

    is_container: bool = False


class AddDatasetDataProduct(DatasetDataproductTransformer):
"""Transformer that adds dataproduct entity for provided dataset as its asset according to a callback function."""
@@ -49,10 +51,11 @@ def handle_end_of_stream(
        self,
    ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
        data_products: Dict[str, DataProductPatchBuilder] = {}
        data_products_container: Dict[str, DataProductPatchBuilder] = {}
        logger.debug("Generating dataproducts")
        for entity_urn in self.entity_map.keys():
            data_product_urn = self.config.get_data_product_to_add(entity_urn)
            is_container = self.config.is_container
            if data_product_urn:
                if data_product_urn not in data_products:
                    data_products[data_product_urn] = DataProductPatchBuilder(
@@ -63,11 +66,34 @@
                        data_product_urn
                    ].add_asset(entity_urn)

                if is_container:
                    assert self.ctx.graph
                    container_aspect = self.ctx.graph.get_aspect(
                        entity_urn, aspect_type=ContainerClass
                    )
                    if not container_aspect:
                        continue
                    container_urn = container_aspect.container
                    if data_product_urn not in data_products_container:
                        container_product = DataProductPatchBuilder(
                            data_product_urn
                        ).add_asset(container_urn)
                        data_products_container[data_product_urn] = container_product
                    else:
                        data_products_container[
                            data_product_urn
                        ] = data_products_container[data_product_urn].add_asset(
                            container_urn
                        )

        mcps: List[
            Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
        ] = []
        for data_product in data_products.values():
            mcps.extend(list(data_product.build()))
        if is_container:
            for data_product in data_products_container.values():
                mcps.extend(list(data_product.build()))
        return mcps


@@ -97,6 +123,7 @@ def create(

class PatternDatasetDataProductConfig(ConfigModel):
    dataset_to_data_product_urns_pattern: KeyValuePattern = KeyValuePattern.all()
    is_container: bool = False

    @pydantic.root_validator(pre=True)
    def validate_pattern_value(cls, values: Dict) -> Dict:
@@ -122,6 +149,7 @@ def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext
            )[0]
            if dataset_to_data_product.value(dataset_urn)
            else None,
            is_container=config.is_container,
        )
        super().__init__(generic_config, ctx)
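
For orientation, a minimal sketch of instantiating the pattern transformer directly (the run ID and rules are made up; `ctx.graph` must be a connected DataHub graph before `handle_end_of_stream` runs with `is_container: true`, since the transformer fetches each dataset's `container` aspect from the graph):

```python
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.add_dataset_dataproduct import (
    PatternAddDatasetDataProduct,
)

# Hypothetical pipeline context; in a real ingestion run the framework
# creates this, and connects ctx.graph so container lookups can succeed.
ctx = PipelineContext(run_id="dataproduct-transform-example")

transformer = PatternAddDatasetDataProduct.create(
    {
        "is_container": True,
        "dataset_to_data_product_urns_pattern": {
            "rules": {
                ".*example1.*": "urn:li:dataProduct:first",
                ".*example2.*": "urn:li:dataProduct:second",
            }
        },
    },
    ctx,
)
```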
