Skip to content

Commit

Permalink
feat(ingest/dbt): handle complex dbt sql + improve docs (datahub-proj…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and sleeperdeep committed Jun 25, 2024
1 parent 9210572 commit 6092423
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 32 deletions.
11 changes: 11 additions & 0 deletions metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
### Setup

This source pulls dbt metadata directly from the dbt Cloud APIs.

You'll need to have a dbt Cloud job set up to run your dbt project, and "Generate docs on run" should be enabled.

The token should have the "read metadata" permission.

To get the required IDs, go to the job details page (this is the one with the "Run History" table), and look at the URL.
It should look something like this: https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094.
In this example, the account ID is 107298, the project ID is 175705, and the job ID is 148094.
3 changes: 1 addition & 2 deletions metadata-ingestion/docs/sources/dbt/dbt.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ dbt source snapshot-freshness
dbt build
cp target/run_results.json target/run_results_backup.json
dbt docs generate
# Reference target/run_results_backup.json in the dbt source config.
cp target/run_results_backup.json target/run_results.json
```

:::
Expand Down
41 changes: 41 additions & 0 deletions metadata-ingestion/docs/sources/dbt/dbt_pre.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
### Setup

The artifacts used by this source are:

- [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
- This file contains model, source, tests and lineage data.
- [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
- This file contains schema data.
- dbt does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models
- [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json)
- This file contains metadata for sources with freshness checks.
- We transfer dbt's freshness checks to DataHub's last-modified fields.
- Note that this file is optional – if not specified, we'll use time of ingestion instead as a proxy for time last-modified.
- [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json)
- This file contains metadata from the result of a dbt run, e.g. dbt test
- When provided, we transfer dbt test run results into assertion run events to see a timeline of test runs on the dataset

To generate these files, we recommend this workflow for dbt build and datahub ingestion.

```sh
dbt source snapshot-freshness
dbt build
cp target/run_results.json target/run_results_backup.json
dbt docs generate
cp target/run_results_backup.json target/run_results.json

# Run datahub ingestion, pointing at the files in the target/ directory
```

The necessary artifact files will then appear in the `target/` directory of your dbt project.

We also have guides on handling more complex dbt orchestration techniques and multi-project setups below.

:::note Entity is in manifest but missing from catalog

This warning usually appears when the catalog.json file was not generated by a `dbt docs generate` command.
Most other dbt commands generate a partial catalog file, which may impact the completeness of the metadata in ingested into DataHub.

Following the above workflow should ensure that the catalog file is generated correctly.

:::
49 changes: 39 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@
infer_output_schema,
sqlglot_lineage,
)
from datahub.sql_parsing.sqlglot_utils import detach_ctes, try_format_query
from datahub.sql_parsing.sqlglot_utils import (
detach_ctes,
parse_statements_and_pick,
try_format_query,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.time import datetime_to_ts_millis
from datahub.utilities.topological_sort import topological_sort
Expand All @@ -138,8 +143,12 @@
@dataclass
class DBTSourceReport(StaleEntityRemovalSourceReport):
sql_statements_parsed: int = 0
sql_parser_detach_ctes_failures: int = 0
sql_parser_skipped_missing_code: int = 0
sql_statements_table_error: int = 0
sql_statements_column_error: int = 0
sql_parser_detach_ctes_failures: LossyList[str] = field(default_factory=LossyList)
sql_parser_skipped_missing_code: LossyList[str] = field(default_factory=LossyList)

in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList)


class EmitDirective(ConfigEnum):
Expand Down Expand Up @@ -989,7 +998,9 @@ def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
def _to_schema_info(schema_fields: List[SchemaField]) -> SchemaInfo:
return {column.fieldPath: column.nativeDataType for column in schema_fields}

def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> None:
def _infer_schemas_and_update_cll( # noqa: C901
self, all_nodes_map: Dict[str, DBTNode]
) -> None:
"""Annotate the DBTNode objects with schema information and column-level lineage.
Note that this mutates the DBTNode objects directly.
Expand Down Expand Up @@ -1103,11 +1114,18 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No

# Run sql parser to infer the schema + generate column lineage.
sql_result = None
if node.compiled_code:
if node.node_type in {"source", "test"}:
# For sources, we generate CLL as a 1:1 mapping.
# We don't support CLL for tests (assertions).
pass
elif node.compiled_code:
try:
# Add CTE stops based on the upstreams list.
preprocessed_sql = detach_ctes(
node.compiled_code,
parse_statements_and_pick(
node.compiled_code,
platform=schema_resolver.platform,
),
platform=schema_resolver.platform,
cte_mapping={
cte_name: upstream_node.get_fake_ephemeral_table_name()
Expand All @@ -1123,7 +1141,7 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No
},
)
except Exception as e:
self.report.sql_parser_detach_ctes_failures += 1
self.report.sql_parser_detach_ctes_failures.append(node.dbt_name)
logger.debug(
f"Failed to detach CTEs from compiled code. {node.dbt_name} will not have column lineage."
)
Expand All @@ -1132,9 +1150,20 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No
sql_result = sqlglot_lineage(
preprocessed_sql, schema_resolver=schema_resolver
)
self.report.sql_statements_parsed += 1
if sql_result.debug_info.error:
self.report.sql_statements_table_error += 1
logger.info(
f"Failed to parse compiled code for {node.dbt_name}: {sql_result.debug_info.error}"
)
elif sql_result.debug_info.column_error:
self.report.sql_statements_column_error += 1
logger.info(
f"Failed to generate CLL for {node.dbt_name}: {sql_result.debug_info.column_error}"
)
else:
self.report.sql_statements_parsed += 1
else:
self.report.sql_parser_skipped_missing_code += 1
self.report.sql_parser_skipped_missing_code.append(node.dbt_name)

# Save the column lineage.
if self.config.include_column_lineage and sql_result:
Expand Down Expand Up @@ -1737,7 +1766,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
if node.cll_debug_info and node.cll_debug_info.error:
self.report.report_warning(
node.dbt_name,
f"Error parsing column lineage: {node.cll_debug_info.error}",
f"Error parsing SQL to generate column lineage: {node.cll_debug_info.error}",
)
cll = [
FineGrainedLineage(
Expand Down
29 changes: 9 additions & 20 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,7 @@ def extract_dbt_entities(
if catalog_node is None:
if materialization not in {"test", "ephemeral"}:
# Test and ephemeral nodes will never show up in the catalog.
report.report_warning(
key,
f"Entity {key} ({name}) is in manifest but missing from catalog",
)
report.in_manifest_but_missing_catalog.append(key)
else:
catalog_type = all_catalog_entities[key]["metadata"]["type"]

Expand Down Expand Up @@ -281,6 +278,14 @@ def extract_dbt_entities(

dbt_entities.append(dbtNode)

if report.in_manifest_but_missing_catalog:
# We still want this to show up as a warning, but don't want to spam the warnings section
# if there's a lot of them.
report.warning(
"in_manifest_but_missing_catalog",
f"Found {len(report.in_manifest_but_missing_catalog)} nodes in manifest but not in catalog. See in_manifest_but_missing_catalog for details.",
)

return dbt_entities


Expand Down Expand Up @@ -425,22 +430,6 @@ def load_run_results(
@capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
class DBTCoreSource(DBTSourceBase, TestableSource):
"""
The artifacts used by this source are:
- [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
- This file contains model, source, tests and lineage data.
- [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
- This file contains schema data.
- dbt does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models
- [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json)
- This file contains metadata for sources with freshness checks.
- We transfer dbt's freshness checks to DataHub's last-modified fields.
- Note that this file is optional – if not specified, we'll use time of ingestion instead as a proxy for time last-modified.
- [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json)
- This file contains metadata from the result of a dbt run, e.g. dbt test
- When provided, we transfer dbt test run results into assertion run events to see a timeline of test runs on the dataset
"""

config: DBTCoreConfig

@classmethod
Expand Down
19 changes: 19 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,25 @@ def parse_statement(
return statement


def parse_statements_and_pick(sql: str, platform: DialectOrStr) -> sqlglot.Expression:
dialect = get_dialect(platform)
statements = [
expression for expression in sqlglot.parse(sql, dialect=dialect) if expression
]
if not statements:
raise ValueError(f"No statements found in query: {sql}")
elif len(statements) == 1:
return statements[0]
else:
# If we find multiple statements, we assume the last one is the main query.
# Usually the prior queries are going to be things like `CREATE FUNCTION`
# or `GRANT ...`, which we don't care about.
logger.debug(
"Found multiple statements in query, picking the last one: %s", sql
)
return statements[-1]


def _expression_to_string(
expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr
) -> str:
Expand Down

0 comments on commit 6092423

Please sign in to comment.