Skip to content

Commit

Permalink
[2/n][dagster-fivetran] Update DagsterFivetranTranslator and related …
Browse files Browse the repository at this point in the history
…classes for rework (#25751)

## Summary & Motivation

~~Builds out a very barebones translator class for the new version of
the Fivetran integration.~~

~~The implementation for this translator will be inspired by the
`DagsterFivetranTranslator` introduced in
#25557, but a new
implementation is required to leverage the workspace context and
state-backed definitions, which is incompatible with the current
translator and way of building assets.~~

Edit after Ben's comment
[here](#25751 (comment)):

Move things around under translator.py. This PR leverages the
`DagsterFivetranTranslator` introduced in introduced in
#25557.
`FivetranWorkspaceData` will implement the method
`to_fivetran_connector_table_props_data` in a subsequent PR, that will
map raw connector and destination data fetched using the Fivetran API
into `FivetranConnectorTableProps` objects, that are compatible with the
translator. This process will match what we currently do.

## How I Tested These Changes

Tests will be added in subsequent PRs.
  • Loading branch information
maximearmstrong authored Nov 8, 2024
1 parent 0a0c77b commit b3e4e7c
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_fivetran.asset_defs import (
DagsterFivetranTranslator as DagsterFivetranTranslator,
build_fivetran_assets as build_fivetran_assets,
load_assets_from_fivetran_instance as load_assets_from_fivetran_instance,
)
Expand All @@ -14,6 +13,7 @@
FivetranWorkspace as FivetranWorkspace,
fivetran_resource as fivetran_resource,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.types import FivetranOutput as FivetranOutput
from dagster_fivetran.version import __version__ as __version__

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from dagster._utils.log import get_dagster_logger

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranConnectorTableProps
from dagster_fivetran.utils import (
generate_materializations,
get_fivetran_connector_url,
Expand All @@ -53,50 +54,6 @@
logger = get_dagster_logger()


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
database: Optional[str]
service: Optional[str]


class DagsterFivetranTranslator:
def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
"""Get the AssetKey for a table synced by a Fivetran connector."""
return AssetKey(props.table.split("."))

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
"""Get the AssetSpec for a table synced by a Fivetran connector."""
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
if schema["name_in_destination"] == schema_name
)
table_entry = next(
table_entry
for table_entry in schema_entry["tables"].values()
if table_entry["name_in_destination"] == table_name
)

metadata = metadata_for_table(
table_entry,
props.connector_url,
database=props.database,
schema=schema_name,
table=table_name,
)

return AssetSpec(
key=self.get_asset_key(props),
metadata=metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)


def _fetch_and_attach_col_metadata(
fivetran_resource: FivetranResource, connector_id: str, materialization: AssetMaterialization
) -> AssetMaterialization:
Expand Down Expand Up @@ -165,9 +122,9 @@ def _build_fivetran_assets(
tracked_asset_keys = {
table: AssetKey([*asset_key_prefix, *table.split(".")])
if not translator_instance or not connection_metadata
else translator_instance.get_asset_key(
else translator_instance.get_asset_spec(
FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
)
).key
for table in destination_tables
}
user_facing_asset_keys = table_to_asset_key_map or tracked_asset_keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
import os
import time
from enum import Enum
from typing import Any, Mapping, Optional, Sequence, Tuple
from urllib.parse import urljoin

Expand All @@ -20,14 +19,13 @@
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
from pydantic import Field, PrivateAttr
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

from dagster_fivetran.translator import FivetranWorkspaceData
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url

Expand Down Expand Up @@ -445,38 +443,6 @@ def my_fivetran_job():
# ------------------
# Reworked resources
# ------------------
class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

CONNECTOR = "connector"
DESTINATION = "destination"


@whitelist_for_serdes
@record
class FivetranContentData:
"""A record representing a piece of content in a Fivetran workspace.
Includes the object's type and data as returned from the API.
"""

content_type: FivetranContentType
properties: Mapping[str, Any]


@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connectors_by_id: Mapping[str, FivetranContentData]
destinations_by_id: Mapping[str, FivetranContentData]

@classmethod
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()


@experimental
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from enum import Enum
from typing import Any, Mapping, NamedTuple, Optional, Sequence

from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record
from dagster._serdes.serdes import whitelist_for_serdes

from dagster_fivetran.utils import metadata_for_table


class FivetranConnectorTableProps(NamedTuple):
table: str
connector_id: str
name: str
connector_url: str
schemas: Mapping[str, Any]
database: Optional[str]
service: Optional[str]


class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

CONNECTOR = "connector"
DESTINATION = "destination"


@whitelist_for_serdes
@record
class FivetranContentData:
"""A record representing a piece of content in a Fivetran workspace.
Includes the object's type and data as returned from the API.
"""

content_type: FivetranContentType
properties: Mapping[str, Any]


@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connectors_by_id: Mapping[str, FivetranContentData]
destinations_by_id: Mapping[str, FivetranContentData]

@classmethod
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()

def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]:
"""Method that converts a `FivetranWorkspaceData` object
to a collection of `FivetranConnectorTableProps` objects.
"""
raise NotImplementedError()


class DagsterFivetranTranslator:
"""Translator class which converts a `FivetranConnectorTableProps` object into AssetSpecs.
Subclass this class to implement custom logic for each type of Fivetran content.
"""

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
"""Get the AssetSpec for a table synced by a Fivetran connector."""
schema_name, table_name = props.table.split(".")
schema_entry = next(
schema
for schema in props.schemas["schemas"].values()
if schema["name_in_destination"] == schema_name
)
table_entry = next(
table_entry
for table_entry in schema_entry["tables"].values()
if table_entry["name_in_destination"] == table_name
)

metadata = metadata_for_table(
table_entry,
props.connector_url,
database=props.database,
schema=schema_name,
table=table_name,
)

return AssetSpec(
key=AssetKey(props.table.split(".")),
metadata=metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
asset,
io_manager,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
Expand Down Expand Up @@ -269,12 +269,13 @@ def downstream_asset(xyz):


class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
return super().get_asset_key(props).with_prefix("my_prefix")

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
asset_spec = super().get_asset_spec(props)
return asset_spec._replace(metadata={"foo": "bar", **asset_spec.metadata})
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
metadata={"foo": "bar", **asset_spec.metadata},
)


@responses.activate
Expand Down

0 comments on commit b3e4e7c

Please sign in to comment.