From 4a86d9eef01478d6782b1627ca53eddbed5da898 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Wed, 27 Nov 2024 21:56:37 +0000 Subject: [PATCH 1/9] additional config for insert_overwrite --- .../macros/materializations/incremental.sql | 32 +++++++- .../incremental_strategy/insert_overwrite.sql | 73 ++++++++++++++++--- .../incremental_strategy/microbatch.sql | 4 +- 3 files changed, 91 insertions(+), 18 deletions(-) diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql index 935280d63..cc2d9c615 100644 --- a/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt/include/bigquery/macros/materializations/incremental.sql @@ -17,6 +17,28 @@ {% do return(strategy) %} {% endmacro %} + +{% macro dbt_bigquery_validate_insert_overwrite_fn(config, strategy) %} + {#-- Find and validate the function used for insert_overwrite #} + {%- set insert_overwrite_fn = config.get('insert_overwrite_fn', none) -%} + {%- set default_fn = 'merge' -%} + {% if insert_overwrite_fn is none and strategy in ['insert_overwrite','microbatch']%} + {{return (default_fn)}} + {% elif insert_overwrite_fn is not in ["delete+insert"] and strategy in ['insert_overwrite','microbatch']%} + {% set wrong_fn -%} + The 'insert_overwrite_fn' option has to be either 'merge' (default) or 'delete+insert'. + {%- endset %} + {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} + {% elif insert_overwrite_fn is not none and strategy not ['insert_overwrite','microbatch'] %} + {% set wrong_strategy_msg -%} + The 'insert_overwrite_fn' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'. + {%- endset %} + {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} + {% else %} + {% return(insert_overwrite_fn) %} + {% endif %} +{% endmacro %} + {% macro source_sql_with_partition(partition_by, source_sql) %} {%- if partition_by.time_ingestion_partitioning %} @@ -43,19 +65,19 @@ {% endmacro %} {% macro bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates + strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates, insert_overwrite_fn ) %} {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#} {% if strategy == 'insert_overwrite' %} {% set build_sql = bq_generate_incremental_insert_overwrite_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn ) %} {% elif strategy == 'microbatch' %} {% set build_sql = bq_generate_microbatch_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn ) %} {% else %} {# strategy == 'merge' #} @@ -81,6 +103,8 @@ {#-- Validate early so we don't run SQL if the strategy is invalid --#} {% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%} + {#-- Validate early that the fn strategy is set correctly for insert_overwrite--#} + {% set insert_overwrite_fn = dbt_bigquery_validate_insert_overwrite_fn(config, strategy) -%} {%- set raw_partition_by = config.get('partition_by', none) -%} {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%} @@ -153,7 +177,7 @@ {% endif %} {% set build_sql = bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates + strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates, insert_overwrite_fn ) %} {%- call statement('main') -%} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql index 3ba67931e..50095c14d 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql @@ -1,5 +1,5 @@ {% macro bq_generate_incremental_insert_overwrite_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn ) %} {% if partition_by is none %} {% set missing_partition_msg -%} @@ -9,7 +9,7 @@ {% endif %} {% set build_sql = bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn ) %} {{ return(build_sql) }} @@ -38,17 +38,17 @@ {% endmacro %} {% macro bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn ) %} {% if partitions is not none and partitions != [] %} {# static #} - {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions) }} + {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }} {% else %} {# dynamic #} - {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }} + {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }} {% endif %} {% endmacro %} {% macro bq_static_insert_overwrite_sql( - tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn ) %} {% set predicate -%} @@ -85,8 +85,14 @@ in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the sql_header is included by the create_table_as macro. #} - -- 1. run the merge statement - {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + + {% if insert_overwrite_fn == 'delete+insert' %} + -- 1. run insert_overwrite with delete+insert transaction strategy optimisation + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% else %} + -- 1. run insert_overwrite with merge strategy optimisation + {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% endif %} {%- if tmp_relation_exists -%} -- 2. clean up the temp table @@ -100,7 +106,7 @@ tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions ) %} {%- if tmp_relation_exists is false -%} - {# We run temp table creation in a separated script to move to partitions copy if it doesn't already exist #} + {# We run temp table creation in a separated script to move to partitions copy if it does not already exist #} {%- call statement('create_tmp_relation_for_copy', language='sql') -%} {{ bq_create_table_as(partition_by, True, tmp_relation, sql, 'sql') }} @@ -155,12 +161,55 @@ from {{ tmp_relation }} ); - -- 3. run the merge statement - {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}; - + {% if insert_overwrite_fn == 'delete+insert' %} + -- 3. run insert_overwrite with the delete+insert transaction strategy optimisation + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }}; + {% else %} + -- 3. run insert_overwrite with the merge strategy optimisation + {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}; + {% endif %} -- 4. clean up the temp table drop table if exists {{ tmp_relation }} {% endif %} {% endmacro %} + + + +{% macro bq_get_insert_overwrite_with_delete_and_insert_sql(target, source, dest_columns, predicates, include_sql_header) -%} + {%- set predicates = [] if predicates is none else [] + predicates -%} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + {%- set sql_header = config.get('sql_header', none) -%} + + {{ sql_header if sql_header is not none and include_sql_header }} + + begin + begin transaction; + + -- (as of Nov 2024) + -- DELETE operations are free if the partition is a DATE + -- * Not free if the partitions are granular (hourly, monthly) + -- or some other conditions like subqueries and so on. + delete from {{ target }} as DBT_INTERNAL_DEST + where true + {%- if predicates %} + {% for predicate in predicates %} + and {{ predicate }} + {% endfor %} + {%- endif -%}; + + + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ); + + commit transaction; + + exception when error then + raise using message = FORMAT("Error: %s", @@error.message); + rollback transaction; + end +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql index d4c4b7453..f2c585c25 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql @@ -18,10 +18,10 @@ {% endmacro %} {% macro bq_generate_microbatch_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn ) %} {% set build_sql = bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn ) %} {{ return(build_sql) }} From ac3b1260d3b0fdb3a8ed44084dbb6dd7edf1130f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Wed, 27 Nov 2024 22:06:38 +0000 Subject: [PATCH 2/9] sub_strategy --- .../incremental_strategy/insert_overwrite.sql | 2 +- .../incremental_strategy_fixtures.py | 48 +++++++++++++++++++ .../test_incremental_strategies.py | 3 ++ 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql index 50095c14d..01bb8307f 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql @@ -212,4 +212,4 @@ raise using message = FORMAT("Error: %s", @@error.message); rollback transaction; end -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index 02efbb6c2..6e345cfed 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -611,3 +611,51 @@ }} select * from {{ ref('input_model') }} """ + +overwrite_static_day_delete_and_insert_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + insert_overwrite_fn='delete+insert', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index 1a339d601..fbef2a794 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -27,6 +27,7 @@ overwrite_day_with_time_ingestion_sql, overwrite_day_with_time_partition_datetime_sql, overwrite_static_day_sql, + overwrite_static_day_delete_and_insert_sub_strategy_sql, ) @@ -50,6 +51,7 @@ def models(self): "incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql, "incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql, "incremental_overwrite_static_day.sql": overwrite_static_day_sql, + "incremental_overwrite_static_day_with_deleteinsert.sql": overwrite_static_day_delete_and_insert_sub_strategy_sql } @pytest.fixture(scope="class") @@ -85,6 +87,7 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se "incremental_overwrite_day_with_time_partition_expected", ), ("incremental_overwrite_static_day", "incremental_overwrite_day_expected"), + ("incremental_overwrite_static_day_with_deleteinsert", "incremental_overwrite_day_expected"), ] db_with_schema = f"{project.database}.{project.test_schema}" for incremental_strategy in incremental_strategies: From e26a05f371ae0ee0c068b5305bb4abb6f48f7aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Sun, 29 Dec 2024 13:56:30 +0000 Subject: [PATCH 3/9] substrategy fixtures Create Features-20241229-174752.yaml --- .../unreleased/Features-20241229-174752.yaml | 6 ++ .../macros/materializations/incremental.sql | 49 +++++---- .../incremental_strategy/insert_overwrite.sql | 70 ++++++------ .../incremental_strategy/microbatch.sql | 4 +- .../incremental_strategy_fixtures.py | 100 +++++++++++++++++- .../test_incremental_strategies.py | 17 ++- 6 files changed, 179 insertions(+), 67 deletions(-) create mode 100644 .changes/unreleased/Features-20241229-174752.yaml diff --git a/.changes/unreleased/Features-20241229-174752.yaml b/.changes/unreleased/Features-20241229-174752.yaml new file mode 100644 index 000000000..6d2a01ae8 --- /dev/null +++ b/.changes/unreleased/Features-20241229-174752.yaml @@ -0,0 +1,6 @@ +kind: Features +body: add multiple substrategies for insert_overwrite and microbatch +time: 2024-12-29T17:47:52.647374Z +custom: + Author: borjavb + Issue: "1409" diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql index a399b44d0..1ca3d1e7e 100644 --- a/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt/include/bigquery/macros/materializations/incremental.sql @@ -18,25 +18,27 @@ {% endmacro %} -{% macro dbt_bigquery_validate_insert_overwrite_fn(config, strategy) %} - {#-- Find and validate the function used for insert_overwrite #} - {%- set insert_overwrite_fn = config.get('insert_overwrite_fn', none) -%} - {%- set default_fn = 'merge' -%} - {% if insert_overwrite_fn is none and strategy in ['insert_overwrite','microbatch']%} - {{return (default_fn)}} - {% elif insert_overwrite_fn is not in ["delete+insert"] and strategy in ['insert_overwrite','microbatch']%} +{% macro dbt_bigquery_validate_incremental_substrategy(config, strategy, copy_partitions) %} + {#-- Find and validate the function used for insert_overwrite + Legacy behaviour was to pass the copy_partitions as part of the `partition_by` clause + So we need to bring back that optionality into this validation. + #} + {%- set incremental_substrategy = config.get('incremental_substrategy', 'copy_partitions' if copy_partitions else 'merge') -%} + + {% if strategy in ['insert_overwrite', 'microbatch'] %} + {% if incremental_substrategy not in ['merge', 'delete+insert', 'copy_partitions'] %} {% set wrong_fn -%} - The 'insert_overwrite_fn' option has to be either 'merge' (default) or 'delete+insert'. + The 'incremental_substrategy' option has to be either 'merge' (default), 'delete+insert' or 'copy_partitions'. {%- endset %} {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} - {% elif insert_overwrite_fn is not none and strategy not ['insert_overwrite','microbatch'] %} + {% endif %} + {% elif incremental_substrategy is not none%} {% set wrong_strategy_msg -%} - The 'insert_overwrite_fn' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'. + The 'incremental_substrategy' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'. {%- endset %} {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} - {% else %} - {% return(insert_overwrite_fn) %} - {% endif %} + {% endif %} + {{ return(incremental_substrategy) }} {% endmacro %} {% macro source_sql_with_partition(partition_by, source_sql) %} @@ -65,19 +67,19 @@ {% endmacro %} {% macro bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, incremental_predicates, insert_overwrite_fn + strategy, tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates ) %} {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#} {% if strategy == 'insert_overwrite' %} {% set build_sql = bq_generate_incremental_insert_overwrite_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% elif strategy == 'microbatch' %} {% set build_sql = bq_generate_microbatch_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% else %} {# strategy == 'merge' #} @@ -103,14 +105,16 @@ {#-- Validate early so we don't run SQL if the strategy is invalid --#} {% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%} - {#-- Validate early that the fn strategy is set correctly for insert_overwrite--#} - {% set insert_overwrite_fn = dbt_bigquery_validate_insert_overwrite_fn(config, strategy) -%} {%- set raw_partition_by = config.get('partition_by', none) -%} {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%} {%- set partitions = config.get('partitions', none) -%} {%- set cluster_by = config.get('cluster_by', none) -%} + {#-- Validate early that the incremental substrategy is set correctly for insert_overwrite or microbatch--#} + {% set incremental_substrategy = dbt_bigquery_validate_incremental_substrategy(config, strategy, partition_by.copy_partitions) -%} + + {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} {% set incremental_predicates = config.get('predicates', default=none) or config.get('incremental_predicates', default=none) %} @@ -119,13 +123,8 @@ {{ run_hooks(pre_hooks) }} - {% if partition_by.copy_partitions is true and strategy not in ['insert_overwrite', 'microbatch'] %} {#-- We can't copy partitions with merge strategy --#} - {% set wrong_strategy_msg -%} - The 'copy_partitions' option requires the 'incremental_strategy' option to be set to 'insert_overwrite' or 'microbatch'. - {%- endset %} - {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} - {% elif existing_relation is none %} + {% if existing_relation is none %} {%- call statement('main', language=language) -%} {{ bq_create_table_as(partition_by, False, target_relation, compiled_code, language) }} {%- endcall -%} @@ -177,7 +176,7 @@ {% endif %} {% set build_sql = bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, partition_by.copy_partitions, incremental_predicates, insert_overwrite_fn + strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, ) %} {%- call statement('main') -%} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql index 01bb8307f..f955c265b 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql @@ -1,5 +1,5 @@ {% macro bq_generate_incremental_insert_overwrite_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% if partition_by is none %} {% set missing_partition_msg -%} @@ -9,7 +9,7 @@ {% endif %} {% set build_sql = bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {{ return(build_sql) }} @@ -38,17 +38,17 @@ {% endmacro %} {% macro bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% if partitions is not none and partitions != [] %} {# static #} - {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }} + {{ bq_static_insert_overwrite_sql(tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy) }} {% else %} {# dynamic #} - {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn) }} + {{ bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) }} {% endif %} {% endmacro %} {% macro bq_static_insert_overwrite_sql( - tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% set predicate -%} @@ -75,35 +75,35 @@ ) {%- endset -%} - {% if copy_partitions %} + {% if incremental_substrategy == 'copy_partitions' %} {% do bq_copy_partitions(tmp_relation, target_relation, partitions, partition_by) %} {% else %} - {#-- In case we're putting the model SQL _directly_ into the MERGE statement, - we need to prepend the MERGE statement with the user-configured sql_header, - which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header) - in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the - sql_header is included by the create_table_as macro. - #} - - {% if insert_overwrite_fn == 'delete+insert' %} - -- 1. run insert_overwrite with delete+insert transaction strategy optimisation - {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; - {% else %} - -- 1. run insert_overwrite with merge strategy optimisation - {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; - {% endif %} - - {%- if tmp_relation_exists -%} - -- 2. clean up the temp table - drop table if exists {{ tmp_relation }}; - {%- endif -%} + {#-- In case we're putting the model SQL _directly_ into the MERGE/insert+delete transaction, + we need to prepend the merge/transaction statement with the user-configured sql_header, + which may be needed to resolve that model SQL (e.g. referencing a variable or UDF in the header) + in the "temporary table exists" case, we save the model SQL result as a temp table first, wherein the + sql_header is included by the create_table_as macro. + #} + + {% if incremental_substrategy == 'delete+insert' %} + -- 1. run insert_overwrite with delete+insert transaction strategy optimisation + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% else %} + -- 1. run insert_overwrite with merge strategy optimisation + {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% endif %} + + {%- if tmp_relation_exists -%} + -- 2. clean up the temp table + drop table if exists {{ tmp_relation }}; + {%- endif -%} - {% endif %} + {% endif %} {% endmacro %} {% macro bq_dynamic_copy_partitions_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions + tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists ) %} {%- if tmp_relation_exists is false -%} {# We run temp table creation in a separated script to move to partitions copy if it does not already exist #} @@ -123,9 +123,9 @@ drop table if exists {{ tmp_relation }} {% endmacro %} -{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) %} - {%- if copy_partitions is true %} - {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }} +{% macro bq_dynamic_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, incremental_substrategy) %} + {% if incremental_substrategy == 'copy_partitions' %} + {{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists) }} {% else -%} {% set predicate -%} {{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement) @@ -161,7 +161,7 @@ from {{ tmp_relation }} ); - {% if insert_overwrite_fn == 'delete+insert' %} + {% if incremental_substrategy == 'delete+insert' %} -- 3. run insert_overwrite with the delete+insert transaction strategy optimisation {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }}; {% else %} @@ -185,11 +185,11 @@ {{ sql_header if sql_header is not none and include_sql_header }} begin - begin transaction; + begin transaction; -- (as of Nov 2024) - -- DELETE operations are free if the partition is a DATE - -- * Not free if the partitions are granular (hourly, monthly) + -- DELETE operations are free if the partition is a DATE + -- * Not free if the partitions are granular (hourly, monthly) -- or some other conditions like subqueries and so on. delete from {{ target }} as DBT_INTERNAL_DEST where true diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql index f2c585c25..21c52c4c6 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql @@ -18,10 +18,10 @@ {% endmacro %} {% macro bq_generate_microbatch_build_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {% set build_sql = bq_insert_overwrite_sql( - tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions, insert_overwrite_fn + tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy ) %} {{ return(build_sql) }} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index c710685b3..d6f0f6a6f 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -676,7 +676,105 @@ config( materialized="incremental", incremental_strategy="insert_overwrite", - insert_overwrite_fn='delete+insert', + incremental_substrategy='delete+insert', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() + + +overwrite_static_day_merge_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='merge', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() + + +overwrite_static_day_copy_partitions_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='copy_partitions', cluster_by="id", partition_by={ "field": "date_time", diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index fbef2a794..8c0528807 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -28,6 +28,8 @@ overwrite_day_with_time_partition_datetime_sql, overwrite_static_day_sql, overwrite_static_day_delete_and_insert_sub_strategy_sql, + overwrite_static_day_merge_sub_strategy_sql, + overwrite_static_day_copy_partitions_sub_strategy_sql, ) @@ -50,8 +52,10 @@ def models(self): "incremental_overwrite_time.sql": overwrite_time_sql, "incremental_overwrite_day_with_time_partition.sql": overwrite_day_with_time_ingestion_sql, "incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql, - "incremental_overwrite_static_day.sql": overwrite_static_day_sql, - "incremental_overwrite_static_day_with_deleteinsert.sql": overwrite_static_day_delete_and_insert_sub_strategy_sql + "incremental_overwrite_static_substrategy_day.sql": overwrite_static_day_sql, + "incremental_overwrite_static_substrategy_day_with_deleteinsert.sql": overwrite_static_day_delete_and_insert_sub_strategy_sql, + "incremental_overwrite_static_substrategy_day_with_merge.sql": overwrite_static_day_merge_sub_strategy_sql, + "incremental_overwrite_static_substrategy_day_with_copy_partitions.sql": overwrite_static_day_copy_partitions_sub_strategy_sql, } @pytest.fixture(scope="class") @@ -86,8 +90,13 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se "incremental_overwrite_day_with_time_partition_datetime", "incremental_overwrite_day_with_time_partition_expected", ), - ("incremental_overwrite_static_day", "incremental_overwrite_day_expected"), - ("incremental_overwrite_static_day_with_deleteinsert", "incremental_overwrite_day_expected"), + ("incremental_overwrite_static_substrategy_day", "incremental_overwrite_day_expected"), + ( + "incremental_overwrite_static_substrategy_day_with_deleteinsert", + "incremental_overwrite_static_substrategy_day_with_merge", + "incremental_overwrite_static_substrategy_day_with_copy_partitions", + "incremental_overwrite_day_expected", + ), ] db_with_schema = f"{project.database}.{project.test_schema}" for incremental_strategy in incremental_strategies: From 58937898f882fb2b243e4244c554a29401e509d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Mon, 30 Dec 2024 12:29:40 +0000 Subject: [PATCH 4/9] Update incremental.sql --- dbt/include/bigquery/macros/materializations/incremental.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql index 1ca3d1e7e..6cabdf2ed 100644 --- a/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt/include/bigquery/macros/materializations/incremental.sql @@ -30,7 +30,7 @@ {% set wrong_fn -%} The 'incremental_substrategy' option has to be either 'merge' (default), 'delete+insert' or 'copy_partitions'. {%- endset %} - {% do exceptions.raise_compiler_error(wrong_strategy_msg) %} + {% do exceptions.raise_compiler_error(wrong_fn) %} {% endif %} {% elif incremental_substrategy is not none%} {% set wrong_strategy_msg -%} @@ -176,7 +176,7 @@ {% endif %} {% set build_sql = bq_generate_incremental_build_sql( - strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, + strategy, tmp_relation, target_relation, compiled_code, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, incremental_substrategy, incremental_predicates ) %} {%- call statement('main') -%} From a6a0b62e62aa5ab2cb8b0fb32f771ce875891187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Tue, 21 Jan 2025 10:43:12 +0000 Subject: [PATCH 5/9] update tests --- .../incremental/test_incremental_strategies.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index 8c0528807..67b1495d1 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -90,10 +90,19 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se "incremental_overwrite_day_with_time_partition_datetime", "incremental_overwrite_day_with_time_partition_expected", ), - ("incremental_overwrite_static_substrategy_day", "incremental_overwrite_day_expected"), - ( + ( + "incremental_overwrite_static_substrategy_day", + "incremental_overwrite_day_expected" + ), + ( "incremental_overwrite_static_substrategy_day_with_deleteinsert", + "incremental_overwrite_day_expected", + ), + ( "incremental_overwrite_static_substrategy_day_with_merge", + "incremental_overwrite_day_expected", + ), + ( "incremental_overwrite_static_substrategy_day_with_copy_partitions", "incremental_overwrite_day_expected", ), From 1471e9683e25619861762bae845c79c7893ea569 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Tue, 21 Jan 2025 11:30:02 +0000 Subject: [PATCH 6/9] fmt --- .../adapter/incremental/test_incremental_strategies.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index 67b1495d1..2da098c4e 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -90,15 +90,12 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se "incremental_overwrite_day_with_time_partition_datetime", "incremental_overwrite_day_with_time_partition_expected", ), - ( - "incremental_overwrite_static_substrategy_day", - "incremental_overwrite_day_expected" - ), + ("incremental_overwrite_static_substrategy_day", "incremental_overwrite_day_expected"), ( "incremental_overwrite_static_substrategy_day_with_deleteinsert", "incremental_overwrite_day_expected", ), - ( + ( "incremental_overwrite_static_substrategy_day_with_merge", "incremental_overwrite_day_expected", ), From 48b34ae852ef93657044a6cb2271e08f47fecfdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Tue, 21 Jan 2025 12:50:48 +0000 Subject: [PATCH 7/9] fmt --- .../adapter/incremental/test_incremental_strategies.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index 2da098c4e..9f74b9d5f 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -91,7 +91,7 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se "incremental_overwrite_day_with_time_partition_expected", ), ("incremental_overwrite_static_substrategy_day", "incremental_overwrite_day_expected"), - ( + ( "incremental_overwrite_static_substrategy_day_with_deleteinsert", "incremental_overwrite_day_expected", ), From b235260630652fd675349d582d2b34ce536e1611 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Tue, 21 Jan 2025 23:47:02 +0000 Subject: [PATCH 8/9] add commit option --- .../macros/materializations/incremental.sql | 4 +- .../incremental_strategy/insert_overwrite.sql | 56 ++++++++++--------- .../incremental_strategy_fixtures.py | 49 ++++++++++++++++ .../test_incremental_strategies.py | 6 ++ 4 files changed, 88 insertions(+), 27 deletions(-) diff --git a/dbt/include/bigquery/macros/materializations/incremental.sql b/dbt/include/bigquery/macros/materializations/incremental.sql index 6cabdf2ed..6a1a074e8 100644 --- a/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/dbt/include/bigquery/macros/materializations/incremental.sql @@ -26,9 +26,9 @@ {%- set incremental_substrategy = config.get('incremental_substrategy', 'copy_partitions' if copy_partitions else 'merge') -%} {% if strategy in ['insert_overwrite', 'microbatch'] %} - {% if incremental_substrategy not in ['merge', 'delete+insert', 'copy_partitions'] %} + {% if incremental_substrategy not in ['merge', 'commit+delete+insert', 'delete+insert', 'copy_partitions'] %} {% set wrong_fn -%} - The 'incremental_substrategy' option has to be either 'merge' (default), 'delete+insert' or 'copy_partitions'. + The 'incremental_substrategy' option has to be either 'merge' (default), 'commit+delete+insert', 'delete+insert' or 'copy_partitions'. {%- endset %} {% do exceptions.raise_compiler_error(wrong_fn) %} {% endif %} diff --git a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql index f955c265b..02c40300c 100644 --- a/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql +++ b/dbt/include/bigquery/macros/materializations/incremental_strategy/insert_overwrite.sql @@ -87,8 +87,11 @@ #} {% if incremental_substrategy == 'delete+insert' %} - -- 1. run insert_overwrite with delete+insert transaction strategy optimisation + -- 1. run insert_overwrite with delete+insert (without a transaction) strategy optimisation {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; + {% elif incremental_substrategy == 'commit+delete+insert' %} + -- 1. run insert_overwrite with delete+insert (with a transaction) strategy optimisation + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists, transactional=True) }}; {% else %} -- 1. run insert_overwrite with merge strategy optimisation {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate], include_sql_header = not tmp_relation_exists) }}; @@ -162,8 +165,11 @@ ); {% if incremental_substrategy == 'delete+insert' %} - -- 3. run insert_overwrite with the delete+insert transaction strategy optimisation + -- 3. run insert_overwrite with the delete+insert (without a transaction) strategy optimisation {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate]) }}; + {% elif incremental_substrategy == 'commit+delete+insert' %} + -- 3. run insert_overwrite with the delete+insert (with a transaction) strategy optimisation + {{ bq_get_insert_overwrite_with_delete_and_insert_sql(target_relation, source_sql, dest_columns, [predicate], transactional=True) }}; {% else %} -- 3. run insert_overwrite with the merge strategy optimisation {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}; @@ -177,39 +183,39 @@ -{% macro bq_get_insert_overwrite_with_delete_and_insert_sql(target, source, dest_columns, predicates, include_sql_header) -%} +{% macro bq_get_insert_overwrite_with_delete_and_insert_sql(target, source, dest_columns, predicates, include_sql_header, transactional=False) -%} {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {%- set sql_header = config.get('sql_header', none) -%} {{ sql_header if sql_header is not none and include_sql_header }} + {% if transactional %} + -- We can rely on a multi-statement transaction to enable atomicity + -- If something goes south, nothing is committed + -- DELETE + INSERT allow for isolation lock begin - begin transaction; - - -- (as of Nov 2024) - -- DELETE operations are free if the partition is a DATE - -- * Not free if the partitions are granular (hourly, monthly) - -- or some other conditions like subqueries and so on. - delete from {{ target }} as DBT_INTERNAL_DEST - where true - {%- if predicates %} - {% for predicate in predicates %} - and {{ predicate }} - {% endfor %} - {%- endif -%}; - - - insert into {{ target }} ({{ dest_cols_csv }}) - ( - select {{ dest_cols_csv }} - from {{ source }} - ); + begin transaction ; + {% endif %} + -- DELETE operations are free https://cloud.google.com/bigquery/docs/using-dml-with-partitioned-tables#using_dml_delete_to_delete_partitions + delete from {{ target }} as DBT_INTERNAL_DEST + where true + {% if predicates %} and {{ predicates | join(' and ') }} {% endif %}; - commit transaction; + -- INSERT the data + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ) + {% if transactional %} + -- leaving the trailing ; out of the if as the calling macro already adds a leading ; to this output + ; commit transaction; exception when error then - raise using message = FORMAT("Error: %s", @@error.message); + -- If things go south, abort and rollback + raise using message = FORMAT("dbt error while commit+delete+instert: %s", @@error.message); rollback transaction; end + {% endif %} {% endmacro %} diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index d6f0f6a6f..d8a2dfc9c 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -714,6 +714,55 @@ select * from data """.lstrip() +overwrite_static_day_commit_delete_and_insert_sub_strategy_sql = """ +{% set partitions_to_replace = [ + "'2020-01-01'", + "'2020-01-02'", +] %} + +{{ + config( + materialized="incremental", + incremental_strategy="insert_overwrite", + incremental_substrategy='commit+delete+insert', + cluster_by="id", + partition_by={ + "field": "date_time", + "data_type": "datetime", + "granularity": "day" + }, + partitions=partitions_to_replace, + on_schema_change="sync_all_columns" + ) +}} + + +with data as ( + + {% if not is_incremental() %} + + select 1 as id, cast('2020-01-01' as datetime) as date_time union all + select 2 as id, cast('2020-01-01' as datetime) as date_time union all + select 3 as id, cast('2020-01-01' as datetime) as date_time union all + select 4 as id, cast('2020-01-01' as datetime) as date_time + + {% else %} + + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-02 partition + select 10 as id, cast('2020-01-01' as datetime) as date_time union all + select 20 as id, cast('2020-01-01' as datetime) as date_time union all + select 30 as id, cast('2020-01-02' as datetime) as date_time union all + select 40 as id, cast('2020-01-02' as datetime) as date_time + + {% endif %} + +) + +select * from data +""".lstrip() + + overwrite_static_day_merge_sub_strategy_sql = """ {% set partitions_to_replace = [ diff --git a/tests/functional/adapter/incremental/test_incremental_strategies.py b/tests/functional/adapter/incremental/test_incremental_strategies.py index 9f74b9d5f..dd6ab7196 100644 --- a/tests/functional/adapter/incremental/test_incremental_strategies.py +++ b/tests/functional/adapter/incremental/test_incremental_strategies.py @@ -28,6 +28,7 @@ overwrite_day_with_time_partition_datetime_sql, overwrite_static_day_sql, overwrite_static_day_delete_and_insert_sub_strategy_sql, + overwrite_static_day_commit_delete_and_insert_sub_strategy_sql, overwrite_static_day_merge_sub_strategy_sql, overwrite_static_day_copy_partitions_sub_strategy_sql, ) @@ -54,6 +55,7 @@ def models(self): "incremental_overwrite_day_with_time_partition_datetime.sql": overwrite_day_with_time_partition_datetime_sql, "incremental_overwrite_static_substrategy_day.sql": overwrite_static_day_sql, "incremental_overwrite_static_substrategy_day_with_deleteinsert.sql": overwrite_static_day_delete_and_insert_sub_strategy_sql, + "incremental_overwrite_static_substrategy_day_with_commitdeleteinsert.sql": overwrite_static_day_commit_delete_and_insert_sub_strategy_sql, "incremental_overwrite_static_substrategy_day_with_merge.sql": overwrite_static_day_merge_sub_strategy_sql, "incremental_overwrite_static_substrategy_day_with_copy_partitions.sql": overwrite_static_day_copy_partitions_sub_strategy_sql, } @@ -95,6 +97,10 @@ def test__bigquery_assert_incremental_configurations_apply_the_right_strategy(se "incremental_overwrite_static_substrategy_day_with_deleteinsert", "incremental_overwrite_day_expected", ), + ( + "incremental_overwrite_static_substrategy_day_with_commitdeleteinsert", + "incremental_overwrite_day_expected", + ), ( "incremental_overwrite_static_substrategy_day_with_merge", "incremental_overwrite_day_expected", From a40509863f99d9c784e7c054ceff2af9f370687a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Borja=20V=C3=A1zquez-Barreiros?= Date: Tue, 21 Jan 2025 23:49:48 +0000 Subject: [PATCH 9/9] Update incremental_strategy_fixtures.py --- .../adapter/incremental/incremental_strategy_fixtures.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py index d8a2dfc9c..72756f43b 100644 --- a/tests/functional/adapter/incremental/incremental_strategy_fixtures.py +++ b/tests/functional/adapter/incremental/incremental_strategy_fixtures.py @@ -762,8 +762,6 @@ select * from data """.lstrip() - - overwrite_static_day_merge_sub_strategy_sql = """ {% set partitions_to_replace = [ "'2020-01-01'",