From 220dfe728ce4a12c81458d0d421ec4d5ba8e2768 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Mon, 12 Jul 2021 11:03:53 -0700
Subject: [PATCH] feat(ingest): support dynamic imports for transformer
 methods (#2858)

---
 .../datahub/configuration/import_resolver.py  | 13 ++++++++++
 .../src/datahub/ingestion/api/registry.py     | 12 +++++++---
 .../transformer/add_dataset_ownership.py      |  3 +++
 .../ingestion/transformer/add_dataset_tags.py |  3 +++
 .../tests/unit/test_transform_dataset.py      | 24 ++++++++++++++++++-
 5 files changed, 51 insertions(+), 4 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/configuration/import_resolver.py

diff --git a/metadata-ingestion/src/datahub/configuration/import_resolver.py b/metadata-ingestion/src/datahub/configuration/import_resolver.py
new file mode 100644
index 0000000000000..f99dfea613c49
--- /dev/null
+++ b/metadata-ingestion/src/datahub/configuration/import_resolver.py
@@ -0,0 +1,13 @@
+import pydantic
+
+from datahub.ingestion.api.registry import import_key
+
+
+def _pydantic_resolver(v):
+    if isinstance(v, str):
+        return import_key(v)
+    return v
+
+
+def pydantic_resolve_key(field):
+    return pydantic.validator(field, pre=True, allow_reuse=True)(_pydantic_resolver)
diff --git a/metadata-ingestion/src/datahub/ingestion/api/registry.py b/metadata-ingestion/src/datahub/ingestion/api/registry.py
index 71248e60162ef..80908d547c925 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/registry.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/registry.py
@@ -1,6 +1,6 @@
 import importlib
 import inspect
-from typing import Dict, Generic, Type, TypeVar, Union
+from typing import Any, Dict, Generic, Type, TypeVar, Union
 
 import entrypoints
 import typing_inspect
@@ -11,6 +11,13 @@
 T = TypeVar("T")
 
 
+def import_key(key: str) -> Any:
+    assert "." in key, "import key must contain a ."
+    module_name, item_name = key.rsplit(".", 1)
+    item = getattr(importlib.import_module(module_name), item_name)
+    return item
+
+
 class Registry(Generic[T]):
     def __init__(self):
         self._mapping: Dict[str, Union[Type[T], Exception]] = {}
@@ -68,8 +75,7 @@ def get(self, key: str) -> Type[T]:
         if key.find(".") >= 0:
             # If the key contains a dot, we treat it as a import path and attempt
             # to load it dynamically.
-            module_name, class_name = key.rsplit(".", 1)
-            MyClass = getattr(importlib.import_module(module_name), class_name)
+            MyClass = import_key(key)
             self._check_cls(MyClass)
             return MyClass
 
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py
index ca3683a3f264d..6d78d53fd168f 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py
@@ -2,6 +2,7 @@
 
 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import ConfigModel
+from datahub.configuration.import_resolver import pydantic_resolve_key
 from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
 from datahub.ingestion.api.transform import Transformer
 from datahub.metadata.schema_classes import (
@@ -22,6 +23,8 @@ class AddDatasetOwnershipConfig(ConfigModel):
     ]
     default_actor: str = builder.make_user_urn("etl")
 
+    _resolve_owner_fn = pydantic_resolve_key("get_owners_to_add")
+
 
 class AddDatasetOwnership(Transformer):
     """Transformer that adds owners to datasets according to a callback function."""
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
index 36e102d4346ba..9514e23baf370 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
@@ -2,6 +2,7 @@
 
 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import ConfigModel
+from datahub.configuration.import_resolver import pydantic_resolve_key
 from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
 from datahub.ingestion.api.transform import Transformer
 from datahub.metadata.schema_classes import (
@@ -20,6 +21,8 @@ class AddDatasetTagsConfig(ConfigModel):
         Callable[[DatasetSnapshotClass], List[TagAssociationClass]],
     ]
 
+    _resolve_tag_fn = pydantic_resolve_key("get_tags_to_add")
+
 
 class AddDatasetTags(Transformer):
     """Transformer that adds tags to datasets according to a callback function."""
diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py
index 91a945db00416..ea60234919aa9 100644
--- a/metadata-ingestion/tests/unit/test_transform_dataset.py
+++ b/metadata-ingestion/tests/unit/test_transform_dataset.py
@@ -4,7 +4,10 @@
 from datahub.ingestion.transformer.add_dataset_ownership import (
     SimpleAddDatasetOwnership,
 )
-from datahub.ingestion.transformer.add_dataset_tags import SimpleAddDatasetTags
+from datahub.ingestion.transformer.add_dataset_tags import (
+    AddDatasetTags,
+    SimpleAddDatasetTags,
+)
 
 
 def make_generic_dataset():
@@ -120,3 +123,22 @@ def test_simple_dataset_tags_transformation(mock_time):
     assert tags_aspect
     assert len(tags_aspect.tags) == 2
     assert tags_aspect.tags[0].tag == builder.make_tag_urn("NeedsDocumentation")
+
+
+def dummy_tag_resolver_method(dataset_snapshot):
+    return []
+
+
+def test_import_resolver():
+    transformer = AddDatasetTags.create(
+        {
+            "get_tags_to_add": "tests.unit.test_transform_dataset.dummy_tag_resolver_method"
+        },
+        PipelineContext(run_id="test-tags"),
+    )
+    output = list(
+        transformer.transform(
+            [RecordEnvelope(input, metadata={}) for input in [make_generic_dataset()]]
+        )
+    )
+    assert output