Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): Add test_connection methods for important sources #9334

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def _check_oauth_config(oauth_config: Optional[OAuthConfiguration]) -> None:
"'oauth_config' is none but should be set when using OAUTH_AUTHENTICATOR authentication"
)
if oauth_config.use_certificate is True:
if oauth_config.provider == OAuthIdentityProvider.OKTA.value:
if oauth_config.provider == OAuthIdentityProvider.OKTA:
raise ValueError(
"Certificate authentication is not supported for Okta."
)
Expand Down
94 changes: 54 additions & 40 deletions metadata-ingestion/tests/integration/dbt/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@
resolve_athena_modified_type,
resolve_trino_modified_type,
)
from tests.test_helpers import mce_helpers
from tests.test_helpers import mce_helpers, test_connection_helpers

FROZEN_TIME = "2022-02-03 07:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"


@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
return pytestconfig.rootpath / "tests/integration/dbt"


@dataclass
class DbtTestConfig:
run_id: str
Expand Down Expand Up @@ -195,7 +200,14 @@ def set_paths(
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_ingest(dbt_test_config, pytestconfig, tmp_path, mock_time, requests_mock):
def test_dbt_ingest(
dbt_test_config,
test_resources_dir,
pytestconfig,
tmp_path,
mock_time,
requests_mock,
):
config: DbtTestConfig = dbt_test_config
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"

Expand Down Expand Up @@ -233,44 +245,48 @@ def test_dbt_ingest(dbt_test_config, pytestconfig, tmp_path, mock_time, requests
)


@pytest.mark.parametrize(
"config_dict, is_success",
[
(
{
"manifest_path": "dbt_manifest.json",
"catalog_path": "dbt_catalog.json",
"target_platform": "postgres",
},
True,
),
(
{
"manifest_path": "dbt_manifest.json",
"catalog_path": "dbt_catalog-this-file-does-not-exist.json",
"target_platform": "postgres",
},
False,
),
],
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_test_connection_success(pytestconfig):
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"
config = {
"manifest_path": str((test_resources_dir / "dbt_manifest.json").resolve()),
"catalog_path": str((test_resources_dir / "dbt_catalog.json").resolve()),
"target_platform": "postgres",
}
report = DBTCoreSource.test_connection(config)
assert report is not None
assert report.basic_connectivity
assert report.basic_connectivity.capable
assert report.basic_connectivity.failure_reason is None


@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_test_connection_failure(pytestconfig):
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"
config = {
"manifest_path": str((test_resources_dir / "dbt_manifest.json").resolve()),
"catalog_path": str((test_resources_dir / "dbt_cat.json").resolve()),
"target_platform": "postgres",
}
report = DBTCoreSource.test_connection(config)
assert report is not None
assert report.basic_connectivity
assert not report.basic_connectivity.capable
assert report.basic_connectivity.failure_reason
assert "No such file or directory" in report.basic_connectivity.failure_reason
def test_dbt_test_connection(test_resources_dir, config_dict, is_success):
config_dict["manifest_path"] = str(
(test_resources_dir / config_dict["manifest_path"]).resolve()
)
config_dict["catalog_path"] = str(
(test_resources_dir / config_dict["catalog_path"]).resolve()
)
report = test_connection_helpers.run_test_connection(DBTCoreSource, config_dict)
if is_success:
test_connection_helpers.assert_basic_connectivity_success(report)
else:
test_connection_helpers.assert_basic_connectivity_failure(
report, "No such file or directory"
)


@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_tests(pytestconfig, tmp_path, mock_time, **kwargs):
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"

def test_dbt_tests(test_resources_dir, pytestconfig, tmp_path, mock_time, **kwargs):
# Run the metadata ingestion pipeline.
output_file = tmp_path / "dbt_test_events.json"
golden_path = test_resources_dir / "dbt_test_events_golden.json"
Expand Down Expand Up @@ -373,9 +389,9 @@ def test_resolve_athena_modified_type(data_type, expected_data_type):

@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_tests_only_assertions(pytestconfig, tmp_path, mock_time, **kwargs):
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"

def test_dbt_tests_only_assertions(
test_resources_dir, pytestconfig, tmp_path, mock_time, **kwargs
):
# Run the metadata ingestion pipeline.
output_file = tmp_path / "test_only_assertions.json"

Expand Down Expand Up @@ -451,10 +467,8 @@ def test_dbt_tests_only_assertions(pytestconfig, tmp_path, mock_time, **kwargs):
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_dbt_only_test_definitions_and_results(
pytestconfig, tmp_path, mock_time, **kwargs
test_resources_dir, pytestconfig, tmp_path, mock_time, **kwargs
):
test_resources_dir = pytestconfig.rootpath / "tests/integration/dbt"

# Run the metadata ingestion pipeline.
output_file = tmp_path / "test_only_definitions_and_assertions.json"

Expand Down
98 changes: 50 additions & 48 deletions metadata-ingestion/tests/integration/kafka/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@

from datahub.ingestion.api.source import SourceCapability
from datahub.ingestion.source.kafka import KafkaSource
from tests.test_helpers import mce_helpers
from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port

FROZEN_TIME = "2020-04-14 07:00:00"


@pytest.fixture(scope="module")
def mock_kafka_service(docker_compose_runner, pytestconfig):
test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka"
def test_resources_dir(pytestconfig):
return pytestconfig.rootpath / "tests/integration/kafka"


@pytest.fixture(scope="module")
def mock_kafka_service(docker_compose_runner, test_resources_dir):
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "kafka", cleanup=False
) as docker_services:
Expand All @@ -37,9 +40,9 @@ def mock_kafka_service(docker_compose_runner, pytestconfig):

@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_ingest(mock_kafka_service, pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka"

def test_kafka_ingest(
mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time
):
# Run the metadata ingestion pipeline.
config_file = (test_resources_dir / "kafka_to_file.yml").resolve()
run_datahub_cmd(["ingest", "-c", f"{config_file}"], tmp_path=tmp_path)
Expand All @@ -53,47 +56,46 @@ def test_kafka_ingest(mock_kafka_service, pytestconfig, tmp_path, mock_time):
)


@pytest.mark.parametrize(
"config_dict, is_success",
[
(
{
"connection": {
"bootstrap": "localhost:29092",
"schema_registry_url": "http://localhost:28081",
},
},
True,
),
(
{
"connection": {
"bootstrap": "localhost:2909",
"schema_registry_url": "http://localhost:2808",
},
},
False,
),
],
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_kafka_test_connection_success(mock_kafka_service):
config = {
"connection": {
"bootstrap": "localhost:29092",
"schema_registry_url": "http://localhost:28081",
},
}
report = KafkaSource.test_connection(config)
assert report is not None
assert report.basic_connectivity
assert report.basic_connectivity.capable
assert report.basic_connectivity.failure_reason is None
assert report.capability_report
assert report.capability_report[SourceCapability.SCHEMA_METADATA].capable
assert (
report.capability_report[SourceCapability.SCHEMA_METADATA].failure_reason
is None
)


@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_kafka_test_connection_failure(mock_kafka_service):
config = {
"connection": {
"bootstrap": "localhost:2909",
"schema_registry_url": "http://localhost:2808",
},
}
report = KafkaSource.test_connection(config)
assert report is not None
assert report.basic_connectivity
assert not report.basic_connectivity.capable
assert report.basic_connectivity.failure_reason
assert "Failed to get metadata" in report.basic_connectivity.failure_reason
assert report.capability_report
assert not report.capability_report[SourceCapability.SCHEMA_METADATA].capable
failure_reason = report.capability_report[
SourceCapability.SCHEMA_METADATA
].failure_reason
assert failure_reason
assert "Failed to establish a new connection" in failure_reason
def test_kafka_test_connection(mock_kafka_service, config_dict, is_success):
report = test_connection_helpers.run_test_connection(KafkaSource, config_dict)
if is_success:
test_connection_helpers.assert_basic_connectivity_success(report)
test_connection_helpers.assert_capability_report(
capability_report=report.capability_report,
success_capabilities=[SourceCapability.SCHEMA_METADATA],
)
else:
test_connection_helpers.assert_basic_connectivity_failure(
report, "Failed to get metadata"
)
test_connection_helpers.assert_capability_report(
capability_report=report.capability_report,
failure_capabilities={
SourceCapability.SCHEMA_METADATA: "Failed to establish a new connection"
},
)
38 changes: 37 additions & 1 deletion metadata-ingestion/tests/integration/mysql/test_mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import pytest
from freezegun import freeze_time

from tests.test_helpers import mce_helpers
from datahub.ingestion.source.sql.mysql import MySQLSource
from tests.test_helpers import mce_helpers, test_connection_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port

Expand Down Expand Up @@ -75,3 +76,38 @@ def test_mysql_ingest_no_db(
output_path=tmp_path / "mysql_mces.json",
golden_path=test_resources_dir / golden_file,
)


@pytest.mark.parametrize(
"config_dict, is_success",
[
(
{
"host_port": "localhost:53307",
"database": "northwind",
"username": "root",
"password": "example",
},
True,
),
(
{
"host_port": "localhost:5330",
"database": "wrong_db",
"username": "wrong_user",
"password": "wrong_pass",
},
False,
),
],
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_test_connection(mysql_runner, config_dict, is_success):
report = test_connection_helpers.run_test_connection(MySQLSource, config_dict)
if is_success:
test_connection_helpers.assert_basic_connectivity_success(report)
else:
test_connection_helpers.assert_basic_connectivity_failure(
report, "Connection refused"
)
24 changes: 10 additions & 14 deletions metadata-ingestion/tests/integration/powerbi/test_powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
Report,
Workspace,
)
from tests.test_helpers import mce_helpers
from tests.test_helpers import mce_helpers, test_connection_helpers

pytestmark = pytest.mark.integration_batch_2
FROZEN_TIME = "2022-02-03 07:00:00"
Expand Down Expand Up @@ -667,24 +667,20 @@ def test_powerbi_ingest(mock_msal, pytestconfig, tmp_path, mock_time, requests_m
@mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca)
@pytest.mark.integration
def test_powerbi_test_connection_success(mock_msal):
report = PowerBiDashboardSource.test_connection(default_source_config())
assert report is not None
assert report.basic_connectivity
assert report.basic_connectivity.capable
assert report.basic_connectivity.failure_reason is None
report = test_connection_helpers.run_test_connection(
PowerBiDashboardSource, default_source_config()
)
test_connection_helpers.assert_basic_connectivity_success(report)


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_powerbi_test_connection_failure():
report = PowerBiDashboardSource.test_connection(default_source_config())
assert report is not None
assert report.basic_connectivity
assert not report.basic_connectivity.capable
assert report.basic_connectivity.failure_reason
assert (
"Unable to get authority configuration"
in report.basic_connectivity.failure_reason
report = test_connection_helpers.run_test_connection(
PowerBiDashboardSource, default_source_config()
)
test_connection_helpers.assert_basic_connectivity_failure(
report, "Unable to get authority configuration"
)


Expand Down
Loading
Loading