Skip to content

Commit

Permalink
get SF managed working
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-rogers-dbt committed Jan 15, 2025
1 parent cb85618 commit 2b450ba
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 44 deletions.
54 changes: 42 additions & 12 deletions dbt/adapters/snowflake/catalog.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
from typing import Dict, Optional
from typing import Dict, Optional, Any

import textwrap

from dbt.adapters.base import BaseRelation
from dbt.adapters.contracts.catalog import CatalogIntegration, CatalogIntegrationType
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.relation_configs import RelationResults


class SnowflakeManagedIcebergCatalogIntegration(CatalogIntegration):
catalog_type = CatalogIntegrationType.managed

def render_ddl_predicates(self, relation: RelationConfig) -> str:
def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str:
"""
{{ optional('external_volume', dynamic_table.catalog.external_volume) }}
{{ optional('catalog', dynamic_table.catalog.name) }}
base_location = '{{ dynamic_table.catalog.base_location }}'
:param config:
:param relation:
:return:
"""
base_location: str = f"_dbt/{relation.schema}/{relation.name}"

if sub_path := relation.config.get("base_location_subpath"):
if sub_path := config.get("base_location_subpath"):
base_location += f"/{sub_path}"

iceberg_ddl_predicates: str = f"""
Expand All @@ -30,11 +32,39 @@ def render_ddl_predicates(self, relation: RelationConfig) -> str:
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)

@classmethod
def parse_relation_results(cls, relation_results: RelationResults) -> Dict[str, Any]:
import agate

# this try block can be removed once enable_iceberg_materializations is retired
try:
catalog_results: "agate.Table" = relation_results["catalog"]
except KeyError:
# this happens when `enable_iceberg_materializations` is turned off
return {}

if len(catalog_results) == 0:
# this happens when the dynamic table is a standard dynamic table (e.g. not iceberg)
return {}

# for now, if we get catalog results, it's because this is an iceberg table
# this is because we only run `show iceberg tables` to get catalog metadata
# this will need to be updated once this is in `show objects`
catalog: "agate.Row" = catalog_results.rows[0]
config_dict = {
"table_format": "iceberg",
"name": catalog.get("catalog_name"),
"external_volume": catalog.get("external_volume_name"),
"base_location": catalog.get("base_location"),
}

return config_dict


class SnowflakeGlueCatalogIntegration(CatalogIntegration):
catalog_type = CatalogIntegrationType.glue
auto_refresh: str = "FALSE"
replace_invalid_characters: str = "FALSE"
auto_refresh: Optional[str] = None # "TRUE" | "FALSE"
replace_invalid_characters: Optional[str] = None # "TRUE" | "FALSE"

def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None:
if adapter_configs:
Expand All @@ -43,15 +73,15 @@ def _handle_adapter_configs(self, adapter_configs: Optional[Dict]) -> None:
if "replace_invalid_characters" in adapter_configs:
self.replace_invalid_characters = adapter_configs["replace_invalid_characters"]

def render_ddl_predicates(self, relation: BaseRelation) -> str:
ddl_predicate = f"""create or replace iceberg table {relation.render()}
external_volume = '{self.external_volume}
catalog = '{self.name}'
def render_ddl_predicates(self, relation: BaseRelation, config: RelationConfig) -> str:
ddl_predicate = f"""
external_volume = '{self.external_volume}'
catalog = '{self.integration_name}'
"""
if self.namespace:
ddl_predicate += "CATALOG_NAMESPACE = '{self.namespace}'"
ddl_predicate += f"CATALOG_NAMESPACE = '{self.namespace}'\n"
if self.auto_refresh:
ddl_predicate += f"REPLACE_INVALID_CHARACTERS = {self.auto_refresh}"
ddl_predicate += f"auto_refresh = {self.auto_refresh}\n"
if self.replace_invalid_characters:
ddl_predicate += f"AUTO_REFRESH = {self.replace_invalid_characters}"
ddl_predicate += f"replace_invalid_characters = {self.replace_invalid_characters}\n"
return ddl_predicate
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class SnowflakeConfig(AdapterConfig):
table_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_subpath: Optional[str] = None
catalog_name: Optional[str] = None


class SnowflakeAdapter(SQLAdapter):
Expand Down
43 changes: 27 additions & 16 deletions dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import textwrap

from dataclasses import dataclass, field
from typing import FrozenSet, Optional, Type, Iterator, Tuple


from dbt.adapters.clients import catalogs as catalogs_client
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.contracts.catalog import CatalogIntegrationConfig, CatalogIntegrationType
from dbt.adapters.contracts.relation import ComponentName, RelationConfig
from dbt.adapters.events.types import AdapterEventWarning, AdapterEventDebug
from dbt.adapters.relation_configs import (
RelationConfigBase,
RelationConfigChangeAction,
RelationResults,
)
from dbt.adapters.snowflake.catalog import SnowflakeManagedIcebergCatalogIntegration
from dbt.adapters.utils import classproperty
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.events.functions import fire_event, warn_or_error
Expand Down Expand Up @@ -64,6 +64,10 @@ def is_dynamic_table(self) -> bool:

@property
def is_iceberg_format(self) -> bool:
if self.catalog_name:
return (
catalogs_client.get_catalog(self.catalog_name).table_format == TableFormat.ICEBERG
)
return self.table_format == TableFormat.ICEBERG

@classproperty
Expand Down Expand Up @@ -167,7 +171,11 @@ def get_ddl_prefix_for_create(self, config: RelationConfig, temporary: bool) ->
"""

transient_explicitly_set_true: bool = config.get("transient", False)

catalog_name = config.get("catalog_name", None)
if catalog_name:
catalog = catalogs_client.get_catalog(catalog_name)
if catalog.table_format == TableFormat.ICEBERG:
return "iceberg"
# Temporary tables are a Snowflake feature that do not exist in the
# Iceberg framework. We ignore the Iceberg status of the model.
if temporary:
Expand Down Expand Up @@ -203,18 +211,21 @@ def get_ddl_prefix_for_alter(self) -> str:
else:
return ""

def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
base_location: str = f"_dbt/{self.schema}/{self.name}"

if subpath := config.get("base_location_subpath"):
base_location += f"/{subpath}"

iceberg_ddl_predicates: str = f"""
external_volume = '{config.get('external_volume')}'
catalog = 'snowflake'
base_location = '{base_location}'
"""
return textwrap.indent(textwrap.dedent(iceberg_ddl_predicates), " " * 10)
def add_managed_catalog_integration(self, config: RelationConfig) -> str:
catalog_name = "snowflake_managed"
external_volume = config.get("external_volume")
integration_config = CatalogIntegrationConfig(
catalog_name=catalog_name,
integration_name=catalog_name,
table_format=self.table_format,
catalog_type=CatalogIntegrationType.managed.value,
external_volume=external_volume,
)
catalogs_client.add_catalog(
SnowflakeManagedIcebergCatalogIntegration(integration_config),
catalog_name=catalog_name,
)
return catalog_name

def __drop_conditions(self, old_relation: "SnowflakeRelation") -> Iterator[Tuple[bool, str]]:
drop_view_message: str = (
Expand Down
11 changes: 6 additions & 5 deletions dbt/adapters/snowflake/relation_configs/dynamic_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,11 @@ def default(cls) -> Self:
return cls("ON_CREATE")


def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> Optional[str]:
breakpoint()
def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> str:
if not catalog_info:
return None
return "SNOWFLAKE"
elif isinstance(catalog_info, str):
return catalog_info
elif isinstance(catalog_info, dict):
catalog_config = SnowflakeCatalogConfig.from_dict(catalog_info)
else:
Expand All @@ -64,7 +65,7 @@ def _setup_catalog_integration(catalog_info: Union[Dict, RelationConfig]) -> Opt
)
return catalog_name
else:
return None
return TableFormat.default().value


@dataclass(frozen=True, eq=True, unsafe_hash=True)
Expand All @@ -90,7 +91,7 @@ class SnowflakeDynamicTableConfig(SnowflakeRelationConfigBase):
query: str
target_lag: str
snowflake_warehouse: str
catalog: Optional[str] = None
catalog: str = "SNOWFLAKE"
refresh_mode: Optional[RefreshMode] = RefreshMode.default()
initialize: Optional[Initialize] = Initialize.default()

Expand Down
10 changes: 3 additions & 7 deletions dbt/include/snowflake/macros/relations/dynamic_table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog is not none -%}
{%- if dynamic_table.catalog != 'snowflake' -%}
{{ _get_create_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_create_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
Expand Down Expand Up @@ -70,16 +70,12 @@
-- A valid DDL statement which will result in a new dynamic iceberg table.
-#}

{% set catalog_integration = adapter.get_catalog_integration(dynamic_table.catalog) -%}

{% if not catalog_integration -%}
{{ raise('Catalog integration is required for iceberg tables') }}
{%- endif -%}
{% set catalog_integration = adapter.get_catalog_integration(relation.catalog) -%}

create dynamic iceberg table {{ relation }}
target_lag = '{{ dynamic_table.target_lag }}'
warehouse = {{ dynamic_table.snowflake_warehouse }}
{{ catalog_integration.render_ddl_predicates(relation) }}
{{ catalog_integration.render_ddl_predicates(relation, config.model.config) }}
{{ optional('refresh_mode', dynamic_table.refresh_mode) }}
{{ optional('initialize', dynamic_table.initialize) }}
as (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

{%- set dynamic_table = relation.from_config(config.model) -%}

{%- if dynamic_table.catalog is not none -%}
{%- if dynamic_table.catalog != 'SNOWFLAKE' -%}
{{ _get_replace_dynamic_iceberg_table_as_sql(dynamic_table, relation, sql) }}
{%- else -%}
{{ _get_replace_dynamic_standard_table_as_sql(dynamic_table, relation, sql) }}
Expand Down
14 changes: 11 additions & 3 deletions dbt/include/snowflake/macros/relations/table/create.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
{%- set cluster_by_keys = config.get('cluster_by', default=none) -%}
{%- set enable_automatic_clustering = config.get('automatic_clustering', default=false) -%}
{%- set copy_grants = config.get('copy_grants', default=false) -%}
{%- set catalog_name = config.get('catalog_name', default=none) -%}

{%- if cluster_by_keys is not none and cluster_by_keys is string -%}
{%- set cluster_by_keys = [cluster_by_keys] -%}
Expand All @@ -21,18 +22,25 @@
{% else %}
{%- set cluster_by_string = none -%}
{%- endif -%}
{%- if catalog_name is not none %}
{%- set catalog_integration = adapter.get_catalog_integration(catalog_name) -%}
{%- endif -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

create or replace {{ materialization_prefix }} table {{ relation }}
{%- if relation.is_iceberg_format %}
{%- if catalog_integration is not none %}
{#
Valid DDL in CTAS statements. Plain create statements have a different order.
https://docs.snowflake.com/en/sql-reference/sql/create-iceberg-table
#}
{{ relation.get_iceberg_ddl_options(config.model.config) }}
{%- endif -%}
{{ catalog_integration.render_ddl_predicates(relation=relation, config=config.model.config) }}
{%- elif relation.is_iceberg_format %}
{%- set catalog_name = relation.add_managed_catalog_integration(config.model.config) -%}
{%- set catalog_integration = adapter.get_catalog_integration(catalog_name) -%}
{{ catalog_integration.render_ddl_predicates(relation, config.model.config) }}
{%- endif %}

{%- set contract_config = config.get('contract') -%}
{%- if contract_config.enforced -%}
Expand Down
9 changes: 9 additions & 0 deletions hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ docker-dev = [
"docker run --rm -it --name dbt-snowflake-dev -v $(pwd):/opt/code dbt-snowflake-dev",
]

[envs.local]
# TODO: if/when hatch gets support for defining editable dependencies, the
# pre-install commands here and post-install commands in the matrix can be moved
# to the dependencies section
pre-install-commands = [
"pip install -e ../dbt-adapter",
"pip install -e ../dbt-core/core",
]

[envs.build]
detached = true
dependencies = [
Expand Down

0 comments on commit 2b450ba

Please sign in to comment.