From aadf012dc5a342045e1ef0a2e73cfec81fa9631c Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 17 Apr 2024 21:24:48 -0700
Subject: [PATCH 1/4] feat(ingest/dbt): handle complex dbt compile steps

---
 .../ingestion/source/dbt/dbt_common.py        | 26 ++++++++++++++++---
 .../src/datahub/sql_parsing/sqlglot_utils.py  | 19 ++++++++++++++
 2 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
index 4876e2b6fcff4a..9f39fbf39b293a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -124,7 +124,11 @@
     infer_output_schema,
     sqlglot_lineage,
 )
-from datahub.sql_parsing.sqlglot_utils import detach_ctes, try_format_query
+from datahub.sql_parsing.sqlglot_utils import (
+    detach_ctes,
+    parse_statements_and_pick,
+    try_format_query,
+)
 from datahub.utilities.mapping import Constants, OperationProcessor
 from datahub.utilities.time import datetime_to_ts_millis
 from datahub.utilities.topological_sort import topological_sort
@@ -138,6 +142,8 @@
 @dataclass
 class DBTSourceReport(StaleEntityRemovalSourceReport):
     sql_statements_parsed: int = 0
+    sql_statements_table_error: int = 0
+    sql_statements_column_error: int = 0
     sql_parser_detach_ctes_failures: int = 0
     sql_parser_skipped_missing_code: int = 0
 
@@ -1107,7 +1113,10 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No
                 try:
                     # Add CTE stops based on the upstreams list.
                     preprocessed_sql = detach_ctes(
-                        node.compiled_code,
+                        parse_statements_and_pick(
+                            node.compiled_code,
+                            platform=schema_resolver.platform,
+                        ),
                         platform=schema_resolver.platform,
                         cte_mapping={
                             cte_name: upstream_node.get_fake_ephemeral_table_name()
@@ -1132,7 +1141,18 @@ def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> No
                     sql_result = sqlglot_lineage(
                         preprocessed_sql, schema_resolver=schema_resolver
                     )
-                    self.report.sql_statements_parsed += 1
+                    if sql_result.debug_info.error:
+                        self.report.sql_statements_table_error += 1
+                        logger.info(
+                            f"Failed to parse compiled code for {node.dbt_name}: {sql_result.debug_info.error}"
+                        )
+                    elif sql_result.debug_info.column_error:
+                        self.report.sql_statements_column_error += 1
+                        logger.info(
+                            f"Failed to generate CLL for {node.dbt_name}: {sql_result.debug_info.column_error}"
+                        )
+                    else:
+                        self.report.sql_statements_parsed += 1
             else:
                 self.report.sql_parser_skipped_missing_code += 1
 
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
index bad72e69221014..c7cf975a3a9533 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
@@ -64,6 +64,25 @@ def parse_statement(
     return statement
 
 
+def parse_statements_and_pick(sql: str, platform: DialectOrStr) -> sqlglot.Expression:
+    dialect = get_dialect(platform)
+    statements = [
+        expression for expression in sqlglot.parse(sql, dialect=dialect) if expression
+    ]
+    if not statements:
+        raise ValueError(f"No statements found in query: {sql}")
+    elif len(statements) == 1:
+        return statements[0]
+    else:
+        # If we find multiple statements, we assume the last one is the main query.
+        # Usually the prior queries are going to be things like `CREATE FUNCTION`
+        # or `GRANT ...`, which we don't care about.
+        logger.debug(
+            "Found multiple statements in query, picking the last one: %s", sql
+        )
+        return statements[-1]
+
+
 def _expression_to_string(
     expression: sqlglot.exp.ExpOrStr, platform: DialectOrStr
 ) -> str:
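For illustration only (not part of the patch series): the statement-picking behavior that `parse_statements_and_pick` introduces can be reproduced with sqlglot alone. The compiled SQL below is a made-up example of a dbt compile step that emits a `GRANT` before the main `CREATE TABLE ... AS SELECT`; like the helper above, the sketch keeps only the final statement. The table names and dialect are invented.

```python
# Minimal sketch, assuming a recent sqlglot; everything in the SQL is hypothetical.
import sqlglot

compiled_sql = """
GRANT SELECT ON analytics.orders TO ROLE reporter;

CREATE TABLE analytics.daily_orders AS
SELECT order_date, COUNT(*) AS order_count
FROM analytics.orders
GROUP BY order_date;
"""

# sqlglot.parse returns one expression per statement (and None for empty ones).
statements = [e for e in sqlglot.parse(compiled_sql, dialect="snowflake") if e]

# As in the patch, assume the last statement is the main query and ignore the
# earlier GRANT, which carries no lineage information.
main_statement = statements[-1]
print(main_statement.sql(dialect="snowflake"))
```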
From a42ef89a607112b7d2d965f062c3f77298d724ca Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 17 Apr 2024 21:25:17 -0700
Subject: [PATCH 2/4] improve docs on dbt setup

---
 .../docs/sources/dbt/dbt-cloud_pre.md         | 11 +++++
 metadata-ingestion/docs/sources/dbt/dbt.md    |  3 +-
 .../docs/sources/dbt/dbt_pre.md               | 41 +++++++++++++++++++
 .../datahub/ingestion/source/dbt/dbt_core.py  | 16 --------
 4 files changed, 53 insertions(+), 18 deletions(-)
 create mode 100644 metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md
 create mode 100644 metadata-ingestion/docs/sources/dbt/dbt_pre.md

diff --git a/metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md b/metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md
new file mode 100644
index 00000000000000..ac7283d21015a7
--- /dev/null
+++ b/metadata-ingestion/docs/sources/dbt/dbt-cloud_pre.md
@@ -0,0 +1,11 @@
+### Setup
+
+This source pulls dbt metadata directly from the dbt Cloud APIs.
+
+You'll need to have a dbt Cloud job set up to run your dbt project, with the "Generate docs on run" option enabled.
+
+The token should have the "read metadata" permission.
+
+To get the required IDs, go to the job details page (this is the one with the "Run History" table), and look at the URL.
+It should look something like this: https://cloud.getdbt.com/next/deploy/107298/projects/175705/jobs/148094.
+In this example, the account ID is 107298, the project ID is 175705, and the job ID is 148094.
diff --git a/metadata-ingestion/docs/sources/dbt/dbt.md b/metadata-ingestion/docs/sources/dbt/dbt.md
index 02a5d760c66d30..eca5101e006426 100644
--- a/metadata-ingestion/docs/sources/dbt/dbt.md
+++ b/metadata-ingestion/docs/sources/dbt/dbt.md
@@ -187,8 +187,7 @@ dbt source snapshot-freshness
 dbt build
 cp target/run_results.json target/run_results_backup.json
 dbt docs generate
-
-# Reference target/run_results_backup.json in the dbt source config.
+cp target/run_results_backup.json target/run_results.json
 ```
 
 :::
diff --git a/metadata-ingestion/docs/sources/dbt/dbt_pre.md b/metadata-ingestion/docs/sources/dbt/dbt_pre.md
new file mode 100644
index 00000000000000..495c25f94048f1
--- /dev/null
+++ b/metadata-ingestion/docs/sources/dbt/dbt_pre.md
@@ -0,0 +1,41 @@
+### Setup
+
+The artifacts used by this source are:
+
+- [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
+  - This file contains model, source, test, and lineage data.
+- [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
+  - This file contains schema data.
+  - dbt does not record schema data for ephemeral models, so DataHub will show ephemeral models in the lineage but without an associated schema.
+- [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json)
+  - This file contains metadata for sources with freshness checks.
+  - We transfer dbt's freshness checks to DataHub's last-modified fields.
+  - Note that this file is optional – if not specified, we'll use the time of ingestion as a proxy for the last-modified time.
+- [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json)
+  - This file contains metadata from the result of a dbt run, e.g. `dbt test`.
+  - When provided, we transfer dbt test run results into assertion run events, so you can see a timeline of test runs on the dataset.
+
+To generate these files, we recommend the following workflow for dbt builds and DataHub ingestion.
+
+```sh
+dbt source snapshot-freshness
+dbt build
+cp target/run_results.json target/run_results_backup.json
+dbt docs generate
+cp target/run_results_backup.json target/run_results.json
+
+# Run datahub ingestion, pointing at the files in the target/ directory
+```
+
+The necessary artifact files will then appear in the `target/` directory of your dbt project.
+
+We also have guides on handling more complex dbt orchestration techniques and multi-project setups below.
+
+:::note Entity is in manifest but missing from catalog
+
+This warning usually appears when the catalog.json file was not generated by a `dbt docs generate` command.
+Most other dbt commands generate a partial catalog file, which may impact the completeness of the metadata ingested into DataHub.
+
+Following the above workflow should ensure that the catalog file is generated correctly.
+
+:::
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
index 89f562fdc71a10..75a4fa8f1c8d98 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
@@ -425,22 +425,6 @@ def load_run_results(
 @capability(SourceCapability.DELETION_DETECTION, "Enabled via stateful ingestion")
 @capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
 class DBTCoreSource(DBTSourceBase, TestableSource):
-    """
-    The artifacts used by this source are:
-    - [dbt manifest file](https://docs.getdbt.com/reference/artifacts/manifest-json)
-      - This file contains model, source, tests and lineage data.
-    - [dbt catalog file](https://docs.getdbt.com/reference/artifacts/catalog-json)
-      - This file contains schema data.
-      - dbt does not record schema data for Ephemeral models, as such datahub will show Ephemeral models in the lineage, however there will be no associated schema for Ephemeral models
-    - [dbt sources file](https://docs.getdbt.com/reference/artifacts/sources-json)
-      - This file contains metadata for sources with freshness checks.
-      - We transfer dbt's freshness checks to DataHub's last-modified fields.
-      - Note that this file is optional – if not specified, we'll use time of ingestion instead as a proxy for time last-modified.
-    - [dbt run_results file](https://docs.getdbt.com/reference/artifacts/run-results-json)
-      - This file contains metadata from the result of a dbt run, e.g. `dbt test`
-      - When provided, we transfer dbt test run results into assertion run events to see a timeline of test runs on the dataset
-    """
-
     config: DBTCoreConfig
 
     @classmethod
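To make the recommended workflow above concrete, here is a small, hypothetical pre-flight script (not part of this patch series) that checks for the artifacts described in `dbt_pre.md` before ingestion is pointed at the `target/` directory. The file names are the standard dbt artifact names; everything else, including the script itself, is illustrative.

```python
# Hypothetical pre-flight check: confirm the dbt artifacts exist before ingestion.
import json
from pathlib import Path

target_dir = Path("target")  # assumes dbt's default target path
required = ["manifest.json", "catalog.json"]
optional = ["sources.json", "run_results.json"]

for name in required:
    if not (target_dir / name).is_file():
        raise SystemExit(f"Missing required dbt artifact: {target_dir / name}")

for name in optional:
    if not (target_dir / name).is_file():
        print(f"Optional artifact not found (freshness/test results will be skipped): {name}")

manifest = json.loads((target_dir / "manifest.json").read_text())
print(f"manifest.json describes {len(manifest.get('nodes', {}))} nodes")
```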
From bcf7952828bd2d9e64c744c22b706b599f4f0e14 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 17 Apr 2024 21:45:19 -0700
Subject: [PATCH 3/4] fix lint

---
 .../src/datahub/ingestion/source/dbt/dbt_common.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
index 9f39fbf39b293a..17259bb0ca1867 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -995,7 +995,9 @@ def _filter_nodes(self, all_nodes: List[DBTNode]) -> List[DBTNode]:
     def _to_schema_info(schema_fields: List[SchemaField]) -> SchemaInfo:
         return {column.fieldPath: column.nativeDataType for column in schema_fields}
 
-    def _infer_schemas_and_update_cll(self, all_nodes_map: Dict[str, DBTNode]) -> None:
+    def _infer_schemas_and_update_cll(  # noqa: C901
+        self, all_nodes_map: Dict[str, DBTNode]
+    ) -> None:
         """Annotate the DBTNode objects with schema information and column-level lineage.
 
         Note that this mutates the DBTNode objects directly.

From 4d51fce15a0f7037013ddd74a33815eefa899dc6 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Mon, 22 Apr 2024 13:08:35 -0700
Subject: [PATCH 4/4] improve logging

---
 .../ingestion/source/dbt/dbt_common.py        | 19 +++++++++++++------
 .../datahub/ingestion/source/dbt/dbt_core.py  | 13 +++++++++----
 2 files changed, 22 insertions(+), 10 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
index 17259bb0ca1867..b2d93b2e0fd6f7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -129,6 +129,7 @@
     parse_statements_and_pick,
     try_format_query,
 )
+from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.mapping import Constants, OperationProcessor
 from datahub.utilities.time import datetime_to_ts_millis
 from datahub.utilities.topological_sort import topological_sort
@@ -144,8 +145,10 @@ class DBTSourceReport(StaleEntityRemovalSourceReport):
     sql_statements_parsed: int = 0
     sql_statements_table_error: int = 0
     sql_statements_column_error: int = 0
-    sql_parser_detach_ctes_failures: int = 0
-    sql_parser_skipped_missing_code: int = 0
+    sql_parser_detach_ctes_failures: LossyList[str] = field(default_factory=LossyList)
+    sql_parser_skipped_missing_code: LossyList[str] = field(default_factory=LossyList)
+
+    in_manifest_but_missing_catalog: LossyList[str] = field(default_factory=LossyList)
 
 
 class EmitDirective(ConfigEnum):
@@ -1111,7 +1114,11 @@ def _infer_schemas_and_update_cll(  # noqa: C901
             # Run sql parser to infer the schema + generate column lineage.
             sql_result = None
 
-            if node.compiled_code:
+            if node.node_type in {"source", "test"}:
+                # For sources, we generate CLL as a 1:1 mapping.
+                # We don't support CLL for tests (assertions).
+                pass
+            elif node.compiled_code:
                 try:
                     # Add CTE stops based on the upstreams list.
                     preprocessed_sql = detach_ctes(
@@ -1134,7 +1141,7 @@ def _infer_schemas_and_update_cll(  # noqa: C901
                         },
                     )
                 except Exception as e:
-                    self.report.sql_parser_detach_ctes_failures += 1
+                    self.report.sql_parser_detach_ctes_failures.append(node.dbt_name)
                     logger.debug(
                         f"Failed to detach CTEs from compiled code. {node.dbt_name} will not have column lineage."
                     )
@@ -1156,7 +1163,7 @@ def _infer_schemas_and_update_cll(  # noqa: C901
                     else:
                         self.report.sql_statements_parsed += 1
             else:
-                self.report.sql_parser_skipped_missing_code += 1
+                self.report.sql_parser_skipped_missing_code.append(node.dbt_name)
 
             # Save the column lineage.
             if self.config.include_column_lineage and sql_result:
@@ -1759,7 +1766,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
         if node.cll_debug_info and node.cll_debug_info.error:
             self.report.report_warning(
                 node.dbt_name,
-                f"Error parsing column lineage: {node.cll_debug_info.error}",
+                f"Error parsing SQL to generate column lineage: {node.cll_debug_info.error}",
             )
         cll = [
             FineGrainedLineage(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
index 75a4fa8f1c8d98..581e1cc8e027b3 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py
@@ -185,10 +185,7 @@ def extract_dbt_entities(
         if catalog_node is None:
             if materialization not in {"test", "ephemeral"}:
                 # Test and ephemeral nodes will never show up in the catalog.
-                report.report_warning(
-                    key,
-                    f"Entity {key} ({name}) is in manifest but missing from catalog",
-                )
+                report.in_manifest_but_missing_catalog.append(key)
         else:
             catalog_type = all_catalog_entities[key]["metadata"]["type"]
 
@@ -281,6 +278,14 @@ def extract_dbt_entities(
 
         dbt_entities.append(dbtNode)
 
+    if report.in_manifest_but_missing_catalog:
+        # We still want this to show up as a warning, but don't want to spam the warnings section
+        # if there are a lot of them.
+        report.warning(
+            "in_manifest_but_missing_catalog",
+            f"Found {len(report.in_manifest_but_missing_catalog)} nodes in manifest but not in catalog. See in_manifest_but_missing_catalog for details.",
+        )
+
     return dbt_entities
 
 
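For context on the `LossyList` switch in this final patch: the report fields now collect the names of affected dbt nodes instead of bare counters, while keeping the report output bounded. Below is a rough sketch of the resulting behavior; it uses only the constructor and `append` calls that appear in the patch, and the exact sampling and string rendering of `datahub.utilities.lossy_collections.LossyList` is an assumption rather than a documented guarantee.

```python
# Rough sketch (not part of the patch): a LossyList-style report field stays
# readable even when thousands of nodes fail, because only a sample of entries
# is kept for display while the total count is still tracked.
from datahub.utilities.lossy_collections import LossyList

failed_nodes: LossyList[str] = LossyList()
for i in range(1000):
    failed_nodes.append(f"model.my_project.model_{i}")

# Printing the report field shows a sampled subset rather than all 1000 entries.
print(failed_nodes)
```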