Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dbt test --store-failures #3316

Merged
merged 2 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Features
- Support optional `updated_at` config parameter with `check` strategy snapshots. If not supplied, will use current timestamp (default). ([#1844](https://github.com/fishtown-analytics/dbt/issues/1844), [#3376](https://github.com/fishtown-analytics/dbt/pull/3376))
- Add the opt-in `--use-experimental-parser` flag ([#3307](https://github.com/fishtown-analytics/dbt/issues/3307))
- Store test failures in the database ([#517](https://github.com/fishtown-analytics/dbt/issues/517), [#903](https://github.com/fishtown-analytics/dbt/issues/903), [#2593](https://github.com/fishtown-analytics/dbt/issues/2593), [#3316](https://github.com/fishtown-analytics/dbt/issues/3316))

### Fixes
- Fix compiled sql for ephemeral models ([#3317](https://github.com/fishtown-analytics/dbt/issues/3317), [#3318](https://github.com/fishtown-analytics/dbt/pull/3318))
Expand Down
3 changes: 1 addition & 2 deletions core/dbt/compilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,7 @@ def add_ephemeral_prefix(self, name: str):

def _get_relation_name(self, node: ParsedNode):
relation_name = None
if (node.resource_type in NodeType.refable() and
not node.is_ephemeral_model):
if node.is_relational and not node.is_ephemeral_model:
adapter = get_adapter(self.config)
relation_cls = adapter.Relation
relation_name = str(relation_cls.create_from(self.config, node))
Expand Down
12 changes: 8 additions & 4 deletions core/dbt/contracts/graph/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,14 @@ class CompiledSchemaTestNode(CompiledNode, HasTestMetadata):
config: TestConfig = field(default_factory=TestConfig)

def same_config(self, other) -> bool:
return (
self.unrendered_config.get('severity') ==
other.unrendered_config.get('severity')
)
comparisons = [
self.unrendered_config.get(modifier) == other.unrendered_config.get(modifier) or (
self.unrendered_config.get(modifier) is None and
other.unrendered_config.get(modifier) is None
)
for modifier in ('severity', 'store_failures')
]
return all(comparisons)

def same_column_name(self, other) -> bool:
return self.column_name == other.column_name
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,8 +436,13 @@ class SeedConfig(NodeConfig):

@dataclass
class TestConfig(NodeConfig):
schema: Optional[str] = field(
default='dbt_test__audit',
metadata=CompareBehavior.Exclude.meta(),
)
materialized: str = 'test'
severity: Severity = Severity('ERROR')
store_failures: Optional[bool] = None


@dataclass
Expand Down
27 changes: 23 additions & 4 deletions core/dbt/contracts/graph/parsed.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,21 @@ class ParsedNodeMixins(dbtClassMixin):
def is_refable(self):
return self.resource_type in NodeType.refable()

@property
def should_store_failures(self):
return self.resource_type == NodeType.Test and (
self.config.store_failures if self.config.store_failures is not None
else flags.STORE_FAILURES
)

# will this node map to an object in the database?
@property
def is_relational(self):
return (
self.resource_type in NodeType.refable() or
self.should_store_failures
)

@property
def is_ephemeral(self):
return self.config.materialized == 'ephemeral'
Expand Down Expand Up @@ -370,10 +385,14 @@ class ParsedSchemaTestNode(ParsedNode, HasTestMetadata):
config: TestConfig = field(default_factory=TestConfig)

def same_config(self, other) -> bool:
return (
self.unrendered_config.get('severity') ==
other.unrendered_config.get('severity')
)
comparisons = [
self.unrendered_config.get(modifier) == other.unrendered_config.get(modifier) or (
self.unrendered_config.get(modifier) is None and
other.unrendered_config.get(modifier) is None
)
for modifier in ('severity', 'store_failures')
]
return all(comparisons)

def same_column_name(self, other) -> bool:
return self.column_name == other.column_name
Expand Down
10 changes: 8 additions & 2 deletions core/dbt/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
WRITE_JSON = None
PARTIAL_PARSE = None
USE_COLORS = None
STORE_FAILURES = None


def env_set_truthy(key: str) -> Optional[str]:
Expand Down Expand Up @@ -54,7 +55,8 @@ def _get_context():

def reset():
global STRICT_MODE, FULL_REFRESH, USE_CACHE, WARN_ERROR, TEST_NEW_PARSER, \
USE_EXPERIMENTAL_PARSER, WRITE_JSON, PARTIAL_PARSE, MP_CONTEXT, USE_COLORS
USE_EXPERIMENTAL_PARSER, WRITE_JSON, PARTIAL_PARSE, MP_CONTEXT, USE_COLORS, \
STORE_FAILURES

STRICT_MODE = False
FULL_REFRESH = False
Expand All @@ -66,11 +68,13 @@ def reset():
PARTIAL_PARSE = False
MP_CONTEXT = _get_context()
USE_COLORS = True
STORE_FAILURES = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this definitely feels like the most expedient approach, but by making this a global flag, we lose the ability to specify something like this as a model-level config. Do you think there's a path to implementing this more like the full_refresh config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely!



def set_from_args(args):
global STRICT_MODE, FULL_REFRESH, USE_CACHE, WARN_ERROR, TEST_NEW_PARSER, \
USE_EXPERIMENTAL_PARSER, WRITE_JSON, PARTIAL_PARSE, MP_CONTEXT, USE_COLORS
USE_EXPERIMENTAL_PARSER, WRITE_JSON, PARTIAL_PARSE, MP_CONTEXT, USE_COLORS, \
STORE_FAILURES

USE_CACHE = getattr(args, 'use_cache', USE_CACHE)

Expand All @@ -94,6 +98,8 @@ def set_from_args(args):
if use_colors_override is not None:
USE_COLORS = use_colors_override

STORE_FAILURES = getattr(args, 'store_failures', STORE_FAILURES)


# initialize everything to the defaults on module load
reset()
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,12 @@
{% endif %}
{% do return(config_full_refresh) %}
{% endmacro %}


{% macro should_store_failures() %}
{% set config_store_failures = config.get('store_failures') %}
{% if config_store_failures is none %}
{% set config_store_failures = flags.STORE_FAILURES %}
{% endif %}
{% do return(config_store_failures) %}
{% endmacro %}
44 changes: 40 additions & 4 deletions core/dbt/include/global_project/macros/materializations/test.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,46 @@
{%- materialization test, default -%}

{% set relations = [] %}

{% if should_store_failures() %}

{% set identifier = model['alias'] %}
{% set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) %}
{% set target_relation = api.Relation.create(
identifier=identifier, schema=schema, database=database, type='table') -%} %}

{% if old_relation %}
{% do adapter.drop_relation(old_relation) %}
{% endif %}

{% call statement(auto_begin=True) %}
{{ create_table_as(False, target_relation, sql) }}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is 🔥

{% endcall %}

{% do relations.append(target_relation) %}

{% set main_sql %}
select count(*) as validation_errors
from {{ target_relation }}
{% endset %}

{{ adapter.commit() }}

{% else %}

{% set main_sql %}
select count(*) as validation_errors
from (
{{ sql }}
) _dbt_internal_test
{% endset %}

{% endif %}

{% call statement('main', fetch_result=True) -%}
select count(*) as validation_errors
from (
{{ sql }}
) _dbt_internal_test
{{ main_sql }}
{%- endcall %}

{{ return({'relations': relations}) }}

{%- endmaterialization -%}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

with all_values as (

select distinct
{{ column_name }} as value_field
select
{{ column_name }} as value_field,
count(*) as n_records

from {{ model }}
group by 1

)

select
value_field

select *
from all_values
where value_field not in (
{% for value in values -%}
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,13 @@ def _build_test_subparser(subparsers, base_subparser):
Stop execution upon a first test failure.
'''
)
sub.add_argument(
'--store-failures',
action='store_true',
help='''
Store test results (failing rows) in the database
'''
)

sub.set_defaults(cls=test_task.TestTask, which='test', rpc_method='test')
return sub
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ def _check_resource_uniqueness(
alias_resources: Dict[str, ManifestNode] = {}

for resource, node in manifest.nodes.items():
if node.resource_type not in NodeType.refable():
if not node.is_relational:
continue
# appease mypy - sources aren't refable!
assert not isinstance(node, ParsedSourceDefinition)
Expand Down
33 changes: 24 additions & 9 deletions core/dbt/parser/schema_test_builders.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,21 @@ def get_nice_schema_test_name(
clean_flat_args = [re.sub('[^0-9a-zA-Z_]+', '_', arg) for arg in flat_args]
unique = "__".join(clean_flat_args)

cutoff = 32
if len(unique) <= cutoff:
label = unique
else:
label = hashlib.md5(unique.encode('utf-8')).hexdigest()
# for the file path + alias, the name must be <64 characters
# if the full name is too long, include the first 30 identifying chars plus
# a 32-character hash of the full contents

test_identifier = '{}_{}'.format(test_type, test_name)
full_name = '{}_{}'.format(test_identifier, unique)

filename = '{}_{}_{}'.format(test_type, test_name, label)
name = '{}_{}_{}'.format(test_type, test_name, unique)
if len(full_name) >= 64:
test_trunc_identifier = test_identifier[:30]
label = hashlib.md5(full_name.encode('utf-8')).hexdigest()
short_name = '{}_{}'.format(test_trunc_identifier, label)
else:
short_name = full_name

return filename, name
return short_name, full_name


@dataclass
Expand Down Expand Up @@ -185,7 +190,7 @@ class TestBuilder(Generic[Testable]):
r'(?P<test_name>([a-zA-Z_][0-9a-zA-Z_]*))'
)
# kwargs representing test configs
MODIFIER_ARGS = ('severity', 'tags', 'enabled')
MODIFIER_ARGS = ('severity', 'tags', 'enabled', 'store_failures')

def __init__(
self,
Expand Down Expand Up @@ -231,6 +236,10 @@ def __init__(
self.compiled_name: str = compiled_name
self.fqn_name: str = fqn_name

# use hashed name as alias if too long
if compiled_name != fqn_name:
self.modifiers['alias'] = compiled_name
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really clever & really cooll!


def _bad_type(self) -> TypeError:
return TypeError('invalid target type "{}"'.format(type(self.target)))

Expand Down Expand Up @@ -271,13 +280,19 @@ def extract_test_args(test, name=None) -> Tuple[str, Dict[str, Any]]:
def enabled(self) -> Optional[bool]:
return self.modifiers.get('enabled')

def alias(self) -> Optional[str]:
return self.modifiers.get('alias')

def severity(self) -> Optional[str]:
sev = self.modifiers.get('severity')
if sev:
return sev.upper()
else:
return None

def store_failures(self) -> Optional[bool]:
return self.modifiers.get('store_failures')

def tags(self) -> List[str]:
tags = self.modifiers.get('tags', [])
if isinstance(tags, str):
Expand Down
5 changes: 5 additions & 0 deletions core/dbt/parser/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,11 @@ def render_test_update(self, node, config, builder):
node.config['severity'] = builder.severity()
if builder.enabled() is not None:
node.config['enabled'] = builder.enabled()
# note: this does not respect generate_alias_name() macro
if builder.alias() is not None:
node.unrendered_config['alias'] = builder.alias()
node.config['alias'] = builder.alias()
node.alias = builder.alias()
# source node tests are processed at patch_source time
if isinstance(builder.target, UnpatchedSourceDefinition):
sources = [builder.target.fqn[-2], builder.target.fqn[-1]]
Expand Down
7 changes: 7 additions & 0 deletions core/dbt/task/printer.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,13 @@ def print_run_result_error(
logger.info(" compiled SQL at {}".format(
result.node.compiled_path))

if result.node.should_store_failures:
with TextOnly():
logger.info("")
msg = f"select * from {result.node.relation_name}"
border = '-' * len(msg)
logger.info(f" See test failures:\n {border}\n {msg}\n {border}")

elif result.message is not None:
first = True
for line in result.message.split("\n"):
Expand Down
3 changes: 1 addition & 2 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def get_model_schemas(
for node in self.manifest.nodes.values():
if node.unique_id not in selected_uids:
continue
if node.is_refable and not node.is_ephemeral:
if node.is_relational and not node.is_ephemeral:
relation = adapter.Relation.create_from(self.config, node)
result.add(relation.without_identifier())

Expand Down Expand Up @@ -525,7 +525,6 @@ def create_schema(relation: BaseRelation) -> None:
db_schema = (db_lower, schema.lower())
if db_schema not in existing_schemas_lowered:
existing_schemas_lowered.add(db_schema)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a reason why this disappeared?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope :) thanks for catching it!


fut = tpe.submit_connected(
adapter, f'create_{info.database or ""}_{info.schema}',
create_schema, info
Expand Down
Loading