Skip to content

Commit

Permalink
[dagster-fivetran] Scaffold DagsterFivetranTranslator for rework
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 5, 2024
1 parent 8c291f8 commit 7c954b2
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 45 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from dagster_fivetran.v2.resources import FivetranWorkspace as FivetranWorkspace
from dagster_fivetran.v2.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
Original file line number Diff line number Diff line change
@@ -1,57 +1,14 @@
import logging
from enum import Enum
from typing import Any, Mapping, Optional, Sequence
from typing import Any, Mapping, Optional

from dagster import get_dagster_logger
from dagster._annotations import experimental
from dagster._config.pythonic_config import ConfigurableResource
from dagster._record import record
from dagster._utils.cached_method import cached_method
from pydantic import Field, PrivateAttr
from requests.auth import HTTPBasicAuth


class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

CONNECTOR = "connector"
DESTINATION = "destination"


@record
class FivetranContentData:
"""A record representing a piece of content in a Fivetran workspace.
Includes the object's type and data as returned from the API.
"""

content_type: FivetranContentType
properties: Mapping[str, Any]

def to_cached_data(self) -> Mapping[str, Any]:
return {"content_type": self.content_type.value, "properties": self.properties}

@classmethod
def from_cached_data(cls, data: Mapping[Any, Any]) -> "FivetranContentData":
return cls(
content_type=FivetranContentType(data["content_type"]),
properties=data["properties"],
)


@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connectors_by_id: Mapping[str, FivetranContentData]
destinations_by_id: Mapping[str, FivetranContentData]

@classmethod
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()
from dagster_fivetran.v2.translator import FivetranWorkspaceData


@experimental
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from enum import Enum
from typing import Any, Mapping, Sequence

from dagster import _check as check
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import record


class FivetranContentType(Enum):
"""Enum representing each object in Fivetran's ontology."""

CONNECTOR = "connector"
DESTINATION = "destination"


@record
class FivetranContentData:
"""A record representing a piece of content in a Fivetran workspace.
Includes the object's type and data as returned from the API.
"""

content_type: FivetranContentType
properties: Mapping[str, Any]

def to_cached_data(self) -> Mapping[str, Any]:
return {"content_type": self.content_type.value, "properties": self.properties}

@classmethod
def from_cached_data(cls, data: Mapping[Any, Any]) -> "FivetranContentData":
return cls(
content_type=FivetranContentType(data["content_type"]),
properties=data["properties"],
)


@record
class FivetranWorkspaceData:
"""A record representing all content in a Fivetran workspace.
Provided as context for the translator so that it can resolve dependencies between content.
"""

connectors_by_id: Mapping[str, FivetranContentData]
destinations_by_id: Mapping[str, FivetranContentData]

@classmethod
def from_content_data(
cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
raise NotImplementedError()


class DagsterFivetranTranslator:
"""Translator class which converts raw response data from the Fivetran API into AssetSpecs.
Subclass this class to implement custom logic for each type of Fivetran content.
"""

def __init__(self, context: FivetranWorkspaceData):
self._context = context

@property
def workspace_data(self) -> FivetranWorkspaceData:
return self._context

def get_asset_key(self, data: FivetranContentData) -> AssetKey:
if data.content_type == FivetranContentType.CONNECTOR:
return self.get_connector_asset_key(data)
else:
check.assert_never(data.content_type)

def get_asset_spec(self, data: FivetranContentData) -> AssetSpec:
if data.content_type == FivetranContentType.CONNECTOR:
return self.get_connector_spec(data)
else:
check.assert_never(data.content_type)

def get_connector_asset_key(self, data: FivetranContentData) -> AssetKey:
raise NotImplementedError()

def get_connector_spec(self, data: FivetranContentData) -> AssetSpec:
raise NotImplementedError()

0 comments on commit 7c954b2

Please sign in to comment.