Skip to content

Commit

Permalink
Remove get_asset_key in DagsterFivetranTranslator
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 8, 2024
1 parent d5afe74 commit 5d21367
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ def _build_fivetran_assets(
tracked_asset_keys = {
table: AssetKey([*asset_key_prefix, *table.split(".")])
if not translator_instance or not connection_metadata
else translator_instance.get_asset_key(
else translator_instance.get_asset_spec(
FivetranConnectorTableProps(table=table, **connection_metadata._asdict())
)
).key
for table in destination_tables
}
user_facing_asset_keys = table_to_asset_key_map or tracked_asset_keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ class DagsterFivetranTranslator:
Subclass this class to implement custom logic for each type of Fivetran content.
"""

def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
"""Get the AssetKey for a table synced by a Fivetran connector."""
return AssetKey(props.table.split("."))

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
"""Get the AssetSpec for a table synced by a Fivetran connector."""
schema_name, table_name = props.table.split(".")
Expand All @@ -91,7 +87,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
)

return AssetSpec(
key=self.get_asset_key(props),
key=AssetKey(props.table.split(".")),
metadata=metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
asset,
io_manager,
)
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.asset_spec import AssetSpec, replace_attributes
from dagster._core.definitions.materialize import materialize
from dagster._core.definitions.metadata.metadata_value import MetadataValue
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
Expand Down Expand Up @@ -269,12 +269,13 @@ def downstream_asset(xyz):


class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_key(self, props: FivetranConnectorTableProps) -> AssetKey:
return super().get_asset_key(props).with_prefix("my_prefix")

def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
asset_spec = super().get_asset_spec(props)
return asset_spec._replace(metadata={"foo": "bar", **asset_spec.metadata})
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
metadata={"foo": "bar", **asset_spec.metadata},
)


@responses.activate
Expand Down

0 comments on commit 5d21367

Please sign in to comment.