Skip to content

Commit

Permalink
Move translator back to load fn
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Oct 23, 2024
1 parent eeb1c8a commit f6da204
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 12 deletions.
13 changes: 6 additions & 7 deletions python_modules/libraries/dagster-sigma/dagster_sigma/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from aiohttp.client_exceptions import ClientResponseError
from dagster import ConfigurableResource
from dagster._annotations import deprecated, public
from dagster._config.pythonic_config.resource import ResourceDependency
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
Expand Down Expand Up @@ -65,9 +64,6 @@ class SigmaOrganization(ConfigurableResource):
default=False,
description="Whether to warn rather than raise when lineage data cannot be fetched for an element.",
)
translator: ResourceDependency[Type[DagsterSigmaTranslator]] = Field(
default=DagsterSigmaTranslator
)

_api_token: Optional[str] = PrivateAttr(None)

Expand Down Expand Up @@ -403,10 +399,13 @@ def build_defs(
Returns:
Definitions: The set of assets representing the Sigma content in the organization.
"""
return Definitions(assets=load_sigma_asset_specs(self))
return Definitions(assets=load_sigma_asset_specs(self, dagster_sigma_translator))


def load_sigma_asset_specs(organization: SigmaOrganization) -> Sequence[AssetSpec]:
def load_sigma_asset_specs(
organization: SigmaOrganization,
dagster_sigma_translator: Type[DagsterSigmaTranslator] = DagsterSigmaTranslator,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Sigma content in the organization.
Args:
Expand All @@ -418,7 +417,7 @@ def load_sigma_asset_specs(organization: SigmaOrganization) -> Sequence[AssetSpe
with organization.process_config_and_initialize_cm() as initialized_organization:
return check.is_list(
SigmaOrganizationDefsLoader(
organization=initialized_organization, translator_cls=organization.translator
organization=initialized_organization, translator_cls=dagster_sigma_translator
)
.build_defs()
.assets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ def get_workbook_spec(self, data: SigmaWorkbook) -> AssetSpec:
base_url=SigmaBaseUrl.AWS_US,
client_id=EnvVar("SIGMA_CLIENT_ID"),
client_secret=EnvVar("SIGMA_CLIENT_SECRET"),
translator=MyCoolTranslator,
)

sigma_specs = load_sigma_asset_specs(resource)
sigma_specs = load_sigma_asset_specs(resource, dagster_sigma_translator=MyCoolTranslator)
defs = Definitions(assets=[*sigma_specs], jobs=[define_asset_job("all_asset_job")])
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ def test_load_assets_organization_data_translator(
sigma_auth_token: str, sigma_sample_data: None
) -> None:
with instance_for_test() as _instance:
repository_def = repository_def_from_pointer(
repository_def = initialize_repository_def_from_pointer(
CodePointer.from_python_file(
str(Path(__file__).parent / "pending_repo_with_translator.py"), "defs", None
),
DefinitionsLoadType.INITIALIZATION,
None,
)

assert len(repository_def.assets_defs_by_key) == 2
Expand Down

0 comments on commit f6da204

Please sign in to comment.