From 8a60fbeff208c6b3efb2eda3bcae26646788ae6c Mon Sep 17 00:00:00 2001 From: Claire Carroll Date: Tue, 19 Jun 2018 08:20:47 +1000 Subject: [PATCH 1/5] Add insert-by-period materialization --- README.md | 51 +++++- .../insert_by_period_materialization.sql | 168 ++++++++++++++++++ 2 files changed, 217 insertions(+), 2 deletions(-) create mode 100644 macros/materializations/insert_by_period_materialization.sql diff --git a/README.md b/README.md index 3ee17850..a6fa2470 100644 --- a/README.md +++ b/README.md @@ -190,8 +190,8 @@ This macro implements an "outer union." The list of tables provided to this macr Usage: ``` {{ dbt_utils.union_tables( - tables=[ref('table_1'), ref('table_2')], - column_override={"some_field": "varchar(100)"}, + tables=[ref('table_1'), ref('table_2')], + column_override={"some_field": "varchar(100)"}, exclude=["some_other_field"] ) }} ``` @@ -266,6 +266,53 @@ Usage: ``` {{ dbt_utils.get_url_parameter(field='page_url', url_parameter='utm_source') }} ``` +--- +### Materializations +#### insert_by_period ([source](macros/materializations/insert_by_period_materialization.sql)) +`insert_by_period` allows dbt to insert records into a table one period (i.e. day, week) at a time. + +This materialization is particularly useful for large models where the initial run can be problematic. + +Should a run of a model using this materialization be interrupted, a subsequent run will continue building the target table from where it was interrupted (granted the `--full-refresh` flag is omitted). + +Progress is logged in the command line for easy monitoring. + +Usage: +```sql +{{ + config( + materialized = "insert_by_period", + period = "day" -- optional; period to break the model into - if not provided, "week" will be used. + -- Must be a valid Datepart (https://docs.aws.amazon.com/redshift/latest/dg/r_Dateparts_for_datetime_functions.html) + timestamp_field = "created_at", -- required; the column name of the timestamp field that will be used to break the model into smaller queries + start_date = "2018-01-01", -- required; literal date or timestamp - generally choose a date that is earlier than the start of your data + stop_date = "2018-06-01", -- optional; literal date or timestamp - if not provided, today's timestamp will be used + ) +}} + +with events as ( + + select * + from public.events + where __PERIOD_FILTER__ -- This will be replaced with a filter in the materialization code + +) + +....complex aggregates here.... + +``` + + +Caveats: +* This materialization is compatible with dbt 0.10.1. +* This materialization has been written for Redshift. +* This materialization can only be used for a model where records are not expected to change after they are created. +* Any model post-hooks that use `{{this}}` will fail using this materialization. For example: +```yaml +models: + project-name: + post-hook: "grant select on {{this}} to db_reader" +``` ---- diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql new file mode 100644 index 00000000..0681fd9f --- /dev/null +++ b/macros/materializations/insert_by_period_materialization.sql @@ -0,0 +1,168 @@ +{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} + + {% call statement('period_boundaries', fetch_result=True) -%} + with data as ( + select + coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp, + coalesce( + {{dbt_utils.dateadd('millisecond', + -1, + "nullif('" ~ stop_date ~ "','')::timestamp")}}, + {{dbt_utils.current_timestamp()}} + ) as stop_timestamp + from "{{target_schema}}"."{{target_table}}" + ) + + select + start_timestamp, + stop_timestamp, + {{dbt_utils.datediff('start_timestamp', + 'stop_timestamp', + period)}} + 1 as num_periods + from data + {%- endcall %} + +{%- endmacro %} + +{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} + + {%- set period_filter -%} + ("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and + "{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and + "{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp) + {%- endset -%} + + {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} + + select + {{target_cols_csv}} + from ( + {{filtered_sql}} + ) + +{%- endmacro %} + +{% materialization insert_by_period, default -%} + {%- set timestamp_field = config.require('timestamp_field') -%} + {%- set start_date = config.require('start_date') -%} + {%- set stop_date = config.get('stop_date') or '' -%}} + {%- set period = config.get('period') or 'week' -%} + + {%- set identifier = model['name'] -%} + + {%- set existing_relations = adapter.list_relations(schema=schema) -%} + {%- set old_relation = adapter.get_relation(relations_list=existing_relations, + schema=schema, identifier=identifier) -%} + {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%} + + {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%} + {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + + {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} + {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%} + + {%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%} + {%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%} + {%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%} + + -- setup + {% if old_relation is none -%} + -- noop + {%- elif should_truncate -%} + {{adapter.truncate_relation(old_relation)}} + {%- elif should_drop -%} + {{adapter.drop_relation(old_relation)}} + {%- set old_relation = none -%} + {%- endif %} + + {{run_hooks(pre_hooks, inside_transaction=False)}} + + -- `begin` happens here, so `commit` after it to finish the transaction + {{run_hooks(pre_hooks, inside_transaction=True)}} + {% call statement() -%} + begin; -- make extra sure we've closed out the transaction + commit; + {%- endcall %} + + -- build model + {% if force_create or old_relation is none -%} + {# Create an empty target table -#} + {% call statement('main') -%} + {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} + {{create_table_as(False, target_relation, empty_sql)}}; + {%- endcall %} + {%- endif %} + + {% set _ = dbt_utils.get_period_boundaries(schema, + identifier, + timestamp_field, + start_date, + stop_date, + period) %} + {%- set start_timestamp = load_result('period_boundaries')['data'][0][0] | string -%} + {%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%} + {%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%} + + {% set target_columns = adapter.get_columns_in_table(schema, identifier) %} + {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} + {%- set loop_vars = {'sum_rows_inserted': 0} -%} + + -- commit each period as a separate transaction + {% for i in range(num_periods) -%} + {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) -%} + {{log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True)}} + + {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period' ~ i ~ '_tmp' -%} + {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, + schema=schema, type='table') -%} + {% call statement() -%} + {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, + sql, + timestamp_field, + period, + start_timestamp, + stop_timestamp, + i) %} + {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} + {%- endcall %} + + {{adapter.expand_target_column_types(temp_table=tmp_identifier, + to_schema=schema, + to_table=identifier)}} + {%- set name = 'main-' ~ i -%} + {% call statement(name, fetch_result=True) -%} + insert into {{target_relation}} ({{target_cols_csv}}) + ( + select + {{target_cols_csv}} + from {{tmp_relation.include(schema=False)}} + ); + {%- endcall %} + {%- set rows_inserted = (load_result('main-' ~ i)['status'].split(" "))[2] | int -%} + {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%} + {%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%} + + {%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " records inserted" -%} + {{log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True)}} + + {%- endfor %} + + {% call statement() -%} + begin; + {%- endcall %} + + {{run_hooks(post_hooks, inside_transaction=True)}} + + {% call statement() -%} + commit; + {%- endcall %} + + {{run_hooks(post_hooks, inside_transaction=False)}} + + {%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%} + + {% call noop_statement(name='main', status=status_string) -%} + -- no-op + {%- endcall %} + +{%- endmaterialization %} From 57e0b9876ddcbf641726719f55dc3def2fd8ef9d Mon Sep 17 00:00:00 2001 From: Claire Carroll Date: Mon, 25 Jun 2018 20:09:01 +1000 Subject: [PATCH 2/5] Update readme --- README.md | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index a6fa2470..295a10cb 100644 --- a/README.md +++ b/README.md @@ -282,12 +282,10 @@ Usage: {{ config( materialized = "insert_by_period", - period = "day" -- optional; period to break the model into - if not provided, "week" will be used. - -- Must be a valid Datepart (https://docs.aws.amazon.com/redshift/latest/dg/r_Dateparts_for_datetime_functions.html) - timestamp_field = "created_at", -- required; the column name of the timestamp field that will be used to break the model into smaller queries - start_date = "2018-01-01", -- required; literal date or timestamp - generally choose a date that is earlier than the start of your data - stop_date = "2018-06-01", -- optional; literal date or timestamp - if not provided, today's timestamp will be used - ) + period = "day", + timestamp_field = "created_at", + start_date = "2018-01-01", + stop_date = "2018-06-01" }} with events as ( @@ -301,17 +299,25 @@ with events as ( ....complex aggregates here.... ``` - +Configuration values: +* `period`: period to break the model into, must be a valid [datepart](https://docs.aws.amazon.com/redshift/latest/dg/r_Dateparts_for_datetime_functions.html) (default='Week') +* `timestamp_field`: the column name of the timestamp field that will be used to break the model into smaller queries +* `start_date`: literal date or timestamp - generally choose a date that is earlier than the start of your data +* `stop_date`: literal date or timestamp (default=current_timestamp) Caveats: * This materialization is compatible with dbt 0.10.1. * This materialization has been written for Redshift. * This materialization can only be used for a model where records are not expected to change after they are created. -* Any model post-hooks that use `{{this}}` will fail using this materialization. For example: +* Any model post-hooks that use `{{ this }}` will fail using this materialization. For example: ```yaml models: project-name: - post-hook: "grant select on {{this}} to db_reader" + post-hook: "grant select on {{ this }} to db_reader" +``` +A useful workaround is to change the above post-hook to: +```yaml + post-hook: "grant select on {{ this.schema }}.{{ this.name }} to db_reader" ``` ---- From 4a71ed57c7e473aa01ad6886a4e7404c42d85414 Mon Sep 17 00:00:00 2001 From: Claire Carroll Date: Tue, 26 Jun 2018 19:49:40 +1000 Subject: [PATCH 3/5] Minor tweaks to readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 295a10cb..92710479 100644 --- a/README.md +++ b/README.md @@ -271,7 +271,7 @@ Usage: #### insert_by_period ([source](macros/materializations/insert_by_period_materialization.sql)) `insert_by_period` allows dbt to insert records into a table one period (i.e. day, week) at a time. -This materialization is particularly useful for large models where the initial run can be problematic. +This materialization is particularly useful for building large tables, especially where the initial run can be problematic. Should a run of a model using this materialization be interrupted, a subsequent run will continue building the target table from where it was interrupted (granted the `--full-refresh` flag is omitted). @@ -291,7 +291,7 @@ Usage: with events as ( select * - from public.events + from {{ ref('events') }} where __PERIOD_FILTER__ -- This will be replaced with a filter in the materialization code ) From b2c1752826bed027a6c55ce76115fbcc34050509 Mon Sep 17 00:00:00 2001 From: Claire Carroll Date: Wed, 27 Jun 2018 19:43:48 +1000 Subject: [PATCH 4/5] Add error handling when filter is not defined --- .../materializations/insert_by_period_materialization.sql | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 0681fd9f..23c9eee1 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -48,6 +48,13 @@ {%- set stop_date = config.get('stop_date') or '' -%}} {%- set period = config.get('period') or 'week' -%} + {%- if not '__PERIOD_FILTER__' is in sql -%} + {%- set error_message -%} + Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql + {%- endset -%} + {{ exceptions.raise_compiler_error(error_message) }} + {%- endif -%} + {%- set identifier = model['name'] -%} {%- set existing_relations = adapter.list_relations(schema=schema) -%} From b9241331dc1a5b7b015d31124d0d4199812475ae Mon Sep 17 00:00:00 2001 From: Claire Carroll Date: Wed, 27 Jun 2018 19:46:52 +1000 Subject: [PATCH 5/5] this is just to say --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 92710479..a085ff9e 100644 --- a/README.md +++ b/README.md @@ -271,7 +271,7 @@ Usage: #### insert_by_period ([source](macros/materializations/insert_by_period_materialization.sql)) `insert_by_period` allows dbt to insert records into a table one period (i.e. day, week) at a time. -This materialization is particularly useful for building large tables, especially where the initial run can be problematic. +This materialization is appropriate for event data that can be processed in discrete periods. It is similar in concept to the built-in incremental materialization, but has the added benefit of building the model in chunks even during a full-refresh so is particularly useful for models where the initial run can be problematic. Should a run of a model using this materialization be interrupted, a subsequent run will continue building the target table from where it was interrupted (granted the `--full-refresh` flag is omitted).