Skip to content

Commit

Permalink
feat!: dissociate table_format from table_type (#90)
Browse files Browse the repository at this point in the history
* feat!: dissociate table_format from table_type

* chore: improve readme
  • Loading branch information
Jrmyy authored Dec 12, 2022
1 parent 57a9d04 commit 1b495d7
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 18 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,12 @@ _Additional information_
* An array list of columns to bucket data, ignored if using Iceberg
* `bucket_count` (`default=none`)
* The number of buckets for bucketing your data, ignored if using Iceberg
* `table_type` (`default='hive'`)
* The type of table
* Supports `hive` or `iceberg`
* `format` (`default='parquet'`)
* The data format for the table
* Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE` or `iceberg`
* Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE`
* `write_compression` (`default=none`)
* The compression type to use for any storage format that allows compression to be specified. To see which options are available, check out [CREATE TABLE AS][create-table-as]
* `field_delimiter` (`default=none`)
Expand Down Expand Up @@ -166,7 +169,8 @@ To get started just add this as your model:
```
{{ config(
materialized='table',
format='iceberg',
table_type='iceberg',
format='parquet',
partitioned_by=['bucket(user_id, 5)'],
table_properties={
'optimize_rewrite_delete_file_threshold': '2'
Expand All @@ -185,6 +189,8 @@ SELECT

Iceberg supports bucketing as hidden partitions; therefore, use the `partitioned_by` config to add specific bucketing conditions.

Iceberg supports several data formats: `PARQUET`, `AVRO` and `ORC`.

It is possible to use Iceberg in an incremental fashion. Specifically, two strategies are supported:
* `append`: new records are appended to the table; this can lead to duplicates
* `merge`: must be used in combination with `unique_key`, and is only available with Engine version 3.
Expand Down
4 changes: 2 additions & 2 deletions dbt/include/athena/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro drop_relation(relation) -%}
{% if config.get('format') != 'iceberg' and config.get('incremental_strategy') != 'append' %}
{% if config.get('table_type') != 'iceberg' and config.get('incremental_strategy') != 'append' %}
{%- do adapter.clean_up_table(relation.schema, relation.table) -%}
{% endif %}
{% call statement('drop_relation', auto_begin=False) -%}
Expand All @@ -8,7 +8,7 @@
{% endmacro %}

{% macro set_table_classification(relation) -%}
{%- set format = config.get('format') -%}
{%- set format = config.get('format', default='parquet') -%}
{% call statement('set_table_classification', auto_begin=False) -%}
alter table {{ relation }} set tblproperties ('classification' = '{{ format }}')
{%- endcall %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{% macro validate_get_incremental_strategy(raw_strategy, format) %}
{%- if format == 'iceberg' -%}
{% macro validate_get_incremental_strategy(raw_strategy, table_type) %}
{%- if table_type == 'iceberg' -%}
{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Incremental models on Iceberg tables only work with 'append' or 'merge' (v3 only) strategy.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{% materialization incremental, adapter='athena' -%}

{% set raw_strategy = config.get('incremental_strategy') or 'insert_overwrite' %}
{% set format = config.get('format', default='parquet') %}
{% set strategy = validate_get_incremental_strategy(raw_strategy, format) %}
{% set table_type = config.get('table_type', default='hive') | lower %}
{% set strategy = validate_get_incremental_strategy(raw_strategy, table_type) %}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{% set partitioned_by = config.get('partitioned_by', default=none) %}
Expand Down Expand Up @@ -43,7 +43,7 @@
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set build_sql = incremental_insert(on_schema_change, tmp_relation, target_relation, existing_relation) %}
{% do to_drop.append(tmp_relation) %}
{% elif strategy == 'merge' and format == 'iceberg' %}
{% elif strategy == 'merge' and table_type == 'iceberg' %}
{% set unique_key = config.get('unique_key') %}
{% set empty_unique_key -%}
Merge strategy must implement unique_key as a single column or a list of columns.
Expand All @@ -66,7 +66,7 @@
{% endcall %}

-- set table properties
{% if not to_drop and format != 'iceberg' %}
{% if not to_drop and table_type != 'iceberg' %}
{{ set_table_classification(target_relation) }}
{% endif %}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{% macro sync_column_schemas(on_schema_change, target_relation, schema_changes_dict) %}
{%- set partitioned_by = config.get('partitioned_by', default=none) -%}
{% set format = config.get('format', default='parquet') %}
{% set table_type = config.get('table_type', default='hive') | lower %}
{%- if partitioned_by is none -%}
{%- set partitioned_by = [] -%}
{%- endif %}
Expand All @@ -12,7 +12,7 @@
{% elif on_schema_change == 'sync_all_columns' %}
{%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%}
{%- set new_target_types = schema_changes_dict['new_target_types'] -%}
{% if format == 'iceberg' %}
{% if table_type == 'iceberg' %}
{% if add_to_target_arr | length > 0 %}
{%- do alter_relation_add_columns(target_relation, add_to_target_arr) -%}
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{%- set bucketed_by = config.get('bucketed_by', default=none) -%}
{%- set bucket_count = config.get('bucket_count', default=none) -%}
{%- set field_delimiter = config.get('field_delimiter', default=none) -%}
{%- set table_type = config.get('table_type', default='hive') | lower -%}
{%- set format = config.get('format', default='parquet') -%}
{%- set write_compression = config.get('write_compression', default=none) -%}
{%- set s3_data_dir = config.get('s3_data_dir', default=target.s3_data_dir) -%}
Expand All @@ -13,7 +14,7 @@
{%- set location_property = 'external_location' -%}
{%- set partition_property = 'partitioned_by' -%}

{%- if format == 'iceberg' -%}
{%- if table_type == 'iceberg' -%}
{%- set location_property = 'location' -%}
{%- set partition_property = 'partitioning' -%}
{%- if bucketed_by is not none or bucket_count is not none -%}
Expand All @@ -37,8 +38,8 @@
create table
{{ relation }}
with (
table_type={%- if format == 'iceberg' -%}'iceberg'{%- else -%}'hive'{%- endif %},
is_external={%- if format == 'iceberg' -%}false{%- else -%}true{%- endif %},
table_type='{{ table_type }}',
is_external={%- if table_type == 'iceberg' -%}false{%- else -%}true{%- endif %},
{{ location_property }}='{{ adapter.s3_table_location(s3_data_dir, s3_data_naming, relation.schema, relation.identifier, external_location, temporary) }}',
{%- if partitioned_by is not none %}
{{ partition_property }}=ARRAY{{ partitioned_by | tojson | replace('\"', '\'') }},
Expand All @@ -55,7 +56,7 @@
{%- if write_compression is not none %}
write_compression='{{ write_compression }}',
{%- endif %}
format={%- if format == 'iceberg' -%}'parquet'{%- else -%}'{{ format }}'{%- endif %}
format='{{ format }}'
{%- if extra_table_properties is not none -%}
{%- for prop_name, prop_value in extra_table_properties.items() -%}
,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{% materialization table, adapter='athena' -%}
{%- set identifier = model['alias'] -%}

{%- set format = config.get('format', default='parquet') -%}
{%- set table_type = config.get('table_type', default='hive') | lower -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
Expand All @@ -20,7 +20,7 @@
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}

{% if format != 'iceberg' %}
{% if table_type != 'iceberg' %}
{{ set_table_classification(target_relation) }}
{% endif %}

Expand Down

0 comments on commit 1b495d7

Please sign in to comment.