Skip to content

Commit

Permalink
Test custom translator + refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
briantu committed Oct 29, 2024
1 parent a0b5b33 commit 1fa4e78
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 102 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import base64
from typing import Any

import pytest
Expand All @@ -13,6 +12,7 @@
asset,
io_manager,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
Expand All @@ -23,22 +23,19 @@
from dagster_fivetran.asset_defs import (
DagsterFivetranTranslator,
FivetranConnectionMetadata,
FivetranConnectorTableProps,
load_assets_from_fivetran_instance,
)
from responses import matchers

from dagster_fivetran_tests.utils import (
DEFAULT_CONNECTOR_ID,
DEFAULT_CONNECTOR_ID_2,
get_complex_sample_connector_schema_config,
get_sample_columns_response,
get_sample_connector_response,
get_sample_connectors_response,
get_sample_connectors_response_multiple,
get_sample_destination_details_response,
get_sample_groups_response,
get_sample_sync_response,
get_sample_update_response,
mock_responses,
)


Expand Down Expand Up @@ -83,46 +80,7 @@ def load_input(self, context: InputContext) -> Any:
api_key=EnvVar("FIVETRAN_API_KEY"), api_secret=EnvVar("FIVETRAN_API_SECRET")
)

b64_encoded_auth_str = base64.b64encode(b"some_key:some_secret").decode("utf-8")
expected_auth_header = {"Authorization": f"Basic {b64_encoded_auth_str}"}

responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "groups",
json=get_sample_groups_response(),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)
responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "destinations/some_group",
json=(get_sample_destination_details_response()),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)
responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "groups/some_group/connectors",
json=(
get_sample_connectors_response_multiple()
if multiple_connectors
else get_sample_connectors_response()
),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)

responses.add(
responses.GET,
f"{ft_resource.api_connector_url}{DEFAULT_CONNECTOR_ID}/schemas",
json=get_complex_sample_connector_schema_config(),
)
if multiple_connectors:
responses.add(
responses.GET,
f"{ft_resource.api_connector_url}{DEFAULT_CONNECTOR_ID_2}/schemas",
json=get_complex_sample_connector_schema_config("_xyz1", "_abc"),
)
mock_responses(ft_resource, multiple_connectors)

if connector_to_group_fn:
ft_cacheable_assets = load_assets_from_fivetran_instance(
Expand Down Expand Up @@ -310,83 +268,48 @@ def downstream_asset(xyz):
assert load_calls == [xyz_asset_key]


@responses.activate
def test_load_from_instance_with_translator() -> None:
with environ({"FIVETRAN_API_KEY": "some_key", "FIVETRAN_API_SECRET": "some_secret"}):
load_calls = []
class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
return super().get_asset_key(props).with_prefix("my_prefix")

@io_manager
def test_io_manager(_context) -> IOManager:
class TestIOManager(IOManager):
def handle_output(self, context: OutputContext, obj) -> None:
return
def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
asset_spec = super().get_asset_spec(props)
return asset_spec._replace(metadata={"foo": "bar", **asset_spec.metadata})

def load_input(self, context: InputContext) -> Any:
load_calls.append(context.asset_key)
return None

return TestIOManager()

@responses.activate
@pytest.mark.parametrize(
("translator, custom_prefix, custom_metadata"),
[
(DagsterFivetranTranslator, [], {}),
(CustomDagsterFivetranTranslator, ["my_prefix"], {"foo": "bar"}),
],
)
def test_load_from_instance_with_translator(translator, custom_prefix, custom_metadata) -> None:
with environ({"FIVETRAN_API_KEY": "some_key", "FIVETRAN_API_SECRET": "some_secret"}):
ft_resource = FivetranResource(
api_key=EnvVar("FIVETRAN_API_KEY"), api_secret=EnvVar("FIVETRAN_API_SECRET")
)

b64_encoded_auth_str = base64.b64encode(b"some_key:some_secret").decode("utf-8")
expected_auth_header = {"Authorization": f"Basic {b64_encoded_auth_str}"}

responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "groups",
json=get_sample_groups_response(),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)
responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "destinations/some_group",
json=(get_sample_destination_details_response()),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)
responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "groups/some_group/connectors",
json=(get_sample_connectors_response()),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)

responses.add(
responses.GET,
f"{ft_resource.api_connector_url}{DEFAULT_CONNECTOR_ID}/schemas",
json=get_complex_sample_connector_schema_config(),
)
mock_responses(ft_resource)

ft_cacheable_assets = load_assets_from_fivetran_instance(
ft_resource,
poll_interval=10,
poll_timeout=600,
translator=DagsterFivetranTranslator,
translator=translator,
)
ft_assets = ft_cacheable_assets.build_definitions(
ft_cacheable_assets.compute_cacheable_data()
)
ft_assets = with_resources(ft_assets, {"test_io_manager": test_io_manager})

# Create set of expected asset keys
tables = {
AssetKey(["xyz1", "abc2"]),
AssetKey(["xyz1", "abc1"]),
AssetKey(["abc", "xyz"]),
AssetKey([*custom_prefix, "xyz1", "abc2"]),
AssetKey([*custom_prefix, "xyz1", "abc1"]),
AssetKey([*custom_prefix, "abc", "xyz"]),
}

# Set up a downstream asset to consume the xyz output table
xyz_asset_key = AssetKey(["abc", "xyz"])

@asset(ins={"xyz": AssetIn(key=xyz_asset_key)})
def downstream_asset(xyz):
return

# Check schema metadata is added correctly to asset def
assets_def = ft_assets[0]

Expand All @@ -410,6 +333,8 @@ def downstream_asset(xyz):
)
assert has_kind(assets_def.tags_by_key[key], "snowflake")

for key, value in custom_metadata.items():
assert all(metadata[key] == value for metadata in assets_def.metadata_by_key.values())
assert ft_assets[0].keys == tables
assert all(
[ft_assets[0].group_names_by_key.get(t) == ("some_service_some_name") for t in tables]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import base64

import responses
from dagster._utils.merger import deep_merge_dicts
from responses import matchers

DEFAULT_CONNECTOR_ID = "some_connector"
DEFAULT_CONNECTOR_ID_2 = "some_other_connector"
Expand Down Expand Up @@ -278,3 +282,45 @@ def get_sample_columns_response():
},
},
}


def mock_responses(ft_resource, multiple_connectors=False):
b64_encoded_auth_str = base64.b64encode(b"some_key:some_secret").decode("utf-8")
expected_auth_header = {"Authorization": f"Basic {b64_encoded_auth_str}"}
responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "groups",
json=get_sample_groups_response(),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)
responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "destinations/some_group",
json=(get_sample_destination_details_response()),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)
responses.add(
method=responses.GET,
url=ft_resource.api_base_url + "groups/some_group/connectors",
json=(
get_sample_connectors_response_multiple()
if multiple_connectors
else get_sample_connectors_response()
),
status=200,
match=[matchers.header_matcher(expected_auth_header)],
)

responses.add(
responses.GET,
f"{ft_resource.api_connector_url}{DEFAULT_CONNECTOR_ID}/schemas",
json=get_complex_sample_connector_schema_config(),
)
if multiple_connectors:
responses.add(
responses.GET,
f"{ft_resource.api_connector_url}{DEFAULT_CONNECTOR_ID_2}/schemas",
json=get_complex_sample_connector_schema_config("_xyz1", "_abc"),
)

0 comments on commit 1fa4e78

Please sign in to comment.