diff --git a/metadata-ingestion/src/datahub/ingestion/source/metabase.py b/metadata-ingestion/src/datahub/ingestion/source/metabase.py
index 6f8f5097b61497..12a76ff7b33ff7 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/metabase.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/metabase.py
@@ -1,5 +1,6 @@
 import json
 import logging
+import re
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from functools import lru_cache
@@ -309,9 +310,10 @@ def construct_dashboard_from_api_data(
         chart_urns = []
         cards_data = dashboard_details.get("dashcards", {})
         for card_info in cards_data:
-            chart_urn = builder.make_chart_urn(
-                self.platform, card_info.get("card").get("id", "")
-            )
+            card_id = card_info.get("card").get("id", "")
+            if not card_id:
+                continue  # most likely a virtual card without an id (text or heading), not relevant.
+            chart_urn = builder.make_chart_urn(self.platform, card_id)
             chart_urns.append(chart_urn)
 
         dashboard_info_class = DashboardInfoClass(
@@ -592,11 +594,12 @@ def get_datasource_urn(
                 )
             ]
         else:
-            raw_query = (
+            raw_query_stripped = self.strip_template_expressions(
                 card_details.get("dataset_query", {}).get("native", {}).get("query", "")
             )
+
             result = create_lineage_sql_parsed_result(
-                query=raw_query,
+                query=raw_query_stripped,
                 default_db=database_name,
                 default_schema=database_schema or self.config.default_schema,
                 platform=platform,
@@ -606,17 +609,35 @@
             )
             if result.debug_info.table_error:
                 logger.info(
-                    f"Failed to parse lineage from query {raw_query}: "
+                    f"Failed to parse lineage from query {raw_query_stripped}: "
                     f"{result.debug_info.table_error}"
                 )
                 self.report.report_warning(
                     key="metabase-query",
-                    reason=f"Unable to retrieve lineage from query: {raw_query}",
+                    reason=f"Unable to retrieve lineage from query: {raw_query_stripped}",
                 )
             return result.in_tables
 
         return None
 
+    @staticmethod
+    def strip_template_expressions(raw_query: str) -> str:
+        """
+        Workarounds for metabase raw queries containing most commonly used template expressions:
+
+        - strip conditional expressions "[[ .... ]]"
+        - replace all {{ filter expressions }} with "1"
+
+        reference: https://www.metabase.com/docs/latest/questions/native-editor/sql-parameters
+        """
+
+        # drop [[ WHERE {{FILTER}} ]]
+        query_patched = re.sub(r"\[\[.+?\]\]", r" ", raw_query)
+
+        # replace {{FILTER}} with 1
+        query_patched = re.sub(r"\{\{.+?\}\}", r"1", query_patched)
+        return query_patched
+
     @lru_cache(maxsize=None)
     def get_source_table_from_id(
         self, table_id: Union[int, str]
diff --git a/metadata-ingestion/tests/integration/metabase/setup/dashboard_1.json b/metadata-ingestion/tests/integration/metabase/setup/dashboard_1.json
index f58a1079c3620b..aa1197a0670bf3 100644
--- a/metadata-ingestion/tests/integration/metabase/setup/dashboard_1.json
+++ b/metadata-ingestion/tests/integration/metabase/setup/dashboard_1.json
@@ -3,6 +3,37 @@
   "archived": false,
   "collection_position": null,
   "dashcards": [
+    {
+      "size_x": 24,
+      "dashboard_tab_id": null,
+      "series": [],
+      "action_id": null,
+      "collection_authority_level": null,
+      "card": {
+        "query_average_duration": null
+      },
+      "updated_at": "2024-05-18T18:22:19.900158Z",
+      "col": 0,
+      "id": 2,
+      "parameter_mappings": [],
+      "card_id": null,
+      "entity_id": "woqw4RKYx7rlOgqp-_UPc",
+      "visualization_settings": {
+        "dashcard.background": false,
+        "virtual_card": {
+          "name": null,
+          "dataset_query": {},
+          "display": "heading",
+          "visualization_settings": {},
+          "archived": false
+        },
+        "text": "This is a virtual card and should be ignored by the ingestion."
+      },
+      "size_y": 1,
+      "dashboard_id": 1,
+      "created_at": "2024-05-18T18:22:19.900158Z",
+      "row": 0
+    },
     {
       "size_x": 12,
       "dashboard_tab_id": null,
diff --git a/metadata-ingestion/tests/integration/metabase/setup/dashboard_2.json b/metadata-ingestion/tests/integration/metabase/setup/dashboard_2.json
index 2f9beaccc1e187..be2b0f23c39688 100644
--- a/metadata-ingestion/tests/integration/metabase/setup/dashboard_2.json
+++ b/metadata-ingestion/tests/integration/metabase/setup/dashboard_2.json
@@ -3,6 +3,37 @@
   "archived": false,
   "collection_position": null,
   "dashcards": [
+    {
+      "size_x": 24,
+      "dashboard_tab_id": null,
+      "series": [],
+      "action_id": null,
+      "collection_authority_level": null,
+      "card": {
+        "query_average_duration": null
+      },
+      "updated_at": "2024-05-18T18:22:19.900158Z",
+      "col": 0,
+      "id": 2,
+      "parameter_mappings": [],
+      "card_id": null,
+      "entity_id": "woqw4RKYx7rlOgqp-_UPc",
+      "visualization_settings": {
+        "dashcard.background": false,
+        "virtual_card": {
+          "name": null,
+          "dataset_query": {},
+          "display": "heading",
+          "visualization_settings": {},
+          "archived": false
+        },
+        "text": "This is a virtual card and should be ignored by the ingestion."
+      },
+      "size_y": 1,
+      "dashboard_id": 1,
+      "created_at": "2024-05-18T18:22:19.900158Z",
+      "row": 0
+    },
     {
       "size_x": 12,
       "dashboard_tab_id": null,
diff --git a/metadata-ingestion/tests/integration/metabase/test_metabase.py b/metadata-ingestion/tests/integration/metabase/test_metabase.py
index b39550f3d048a2..5c433f14f380ff 100644
--- a/metadata-ingestion/tests/integration/metabase/test_metabase.py
+++ b/metadata-ingestion/tests/integration/metabase/test_metabase.py
@@ -8,6 +8,7 @@
 
 from datahub.configuration.common import PipelineExecutionError
 from datahub.ingestion.run.pipeline import Pipeline
+from datahub.ingestion.source.metabase import MetabaseSource
 from tests.test_helpers import mce_helpers
 from tests.test_helpers.state_helpers import (
     get_current_checkpoint_from_pipeline,
@@ -20,23 +21,6 @@
 GMS_PORT = 8080
 GMS_SERVER = f"http://localhost:{GMS_PORT}"
 
-JSON_RESPONSE_MAP = {
-    "http://localhost:3000/api/session": "session.json",
-    "http://localhost:3000/api/user/current": "user.json",
-    "http://localhost:3000/api/collection/?exclude-other-user-collections=false": "collections.json",
-    "http://localhost:3000/api/collection/root/items?models=dashboard": "collection_dashboards.json",
-    "http://localhost:3000/api/collection/150/items?models=dashboard": "collection_dashboards.json",
-    "http://localhost:3000/api/dashboard/10": "dashboard_1.json",
-    "http://localhost:3000/api/dashboard/20": "dashboard_2.json",
-    "http://localhost:3000/api/user/1": "user.json",
-    "http://localhost:3000/api/card": "card.json",
-    "http://localhost:3000/api/database/1": "bigquery_database.json",
-    "http://localhost:3000/api/database/2": "postgres_database.json",
-    "http://localhost:3000/api/card/1": "card_1.json",
-    "http://localhost:3000/api/card/2": "card_2.json",
-    "http://localhost:3000/api/table/21": "table_21.json",
-    "http://localhost:3000/api/card/3": "card_3.json",
-}
 
 RESPONSE_ERROR_LIST = ["http://localhost:3000/api/dashboard/public"]
 
@@ -44,7 +28,9 @@
 
 
 class MockResponse:
-    def __init__(self, url, data=None, jsond=None, error_list=None):
+    def __init__(
+        self, url, json_response_map=None, data=None, jsond=None, error_list=None
+    ):
         self.json_data = data
         self.url = url
         self.jsond = jsond
@@ -52,11 +38,17 @@ def __init__(self, url, data=None, jsond=None, error_list=None):
         self.headers = {}
         self.auth = None
         self.status_code = 200
+        self.response_map = json_response_map
 
     def json(self):
-        response_json_path = (
-            f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(self.url)}"
-        )
+        mocked_response_file = self.response_map.get(self.url)
+        response_json_path = f"{test_resources_dir}/setup/{mocked_response_file}"
+
+        if not pathlib.Path(response_json_path).exists():
+            raise Exception(
+                f"mock response file not found {self.url} -> {mocked_response_file}"
+            )
+
         with open(response_json_path) as file:
             data = json.loads(file.read())
             self.json_data = data
@@ -75,21 +67,68 @@ def raise_for_status(self):
             )
             raise HTTPError(http_error_msg, response=self)
 
+    @staticmethod
+    def build_mocked_requests_sucess(json_response_map):
+        def mocked_requests_sucess_(*args, **kwargs):
+            return MockResponse(url=None, json_response_map=json_response_map)
 
-def mocked_requests_sucess(*args, **kwargs):
-    return MockResponse(None)
+        return mocked_requests_sucess_
 
+    @staticmethod
+    def build_mocked_requests_failure(json_response_map):
+        def mocked_requests_failure(*args, **kwargs):
+            return MockResponse(
+                url=None,
+                error_list=RESPONSE_ERROR_LIST,
+                json_response_map=json_response_map,
+            )
 
-def mocked_requests_failure(*args, **kwargs):
-    return MockResponse(None, error_list=RESPONSE_ERROR_LIST)
+        return mocked_requests_failure
 
+    @staticmethod
+    def build_mocked_requests_session_post(json_response_map):
+        def mocked_requests_session_post(url, data, json):
+            return MockResponse(
+                url=url,
+                data=data,
+                jsond=json,
+                json_response_map=json_response_map,
+            )
 
-def mocked_requests_session_post(url, data, json):
-    return MockResponse(url, data, json)
+        return mocked_requests_session_post
 
+    @staticmethod
+    def build_mocked_requests_session_delete(json_response_map):
+        def mocked_requests_session_delete(url, headers):
+            return MockResponse(
+                url=url,
+                data=None,
+                jsond=headers,
+                json_response_map=json_response_map,
+            )
 
-def mocked_requests_session_delete(url, headers):
-    return MockResponse(url, data=None, jsond=headers)
+        return mocked_requests_session_delete
+
+
+@pytest.fixture
+def default_json_response_map():
+    return {
+        "http://localhost:3000/api/session": "session.json",
+        "http://localhost:3000/api/user/current": "user.json",
+        "http://localhost:3000/api/collection/?exclude-other-user-collections=false": "collections.json",
+        "http://localhost:3000/api/collection/root/items?models=dashboard": "collection_dashboards.json",
+        "http://localhost:3000/api/collection/150/items?models=dashboard": "collection_dashboards.json",
+        "http://localhost:3000/api/dashboard/10": "dashboard_1.json",
+        "http://localhost:3000/api/dashboard/20": "dashboard_2.json",
+        "http://localhost:3000/api/user/1": "user.json",
+        "http://localhost:3000/api/card": "card.json",
+        "http://localhost:3000/api/database/1": "bigquery_database.json",
+        "http://localhost:3000/api/database/2": "postgres_database.json",
+        "http://localhost:3000/api/card/1": "card_1.json",
+        "http://localhost:3000/api/card/2": "card_2.json",
+        "http://localhost:3000/api/table/21": "table_21.json",
+        "http://localhost:3000/api/card/3": "card_3.json",
+    }
 
 
 @pytest.fixture
@@ -124,16 +163,24 @@ def test_pipeline(pytestconfig, tmp_path):
 
 
 @freeze_time(FROZEN_TIME)
-def test_mode_ingest_success(pytestconfig, tmp_path, test_pipeline, mock_datahub_graph):
+def test_mode_ingest_success(
+    pytestconfig, tmp_path, test_pipeline, mock_datahub_graph, default_json_response_map
+):
     with patch(
         "datahub.ingestion.source.metabase.requests.session",
-        side_effect=mocked_requests_sucess,
+        side_effect=MockResponse.build_mocked_requests_sucess(
+            default_json_response_map
+        ),
     ), patch(
         "datahub.ingestion.source.metabase.requests.post",
-        side_effect=mocked_requests_session_post,
+        side_effect=MockResponse.build_mocked_requests_session_post(
+            default_json_response_map
+        ),
     ), patch(
         "datahub.ingestion.source.metabase.requests.delete",
-        side_effect=mocked_requests_session_delete,
+        side_effect=MockResponse.build_mocked_requests_session_delete(
+            default_json_response_map
+        ),
     ), patch(
         "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
         mock_datahub_graph,
@@ -153,16 +200,21 @@ def test_mode_ingest_success(pytestconfig, tmp_path, test_pipeline, mock_datahub
 
 
 @freeze_time(FROZEN_TIME)
-def test_stateful_ingestion(test_pipeline, mock_datahub_graph):
+def test_stateful_ingestion(
+    test_pipeline, mock_datahub_graph, default_json_response_map
+):
+    json_response_map = default_json_response_map
     with patch(
         "datahub.ingestion.source.metabase.requests.session",
-        side_effect=mocked_requests_sucess,
+        side_effect=MockResponse.build_mocked_requests_sucess(json_response_map),
     ), patch(
         "datahub.ingestion.source.metabase.requests.post",
-        side_effect=mocked_requests_session_post,
+        side_effect=MockResponse.build_mocked_requests_session_post(json_response_map),
     ), patch(
         "datahub.ingestion.source.metabase.requests.delete",
-        side_effect=mocked_requests_session_delete,
+        side_effect=MockResponse.build_mocked_requests_session_delete(
+            json_response_map
+        ),
     ), patch(
         "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
         mock_datahub_graph,
@@ -176,10 +228,10 @@ def test_stateful_ingestion(test_pipeline, mock_datahub_graph):
     assert checkpoint1.state
 
     # Mock the removal of one of the dashboards
-    JSON_RESPONSE_MAP[
+    json_response_map[
        "http://localhost:3000/api/collection/root/items?models=dashboard"
     ] = "collection_dashboards_deleted_item.json"
-    JSON_RESPONSE_MAP[
+    json_response_map[
        "http://localhost:3000/api/collection/150/items?models=dashboard"
     ] = "collection_dashboards_deleted_item.json"
 
@@ -208,16 +260,22 @@ def test_stateful_ingestion(test_pipeline, mock_datahub_graph):
 
 
 @freeze_time(FROZEN_TIME)
-def test_mode_ingest_failure(pytestconfig, tmp_path):
+def test_mode_ingest_failure(pytestconfig, tmp_path, default_json_response_map):
     with patch(
         "datahub.ingestion.source.metabase.requests.session",
-        side_effect=mocked_requests_failure,
+        side_effect=MockResponse.build_mocked_requests_failure(
+            default_json_response_map
+        ),
     ), patch(
         "datahub.ingestion.source.metabase.requests.post",
-        side_effect=mocked_requests_session_post,
+        side_effect=MockResponse.build_mocked_requests_session_post(
+            default_json_response_map
+        ),
     ), patch(
         "datahub.ingestion.source.metabase.requests.delete",
-        side_effect=mocked_requests_session_delete,
+        side_effect=MockResponse.build_mocked_requests_session_delete(
+            default_json_response_map
+        ),
     ):
         pipeline = Pipeline.create(
             {
@@ -245,3 +303,31 @@
     assert exec_error.args[0] == "Source reported errors"
     assert len(exec_error.args[1].failures) == 1
     assert list(exec_error.args[1].failures.keys())[0] == "metabase-dashboard"
+
+
+def test_strip_template_expressions():
+    query_with_variables = (
+        "SELECT count(*) FROM products WHERE category = {{category}}",
+        "SELECT count(*) FROM products WHERE category = 1",
+    )
+    query_with_optional_clause = (
+        "SELECT count(*) FROM products [[WHERE category = {{category}}]]",
+        "SELECT count(*) FROM products  ",
+    )
+    query_with_dashboard_filters = (
+        "SELECT count(*) FROM products WHERE {{Filter1}} AND {{Filter2}}",
+        "SELECT count(*) FROM products WHERE 1 AND 1",
+    )
+
+    assert (
+        MetabaseSource.strip_template_expressions(query_with_variables[0])
+        == query_with_variables[1]
+    )
+    assert (
+        MetabaseSource.strip_template_expressions(query_with_optional_clause[0])
+        == query_with_optional_clause[1]
+    )
+    assert (
+        MetabaseSource.strip_template_expressions(query_with_dashboard_filters[0])
+        == query_with_dashboard_filters[1]
+    )
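For illustration only, not part of the patch above: a minimal standalone sketch of the template stripping performed by strip_template_expressions. The two re.sub patterns mirror the new method; the sample query is made up.

    import re

    # A made-up Metabase native query using both template constructs:
    # an optional [[ ... ]] clause and {{ ... }} filter placeholders.
    raw_query = (
        "SELECT category, count(*) FROM products "
        "WHERE price > {{min_price}} [[AND vendor = {{vendor}}]] "
        "GROUP BY category"
    )

    # Drop optional [[ ... ]] clauses first, then replace the remaining
    # {{ ... }} placeholders with a literal 1 so the result is plain,
    # parseable SQL for lineage extraction.
    stripped = re.sub(r"\[\[.+?\]\]", r" ", raw_query)
    stripped = re.sub(r"\{\{.+?\}\}", r"1", stripped)

    print(stripped)
    # SELECT category, count(*) FROM products WHERE price > 1   GROUP BY category
    # (extra whitespace remains where the optional clause was dropped)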