Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistent UDF Materialization #454

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20230103-170813.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Add materialization for BQ persistent SQL UDF
time: 2023-01-03T17:08:13.915345-07:00
custom:
Author: anaghshineh
Issue: "451"
PR: "454"
14 changes: 14 additions & 0 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,20 @@ def table_ref(database, schema, table_name):
dataset_ref = google.cloud.bigquery.DatasetReference(database, schema)
return google.cloud.bigquery.TableReference(dataset_ref, table_name)

@staticmethod
def routine_ref(database: str, schema: str, routine_name: str) -> google.cloud.bigquery.RoutineReference:
"""Returns a RoutineReference object referencing a BigQuery routine."""
return google.cloud.bigquery.RoutineReference.from_string(f"{database}.{schema}.{routine_name}")

def get_bq_routine(self, database: str, schema: str, identifier: str) -> google.cloud.bigquery.Routine:
"""Get a BigQuery routine (UDF) for a schema/model."""
conn = self.get_thread_connection()
# backwards compatibility: fill in with defaults if not specified
database = database or conn.credentials.database
schema = schema or conn.credentials.schema
routine_ref = self.routine_ref(database, schema, identifier)
return conn.handle.get_routine(routine_ref)
Comment on lines +596 to +603
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason you can't use info_schema.routines? If we convert these python functions to SQL macro equivalents, we could publish this materialization as a dbt package (at least for an trial period).
https://cloud.google.com/bigquery/docs/information-schema-routines


def get_bq_table(self, database, schema, identifier):
"""Get a bigquery table for a schema/model."""
conn = self.get_thread_connection()
Expand Down
40 changes: 35 additions & 5 deletions dbt/adapters/bigquery/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,20 +747,34 @@ def _get_catalog_schemas(self, manifest: Manifest) -> SchemaSearchMap:
)
return result

@staticmethod
def _get_description_option(self, config: Dict[str, Any], node: Dict[str, Any]) -> Dict[str, str]:
"""
Sets the description, if applicable, as part of the relation options
"""
opts = {}

if config.persist_relation_docs() and "description" in node: # type: ignore[attr-defined]
description = sql_escape(node["description"])
opts["description"] = '"""{}"""'.format(description)

return opts

@available.parse(lambda *a, **k: {})
def get_common_options(
self, config: Dict[str, Any], node: Dict[str, Any], temporary: bool = False
) -> Dict[str, Any]:
opts = {}
"""
Constructs a dictionary of common options for configuring Bigquery
tables and views. Supports config options for setting a description,
the number of hours to expiration, and labels.
"""
opts = self._get_description_option(config, node)

if (config.get("hours_to_expiration") is not None) and (not temporary):
expiration = f'TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL {config.get("hours_to_expiration")} hour)'
opts["expiration_timestamp"] = expiration

if config.persist_relation_docs() and "description" in node: # type: ignore[attr-defined]
description = sql_escape(node["description"])
opts["description"] = '"""{}"""'.format(description)

if config.get("labels"):
labels = config.get("labels", {})
opts["labels"] = list(labels.items()) # type: ignore[assignment]
Expand All @@ -771,6 +785,9 @@ def get_common_options(
def get_table_options(
self, config: Dict[str, Any], node: Dict[str, Any], temporary: bool
) -> Dict[str, Any]:
"""
Constructs a dictionary of options for configuring a table.
"""
opts = self.get_common_options(config, node, temporary)

if config.get("kms_key_name") is not None:
Expand All @@ -794,9 +811,22 @@ def get_table_options(

@available.parse(lambda *a, **k: {})
def get_view_options(self, config: Dict[str, Any], node: Dict[str, Any]) -> Dict[str, Any]:
"""
Constructs a dictionary of options for configuring a view.
"""
opts = self.get_common_options(config, node)
return opts

@available.parse(lambda *a, **k: {})
def get_udf_options(self, config: Dict[str, Any], node: Dict[str, Any]) -> Dict[str, str]:
"""
Constructs a dictionary of options for configuring a UDF.
Currently, only supports a description option.
"""
opts = self._get_description_option(config, node)

return opts

@available.parse_none
def grant_access_to(self, entity, entity_type, role, grant_target_dict):
"""
Expand Down
46 changes: 46 additions & 0 deletions dbt/include/bigquery/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,52 @@

{% endmacro %}

-- Define the UDF options. Currently, only UDF description supported.
-- E.g., OPTIONS(description="description here")
{% macro bigquery_udf_options(config, node, temporary) %}
{% set opts = adapter.get_udf_options(config, node, temporary) %}
{%- do return(bigquery_options(opts)) -%}
{%- endmacro -%}

-- Define the return type for the UDF, if applicable.
-- E.g., RETURNS STRING
{% macro bigquery_udf_returns(udf_return_type) %}
{% if udf_return_type -%}
{% set udf_returns -%}
RETURNS {{ udf_return_type }}
{%- endset %}
{% endif %}
{%- do return(udf_returns) -%}
{%- endmacro -%}

-- Create the comma-separated list of UDF args.
-- E.g., arg1 STRING, arg2 INT64
{% macro bigquery_udf_args(udf_args_array) %}
{% set udf_args -%}
{%- for udf_arg in udf_args_array -%}
{{ udf_arg.name }} {{ udf_arg.type }}{% if not loop.last %},{% endif %}
{% endfor %}
{%- endset %}
{%- do return(udf_args) -%}
{%- endmacro -%}

-- Construct the SQL DDL for creating the UDF.
-- Sets the appropriate UDF args, return type, and options
-- based on the UDF model configuration
{% macro bigquery__get_create_udf_as_sql(relation, sql) -%}
{%- set udf_return_type = config.get('return_type', none) -%}
{%- set udf_args_array = config.get('args', []) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{{ sql_header if sql_header is not none }}

create or replace function {{ relation }} (
{{ bigquery_udf_args(udf_args_array) }}
) {{ bigquery_udf_returns(udf_return_type) }} as (
{{ sql }}
) {{ bigquery_udf_options(config, model) }};
{%- endmacro -%}

{% macro bigquery__drop_schema(relation) -%}
{{ adapter.drop_schema(relation) }}
{% endmacro %}
Expand Down
23 changes: 23 additions & 0 deletions dbt/include/bigquery/macros/materializations/udf.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{% materialization udf, adapter='bigquery' %}
{%- set target_relation = this %}

{{ run_hooks(pre_hooks) }}

-- Create the UDF
{%- call statement('main') -%}
{{ bigquery__get_create_udf_as_sql(target_relation, sql) }}
{%- endcall -%}

{{ run_hooks(post_hooks) }}

-- We do not need to explicitly call persist_docs as in other
-- materializations because UDF documentation is handled in the
-- get_create_udf_as_sql macro. There is no concept of column-level
-- documentation for UDFs.

-- Not calling apply_grants because dataset-level grants not
-- yet natively supported in dbt, and BigQuery does not support
-- permissions at the level of individual UDFs

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
110 changes: 110 additions & 0 deletions tests/integration/bigquery_test/test_bigquery_udf_materialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from tests.integration.base import DBTIntegrationTest, use_profile

class TestBigQueryUDFMaterialization(DBTIntegrationTest):

@property
def schema(self):
return "bigquery_test"

@property
def models(self):
return "udf-models"

@property
def project_config(self):
return {
'config-version': 2,
'vars': {
'udf_description': self.udf_description
}
}

@property
def udf_description(self):
return 'this is a UDF'

@use_profile('bigquery')
def test__bigquery_materialize_udf_no_args(self):
"""
Tests the materialization of a UDF model without arguments.
Asserts successful creation without args.
Also asserts that the UDF has the expected return type and
no description, based on the UDF model's config.
"""
results = self.run_dbt(['run', '--select', 'udf_no_args'])
self.assertEqual(len(results), 1)

with self.get_connection() as conn:
client = conn.handle

udf = client.get_routine(
self.adapter.connections.get_bq_routine(
self.default_database, self.unique_schema(), 'udf_no_args'
)
)

self.assertEqual(len(udf.arguments), 0)
self.assertEqual(udf.return_type, 'INT64')
# No description specified, so should be None
self.assertIsNone(udf.description)

@use_profile('bigquery')
def test__bigquery_materialize_udf_with_args(self):
"""
Tests the materialization of a UDF model with arguments.
Asserts successful creation with the expected args.
Also asserts that the UDF has neither a return type nor
a description, based on the UDF model's config.
"""
results = self.run_dbt(['run', '--select', 'udf_with_args'])
self.assertEqual(len(results), 1)

with self.get_connection() as conn:
client = conn.handle

udf = client.get_routine(
self.adapter.connections.get_bq_routine(
self.default_database, self.unique_schema(), 'udf_no_args'
)
)

# Make sure the UDF has two args as specified in model config
self.assertEqual(len(udf.arguments), 2)

# Check the name & type of the first arg
self.assertEqual(udf.arguments[0].name, 'color')
self.assertEqual(udf.arguments[0].data_type.type_kind.name, 'STRING')

# Check the name & type of the second arg
self.assertEqual(udf.arguments[1].name, 'is_pretty')
self.assertEqual(udf.arguments[1].data_type.type_kind.name, 'BOOL')

# Model config did not specify return_type, so should be None
self.assertIsNone(udf.return_type)
# No description specified, so should be None
self.assertIsNone(udf.description)


@use_profile('bigquery')
def test__bigquery_materialize_udf_with_description(self):
"""
Tests the materialization of a UDF model with a description.
Asserts successful creation. Also asserts the UDF's description
matches the one provided in the model properties file.
"""
results = self.run_dbt(['run', '--select', 'udf_with_description'])
self.assertEqual(len(results), 1)

with self.get_connection() as conn:
client = conn.handle

udf = client.get_routine(
self.adapter.connections.get_bq_routine(
self.default_database, self.unique_schema(), 'udf_with_description'
)
)
# Check that the description persisted
self.assertEqual(udf.description, self.udf_description)

self.assertEqual(len(udf.arguments), 0)
self.assertIsNone(udf.return_type)
9 changes: 9 additions & 0 deletions tests/integration/bigquery_test/udf-models/udf_no_args.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{
config(
materialized='udf',
return_type='INT64',
persist_docs={ 'relation': false }
)
}}

ifnull(null,0,1)
14 changes: 14 additions & 0 deletions tests/integration/bigquery_test/udf-models/udf_with_args.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{{
config(
materialized='udf',
args=[
{'name': 'color', 'type': 'STRING'},
{'name': 'is_pretty', 'type': 'BOOL'}
]
)
}}

case
when is_pretty then concat('pretty', ' ', color)
else concat('not pretty', ' ', color)
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{{
config(
materialized='udf',
persist_docs={ 'relation': true }
)
}}

-- Dummy UDF that returns the following string
'UDF with description'
8 changes: 8 additions & 0 deletions tests/integration/bigquery_test/udf-models/udfs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: 2

models:
- name: udf_no_args
description: 'Should not persist'

- name: udf_with_description
description: '{{ var("udf_description") }}'