Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2/n][dagster-fivetran] Update DagsterFivetranTranslator and related classes for rework #25751

Merged
merged 6 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_fivetran.asset_defs import (
DagsterFivetranTranslator as DagsterFivetranTranslator,
build_fivetran_assets as build_fivetran_assets,
load_assets_from_fivetran_instance as load_assets_from_fivetran_instance,
)
Expand All @@ -14,6 +13,7 @@
FivetranWorkspace as FivetranWorkspace,
fivetran_resource as fivetran_resource,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.types import FivetranOutput as FivetranOutput
from dagster_fivetran.version import __version__ as __version__

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from dagster._utils.log import get_dagster_logger

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranConnectorTableProps
from dagster_fivetran.utils import (
generate_materializations,
get_fivetran_connector_url,
Expand All @@ -53,50 +54,6 @@
logger = get_dagster_logger()


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
database: Optional[str]
service: Optional[str]


class DagsterFivetranTranslator:
def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
"""Get the AssetKey for a table synced by a Fivetran connector."""
return AssetKey(props.table.split("."))

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
"""Get the AssetSpec for a table synced by a Fivetran connector."""
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
if schema["name_in_destination"] == schema_name
)
table_entry = next(
table_entry
for table_entry in schema_entry["tables"].values()
if table_entry["name_in_destination"] == table_name
)

metadata = metadata_for_table(
table_entry,
props.connector_url,
database=props.database,
schema=schema_name,
table=table_name,
)

return AssetSpec(
key=self.get_asset_key(props),
metadata=metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)


def _fetch_and_attach_col_metadata(
fivetran_resource: FivetranResource, connector_id: str, materialization: AssetMaterialization
) -> AssetMaterialization:
Expand Down Expand Up @@ -165,9 +122,9 @@ def _build_fivetran_assets(
tracked_asset_keys = {
table: AssetKey([*asset_key_prefix, *table.split(".")])
if not translator_instance or not connection_metadata
else translator_instance.get_asset_key(
else translator_instance.get_asset_spec(
FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
)
).key
for table in destination_tables
}
user_facing_asset_keys = table_to_asset_key_map or tracked_asset_keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
import os
import time
from enum import Enum
from typing import Any, Mapping, Optional, Sequence, Tuple
from urllib.parse import urljoin

Expand All @@ -20,14 +19,13 @@
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
from pydantic import Field, PrivateAttr
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

from dagster_fivetran.translator import FivetranWorkspaceData
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url

Expand Down Expand Up @@ -445,38 +443,6 @@ def my_fivetran_job():
# ------------------
# Reworked resources
# ------------------
class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

CONNECTOR = "connector"
DESTINATION = "destination"


@whitelist_for_serdes
@record
class FivetranContentData:
"""A record representing a piece of content in a Fivetran workspace.
Includes the object's type and data as returned from the API.
"""

content_type: FivetranContentType
properties: Mapping[str, Any]


@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connectors_by_id: Mapping[str, FivetranContentData]
destinations_by_id: Mapping[str, FivetranContentData]

@classmethod
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()


@experimental
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from enum import Enum
from typing import Any, Mapping, NamedTuple, Optional, Sequence

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes

from dagster_fivetran.utils import metadata_for_table


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
database: Optional[str]
service: Optional[str]


class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

CONNECTOR = "connector"
DESTINATION = "destination"


@whitelist_for_serdes
@record
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still want/need this class, FivetranContentType etc if it's not getting fed into the translator?

In particular I think having the content type isn't necessary, since we aren't exposing this data to the user directly

Copy link
Contributor Author

@maximearmstrong maximearmstrong Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's worth keeping FivetranContentType for a few reasons:

Overall, I'm in favor of keeping this to keep our design pattern for XWorkspaceData and fetch_x_workspace_data as consistent as possible across integrations.

class FivetranContentData:
"""A record representing a piece of content in a Fivetran workspace.
Includes the object's type and data as returned from the API.
"""

content_type: FivetranContentType
properties: Mapping[str, Any]


@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connectors_by_id: Mapping[str, FivetranContentData]
destinations_by_id: Mapping[str, FivetranContentData]

@classmethod
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()

def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]:
"""Method that converts a `FivetranWorkspaceData` object
to a collection of `FivetranConnectorTableProps` objects.
"""
raise NotImplementedError()


class DagsterFivetranTranslator:
"""Translator class which converts a `FivetranConnectorTableProps` object into AssetSpecs.
Subclass this class to implement custom logic for each type of Fivetran content.
"""

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
"""Get the AssetSpec for a table synced by a Fivetran connector."""
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
if schema["name_in_destination"] == schema_name
)
table_entry = next(
table_entry
for table_entry in schema_entry["tables"].values()
if table_entry["name_in_destination"] == table_name
)

metadata = metadata_for_table(
table_entry,
props.connector_url,
database=props.database,
schema=schema_name,
table=table_name,
)

return AssetSpec(
key=AssetKey(props.table.split(".")),
metadata=metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
asset,
io_manager,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
Expand Down Expand Up @@ -269,12 +269,13 @@ def downstream_asset(xyz):


class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
return super().get_asset_key(props).with_prefix("my_prefix")

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
asset_spec = super().get_asset_spec(props)
return asset_spec._replace(metadata={"foo": "bar", **asset_spec.metadata})
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
metadata={"foo": "bar", **asset_spec.metadata},
)


@responses.activate
Expand Down