Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Custom Iceberg base_location_root #1289

Merged
merged 13 commits into from
Jan 24, 2025
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20250113-133414.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Added support for custom Iceberg base_location_root
time: 2025-01-13T13:34:14.326047-08:00
custom:
Author: LProcopi15
Issue: "1284"
1 change: 1 addition & 0 deletions dbt/adapters/snowflake/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class SnowflakeConfig(AdapterConfig):
# extended formats
table_format: Optional[str] = None
external_volume: Optional[str] = None
base_location_root: Optional[str] = None
base_location_subpath: Optional[str] = None


Expand Down
5 changes: 4 additions & 1 deletion dbt/adapters/snowflake/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ def get_ddl_prefix_for_alter(self) -> str:
return ""

def get_iceberg_ddl_options(self, config: RelationConfig) -> str:
base_location: str = f"_dbt/{self.schema}/{self.name}"
# If the base_location_root config is supplied, overwrite the default value ("_dbt/")
base_location: str = (
f"{config.get('base_location_root', '_dbt')}/{self.schema}/{self.name}"
)

if subpath := config.get("base_location_subpath"):
base_location += f"/{subpath}"
Expand Down
63 changes: 63 additions & 0 deletions tests/functional/iceberg/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,37 @@
select * from {{ ref('first_table') }}
"""

# Iceberg-format table model that sets a custom base_location_root
# ("root_path") and no base_location_subpath; selects everything from
# the `first_table` model.
_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH = """
{{
config(
transient = "true",
materialized = "table",
cluster_by=['id'],
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_root="root_path",
)
}}

select * from {{ ref('first_table') }}
"""

# Iceberg-format table model that sets both a custom base_location_root
# ("root_path") and a base_location_subpath ("subpath"); selects everything
# from the `first_table` model.
_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH = """
{{
config(
transient = "true",
materialized = "table",
cluster_by=['id'],
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_root="root_path",
base_location_subpath="subpath",
)
}}

select * from {{ ref('first_table') }}
"""

_MODEL_BASIC_DYNAMIC_TABLE_MODEL = """
{{ config(
materialized='dynamic_table',
Expand All @@ -36,6 +67,38 @@
select * from {{ ref('first_table') }}
"""

# Model that sets a custom base_location_root ("root_path") on an
# Iceberg-format relation; selects everything from the `first_table` model.
# NOTE(review): despite DYNAMIC_TABLE in the name, this config uses
# materialized = "table" rather than 'dynamic_table' -- confirm that is intended.
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH = """
{{
config(
transient = "true",
materialized = "table",
cluster_by=['id'],
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_root="root_path",
)
}}

select * from {{ ref('first_table') }}
"""

# Model that sets both a custom base_location_root ("root_path") and a
# base_location_subpath ('subpath') on an Iceberg-format relation; selects
# everything from the `first_table` model.
# NOTE(review): despite DYNAMIC_TABLE in the name, this config uses
# materialized = "table" rather than 'dynamic_table' -- confirm that is intended.
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH = """
{{
config(
transient = "true",
materialized = "table",
cluster_by=['id'],
table_format="iceberg",
external_volume="s3_iceberg_snow",
base_location_root="root_path",
base_location_subpath='subpath',
)
}}

select * from {{ ref('first_table') }}
"""


_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH = """
{{ config(
materialized='dynamic_table',
Expand Down
12 changes: 10 additions & 2 deletions tests/functional/iceberg/test_table_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from tests.functional.iceberg.models import (
_MODEL_BASIC_TABLE_MODEL,
_MODEL_BASIC_ICEBERG_MODEL,
_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH,
_MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH,
_MODEL_BASIC_DYNAMIC_TABLE_MODEL,
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH,
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH,
_MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH,
_MODEL_BUILT_ON_ICEBERG_TABLE,
_MODEL_TABLE_BEFORE_SWAP,
Expand All @@ -26,14 +30,18 @@ def models(self):
return {
"first_table.sql": _MODEL_BASIC_TABLE_MODEL,
"iceberg_table.sql": _MODEL_BASIC_ICEBERG_MODEL,
"iceberg_tableb.sql": _MODEL_BASIC_ICEBERG_MODEL_WITH_PATH,
"iceberg_tablec.sql": _MODEL_BASIC_ICEBERG_MODEL_WITH_PATH_SUBPATH,
"table_built_on_iceberg_table.sql": _MODEL_BUILT_ON_ICEBERG_TABLE,
"dynamic_table.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL,
"dynamic_tableb.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH,
"dynamic_tableb.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH,
"dynamic_tablec.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_PATH_SUBPATH,
"dynamic_tabled.sql": _MODEL_BASIC_DYNAMIC_TABLE_MODEL_WITH_SUBPATH,
}

def test_iceberg_tables_build_and_can_be_referred(self, project):
run_results = run_dbt()
assert len(run_results) == 5
assert len(run_results) == 9


class TestIcebergTableTypeBuildsOnExistingTable:
Expand Down
79 changes: 79 additions & 0 deletions tests/unit/test_iceberg_location.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import pytest
from dbt.adapters.snowflake.relation import SnowflakeRelation


@pytest.fixture
def iceberg_config() -> dict:
    """Provide a baseline Iceberg config with both location settings present.

    The returned dict carries the relation's schema and identifier, the
    external volume, and both ``base_location_root`` and
    ``base_location_subpath``; tests remove keys to cover the other cases.
    """
    settings = {
        "schema": "my_schema",
        "identifier": "my_table",
        "external_volume": "s3_iceberg_snow",
        "base_location_root": "root_path",
        "base_location_subpath": "subpath",
    }
    return settings


def get_actual_base_location(config: dict[str, str]) -> str:
    """Return the text following ``base_location = `` in the Iceberg DDL options.

    A SnowflakeRelation is created from the config's ``schema`` and
    ``identifier``; its ``get_iceberg_ddl_options`` output is stripped and
    split on the ``base_location = `` marker.
    """
    ddl_options = SnowflakeRelation.create(
        schema=config["schema"],
        identifier=config["identifier"],
    ).get_iceberg_ddl_options(config)

    # Element 1 is whatever comes right after the first marker occurrence;
    # an IndexError here means the marker was not rendered at all.
    segments = ddl_options.strip().split("base_location = ")
    return segments[1]


def test_iceberg_path_and_subpath(iceberg_config: dict[str, str]):
    """Test when base_location_root and base_location_subpath are provided.

    The expected location is the quoted path
    ``'<base_location_root>/<schema>/<identifier>/<base_location_subpath>'``.
    """
    expected_base_location = (
        f"'{iceberg_config['base_location_root']}/"
        f"{iceberg_config['schema']}/"
        f"{iceberg_config['identifier']}/"
        f"{iceberg_config['base_location_subpath']}'"
    ).strip()

    assert get_actual_base_location(iceberg_config) == expected_base_location


def test_iceberg_only_subpath(iceberg_config: dict[str, str]):
    """Check the location when base_location_subpath is the only setting given.

    With ``base_location_root`` removed, the expected path starts at ``_dbt``.
    """
    iceberg_config.pop("base_location_root")

    schema = iceberg_config["schema"]
    identifier = iceberg_config["identifier"]
    subpath = iceberg_config["base_location_subpath"]
    expected = f"'_dbt/{schema}/{identifier}/{subpath}'"

    assert get_actual_base_location(iceberg_config) == expected


def test_iceberg_only_path(iceberg_config: dict[str, str]):
    """Check the location when base_location_root is the only setting given.

    With ``base_location_subpath`` removed, the expected path ends at the
    relation identifier.
    """
    iceberg_config.pop("base_location_subpath")

    root = iceberg_config["base_location_root"]
    schema = iceberg_config["schema"]
    identifier = iceberg_config["identifier"]
    expected = f"'{root}/{schema}/{identifier}'"

    assert get_actual_base_location(iceberg_config) == expected


def test_iceberg_no_path(iceberg_config: dict[str, str]):
    """Check the location when neither base_location_root nor base_location_subpath is given.

    The expected path is ``_dbt`` followed by the schema and identifier.
    """
    for absent_key in ("base_location_root", "base_location_subpath"):
        iceberg_config.pop(absent_key)

    schema = iceberg_config["schema"]
    identifier = iceberg_config["identifier"]
    expected = f"'_dbt/{schema}/{identifier}'"

    assert get_actual_base_location(iceberg_config) == expected
Loading