From 7697ab741b07a5d507ec66fdff513e86573115ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Tue, 3 Dec 2024 10:59:54 +0100 Subject: [PATCH 1/4] feat: adds reporting metrics for lineage construction in tableau ingestion --- .../ingestion/source/tableau/tableau.py | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 0eafdb4ad23ba..fbaafc1753dab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -599,7 +599,13 @@ class TableauSourceReport(StaleEntityRemovalSourceReport): num_datasource_field_skipped_no_name: int = 0 num_csql_field_skipped_no_name: int = 0 num_table_field_skipped_no_name: int = 0 + # lineage + num_upstream_table_lineage: int = 0 + num_upstream_fine_grained_lineage: int = 0 num_upstream_table_skipped_no_name: int = 0 + num_upstream_table_skipped_no_columns: int = 0 + num_upstream_table_failed_generate_reference: int = 0 + num_upstream_fine_grained_lineage_failed_parse_sql: int = 0 @platform_name("Tableau") @@ -1292,7 +1298,7 @@ def _create_upstream_table_lineage( datasource: dict, browse_path: Optional[str], is_embedded_ds: bool = False, - ) -> Tuple: + ) -> Tuple[List[Upstream], List[FineGrainedLineage]]: upstream_tables: List[Upstream] = [] fine_grained_lineages: List[FineGrainedLineage] = [] table_id_to_urn = {} @@ -1453,7 +1459,8 @@ def get_upstream_tables( c.COLUMNS_CONNECTION ].get("totalCount") if not is_custom_sql and not num_tbl_cols: - logger.debug( + self.report.num_upstream_table_skipped_no_columns += 1 + logger.warning( f"Skipping upstream table with id {table[c.ID]}, no columns: {table}" ) continue @@ -1469,7 +1476,10 @@ def get_upstream_tables( table, default_schema_map=self.config.default_schema_map ) except Exception as e: - logger.info(f"Failed to generate upstream reference for {table}: {e}") + self.report.num_upstream_table_failed_generate_reference += 1 + logger.warning( + f"Failed to generate upstream reference for {table}: {e}" + ) continue table_urn = ref.make_dataset_urn( @@ -1635,15 +1645,7 @@ def get_upstream_fields_from_custom_sql( func_overridden_info=None, # Here we don't want to override any information from configuration ) - if parsed_result is None: - logger.info( - f"Failed to extract column level lineage from datasource {datasource_urn}" - ) - return [] - if parsed_result.debug_info.error: - logger.info( - f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.error}" - ) + if parsed_result is None or parsed_result.debug_info.error: return [] cll: List[ColumnLineageInfo] = ( @@ -2005,6 +2007,7 @@ def _create_lineage_to_upstream_tables( aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_upstream_table_lineage += len(upstream_tables) @staticmethod def _clean_tableau_query_parameters(query: str) -> str: @@ -2104,7 +2107,7 @@ def parse_custom_sql( f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}" ) - return create_lineage_sql_parsed_result( + parsed_result = create_lineage_sql_parsed_result( query=query, default_db=upstream_db, platform=platform, @@ -2114,6 +2117,15 @@ def parse_custom_sql( schema_aware=not self.config.sql_parsing_disable_schema_awareness, ) + if parsed_result is None or parsed_result.debug_info.error: + message = f"Failed to extract column level lineage from datasource {datasource_urn}" + if parsed_result is not None and parsed_result.debug_info.error: + message += f": {parsed_result.debug_info.error}" + logger.warning(message) + self.report.num_upstream_fine_grained_lineage_failed_parse_sql += 1 + + return parsed_result + def _enrich_database_tables_with_parsed_schemas( self, parsing_result: SqlParsingResult ) -> None: @@ -2148,9 +2160,6 @@ def _create_lineage_from_unsupported_csql( ) if parsed_result is None: - logger.info( - f"Failed to extract table level lineage for datasource {csql_urn}" - ) return self._enrich_database_tables_with_parsed_schemas(parsed_result) @@ -2170,12 +2179,13 @@ def _create_lineage_from_unsupported_csql( upstreams=upstream_tables, fineGrainedLineages=fine_grained_lineages, ) - yield self.get_metadata_change_proposal( csql_urn, aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_upstream_table_lineage += len(upstream_tables) + self.report.num_upstream_fine_grained_lineage += len(fine_grained_lineages) def _get_schema_metadata_for_datasource( self, datasource_fields: List[dict] @@ -2326,6 +2336,10 @@ def emit_datasource( aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_upstream_table_lineage += len(upstream_tables) + self.report.num_upstream_fine_grained_lineage += len( + fine_grained_lineages + ) # Datasource Fields schema_metadata = self._get_schema_metadata_for_datasource( From c6875e993bb3a574c0b856f3083df88b5cf1c316 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Thu, 5 Dec 2024 07:36:25 +0100 Subject: [PATCH 2/4] num_tables_with_upstream_lineage --- .../src/datahub/ingestion/source/tableau/tableau.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 3bbc014b9bd04..f1c61c4cc75cf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -597,6 +597,7 @@ class TableauSourceReport(StaleEntityRemovalSourceReport): num_csql_field_skipped_no_name: int = 0 num_table_field_skipped_no_name: int = 0 # lineage + num_tables_with_upstream_lineage: int = 0 num_upstream_table_lineage: int = 0 num_upstream_fine_grained_lineage: int = 0 num_upstream_table_skipped_no_name: int = 0 @@ -2031,6 +2032,7 @@ def _create_lineage_to_upstream_tables( aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_tables_with_upstream_lineage += 1 self.report.num_upstream_table_lineage += len(upstream_tables) @staticmethod @@ -2208,6 +2210,7 @@ def _create_lineage_from_unsupported_csql( aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_tables_with_upstream_lineage += 1 self.report.num_upstream_table_lineage += len(upstream_tables) self.report.num_upstream_fine_grained_lineage += len(fine_grained_lineages) @@ -2360,6 +2363,7 @@ def emit_datasource( aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_tables_with_upstream_lineage += 1 self.report.num_upstream_table_lineage += len(upstream_tables) self.report.num_upstream_fine_grained_lineage += len( fine_grained_lineages From e585d149a6b759ffb6bfc15030dfd12f5c0c165f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Thu, 5 Dec 2024 07:54:08 +0100 Subject: [PATCH 3/4] tweak failed parse sql warning messages --- .../datahub/ingestion/source/tableau/tableau.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index f1c61c4cc75cf..5b8f3e894c650 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -603,6 +603,7 @@ class TableauSourceReport(StaleEntityRemovalSourceReport): num_upstream_table_skipped_no_name: int = 0 num_upstream_table_skipped_no_columns: int = 0 num_upstream_table_failed_generate_reference: int = 0 + num_upstream_table_lineage_failed_parse_sql: int = 0 num_upstream_fine_grained_lineage_failed_parse_sql: int = 0 @@ -2143,11 +2144,17 @@ def parse_custom_sql( schema_aware=not self.config.sql_parsing_disable_schema_awareness, ) - if parsed_result is None or parsed_result.debug_info.error: - message = f"Failed to extract column level lineage from datasource {datasource_urn}" - if parsed_result is not None and parsed_result.debug_info.error: - message += f": {parsed_result.debug_info.error}" - logger.warning(message) + assert parsed_result is not None + + if parsed_result.debug_info.table_error: + logger.warning( + f"Failed to extract table lineage from datasource {datasource_urn}: {parsed_result.debug_info.table_error}" + ) + self.report.num_upstream_table_lineage_failed_parse_sql += 1 + if parsed_result.debug_info.column_error: + logger.warning( + f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.column_error}" + ) self.report.num_upstream_fine_grained_lineage_failed_parse_sql += 1 return parsed_result From ffc61b475f6dbe31c3d7324f8b0c37d6862197bf Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 5 Dec 2024 08:27:19 -0800 Subject: [PATCH 4/4] Update metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py --- .../src/datahub/ingestion/source/tableau/tableau.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 5b8f3e894c650..1dc32548e2eec 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -2151,7 +2151,7 @@ def parse_custom_sql( f"Failed to extract table lineage from datasource {datasource_urn}: {parsed_result.debug_info.table_error}" ) self.report.num_upstream_table_lineage_failed_parse_sql += 1 - if parsed_result.debug_info.column_error: + elif parsed_result.debug_info.column_error: logger.warning( f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.column_error}" )