Skip to content

Commit

Permalink
feat(ingest): verify dynamic registry types at runtime (datahub-proje…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and topwebtek7 committed Apr 2, 2021
1 parent 1adcc83 commit cefc22a
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 4 deletions.
4 changes: 3 additions & 1 deletion metadata-ingestion/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
skip_glob=src/datahub/metadata

[tool:pytest]
addopts = --cov src --cov-report term --cov-config setup.cfg
addopts = --cov src --cov-report term --cov-config setup.cfg --strict-markers
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
testpaths =
tests/unit
tests/integration
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def get_long_description():
"typing_extensions>=3.7.4; python_version < '3.8'",
"mypy_extensions>=0.4.3",
# Actual dependencies.
"typing-inspect",
"pydantic>=1.5.1",
}

Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/closeable.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from abc import abstractmethod


class Closeable:
@abstractmethod
def close(self):
pass
20 changes: 17 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/api/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Dict, Generic, Type, TypeVar, Union

import pkg_resources
import typing_inspect

from datahub.configuration.common import ConfigurationError

Expand All @@ -13,6 +14,20 @@ class Registry(Generic[T]):
def __init__(self):
self._mapping: Dict[str, Union[Type[T], Exception]] = {}

def _get_registered_type(self) -> Type[T]:
cls = typing_inspect.get_generic_type(self)
tp = typing_inspect.get_args(cls)[0]
return tp

def _check_cls(self, cls: Type[T]):
if inspect.isabstract(cls):
raise ValueError(
f"cannot register an abstract type in the registry; got {cls}"
)
super_cls = self._get_registered_type()
if not issubclass(cls, super_cls):
raise ValueError(f"must be derived from {super_cls}; got {cls}")

def _register(self, key: str, tp: Union[Type[T], Exception]) -> None:
if key in self._mapping:
raise KeyError(f"key already in use - {key}")
Expand All @@ -21,8 +36,7 @@ def _register(self, key: str, tp: Union[Type[T], Exception]) -> None:
self._mapping[key] = tp

def register(self, key: str, cls: Type[T]) -> None:
if inspect.isabstract(cls):
raise ValueError("cannot register an abstract type in the registry")
self._check_cls(cls)
self._register(key, cls)

def register_disabled(self, key: str, reason: Exception) -> None:
Expand All @@ -35,7 +49,6 @@ def is_enabled(self, key: str) -> bool:
def load(self, entry_point_key: str) -> None:
for entry_point in pkg_resources.iter_entry_points(entry_point_key):
name = entry_point.name
plugin_class = None

try:
plugin_class = entry_point.load()
Expand All @@ -55,6 +68,7 @@ def get(self, key: str) -> Type[T]:
# to load it dynamically.
module_name, class_name = key.rsplit(".", 1)
MyClass = getattr(importlib.import_module(module_name), class_name)
self._check_cls(MyClass)
return MyClass

if key not in self._mapping:
Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/integration/ldap/test_ldap.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import mce_helpers
import pytest

from datahub.ingestion.run.pipeline import Pipeline


@pytest.mark.slow
def test_ldap_ingest(mysql, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/ldap"

Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/integration/mongodb/test_mongodb.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import mce_helpers
import pytest

from datahub.ingestion.run.pipeline import Pipeline


@pytest.mark.slow
def test_mongodb_ingest(mongodb, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/mongodb"

Expand Down
2 changes: 2 additions & 0 deletions metadata-ingestion/tests/integration/mysql/test_mysql.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import fs_helpers
import mce_helpers
import pytest
from click.testing import CliRunner

from datahub.entrypoints import datahub


@pytest.mark.slow
def test_mysql_ingest(mysql, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/mysql"
config_file = (test_resources_dir / "mysql_to_file.yml").resolve()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import fs_helpers
import mce_helpers
import pytest
from click.testing import CliRunner

from datahub.entrypoints import datahub


@pytest.mark.slow
def test_mssql_ingest(sql_server, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/sql_server"

Expand Down
18 changes: 18 additions & 0 deletions metadata-ingestion/tests/unit/test_check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from click.testing import CliRunner

from datahub.entrypoints import datahub


def test_cli_help():
runner = CliRunner()
result = runner.invoke(datahub, ["--help"])
assert result.output


def test_check_local_docker():
# This just verifies that it runs without error.
# We don't actually know what environment this will be run in, so
# we can't depend on the output. Eventually, we should mock the docker SDK.
runner = CliRunner()
result = runner.invoke(datahub, ["check", "local-docker"])
assert result.output
51 changes: 51 additions & 0 deletions metadata-ingestion/tests/unit/test_plugin_system.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,61 @@
import pytest
from click.testing import CliRunner

from datahub.configuration.common import ConfigurationError
from datahub.entrypoints import datahub
from datahub.ingestion.api.registry import Registry
from datahub.ingestion.api.sink import Sink
from datahub.ingestion.extractor.extractor_registry import extractor_registry
from datahub.ingestion.sink.console import ConsoleSink
from datahub.ingestion.sink.sink_registry import sink_registry
from datahub.ingestion.source.source_registry import source_registry


@pytest.mark.parametrize(
"registry",
[
source_registry,
sink_registry,
extractor_registry,
],
)
def test_registry_nonempty(registry):
assert len(registry.mapping) > 0


def test_list_all():
# This just verifies that it runs without error.
runner = CliRunner()
result = runner.invoke(datahub, ["ingest-list-plugins"])
assert result.exit_code == 0


def test_registry():
# Make a mini sink registry.
fake_registry = Registry[Sink]()
fake_registry.register("console", ConsoleSink)
fake_registry.register_disabled("disabled", ImportError("disabled sink"))

class DummyClass:
pass

assert len(fake_registry.mapping) > 0
assert fake_registry.is_enabled("console")
assert fake_registry.get("console") == ConsoleSink
assert (
fake_registry.get("datahub.ingestion.sink.console.ConsoleSink") == ConsoleSink
)

with pytest.raises(KeyError, match="key cannot contain '.'"):
fake_registry.register("thisdoesnotexist.otherthing", ConsoleSink)
with pytest.raises(KeyError, match="in use"):
fake_registry.register("console", ConsoleSink)
with pytest.raises(KeyError, match="not find"):
fake_registry.get("thisdoesnotexist")

with pytest.raises(ValueError, match="abstract"):
fake_registry.register("thisdoesnotexist", Sink) # type: ignore
with pytest.raises(ValueError, match="derived"):
fake_registry.register("thisdoesnotexist", DummyClass) # type: ignore
with pytest.raises(ConfigurationError, match="disabled"):
fake_registry.get("disabled")

0 comments on commit cefc22a

Please sign in to comment.