From 46aa962bad67e0523b2707e1d535f6bf53c227f0 Mon Sep 17 00:00:00 2001 From: hwmarkcheng <94201005+hwmarkcheng@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:48:00 -0500 Subject: [PATCH] feat(ingest/superset): initial support for superset datasets (#11972) --- .../src/datahub/ingestion/source/preset.py | 1 + .../src/datahub/ingestion/source/superset.py | 280 ++++++++--- .../superset/golden_test_ingest.json | 2 + .../superset/golden_test_stateful_ingest.json | 476 ++++++++++++++++-- .../integration/superset/test_superset.py | 343 ++++++++++++- .../tests/unit/test_preset_source.py | 20 + 6 files changed, 1020 insertions(+), 102 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/preset.py b/metadata-ingestion/src/datahub/ingestion/source/preset.py index 6f53223e000f1..7b0bc89648c52 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/preset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/preset.py @@ -85,6 +85,7 @@ def __init__(self, ctx: PipelineContext, config: PresetConfig): super().__init__(ctx, config) self.config = config self.report = StaleEntityRemovalSourceReport() + self.platform = "preset" def login(self): try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index 5ce33da5c55fa..1da233bf0b22a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -1,10 +1,12 @@ import json import logging +from datetime import datetime from functools import lru_cache -from typing import Dict, Iterable, List, Optional +from typing import Any, Dict, Iterable, List, Optional import dateutil.parser as dp import requests +from pydantic import BaseModel from pydantic.class_validators import root_validator, validator from pydantic.fields import Field @@ -16,7 +18,9 @@ from datahub.emitter.mce_builder import ( make_chart_urn, make_dashboard_urn, + 
make_data_platform_urn, make_dataset_urn, + make_dataset_urn_with_platform_instance, make_domain_urn, ) from datahub.emitter.mcp_builder import add_domain_to_entity_wu @@ -31,6 +35,7 @@ ) from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.sql.sql_types import resolve_sql_type from datahub.ingestion.source.sql.sqlalchemy_uri_mapper import ( get_platform_from_sqlalchemy_uri, ) @@ -47,16 +52,26 @@ AuditStamp, ChangeAuditStamps, Status, + TimeStamp, ) from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( ChartSnapshot, DashboardSnapshot, + DatasetSnapshot, ) from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent +from datahub.metadata.com.linkedin.pegasus2avro.schema import ( + MySqlDDL, + NullType, + SchemaField, + SchemaFieldDataType, + SchemaMetadata, +) from datahub.metadata.schema_classes import ( ChartInfoClass, ChartTypeClass, DashboardInfoClass, + DatasetPropertiesClass, ) from datahub.utilities import config_clean from datahub.utilities.registries.domain_registry import DomainRegistry @@ -82,9 +97,29 @@ "box_plot": ChartTypeClass.BAR, } + platform_without_databases = ["druid"] +class SupersetDataset(BaseModel): + id: int + table_name: str + changed_on_utc: Optional[str] = None + explore_url: Optional[str] = "" + + @property + def modified_dt(self) -> Optional[datetime]: + if self.changed_on_utc: + return dp.parse(self.changed_on_utc) + return None + + @property + def modified_ts(self) -> Optional[int]: + if self.modified_dt: + return int(self.modified_dt.timestamp() * 1000) + return None + + class SupersetConfig( StatefulIngestionConfigBase, EnvConfigMixin, PlatformInstanceConfigMixin ): @@ -103,15 +138,17 @@ class SupersetConfig( ) username: Optional[str] = Field(default=None, description="Superset username.") password: Optional[str] = Field(default=None, description="Superset password.") - api_key: 
Optional[str] = Field(default=None, description="Preset.io API key.") - api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.") - manager_uri: str = Field( - default="https://api.app.preset.io", description="Preset.io API URL" - ) # Configuration for stateful ingestion stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field( default=None, description="Superset Stateful Ingestion Config." ) + ingest_dashboards: bool = Field( + default=True, description="Enable to ingest dashboards." + ) + ingest_charts: bool = Field(default=True, description="Enable to ingest charts.") + ingest_datasets: bool = Field( + default=False, description="Enable to ingest datasets." + ) provider: str = Field(default="db", description="Superset provider.") options: Dict = Field(default={}, description="") @@ -123,6 +160,10 @@ class SupersetConfig( description="Can be used to change mapping for database names in superset to what you have in datahub", ) + class Config: + # This is required to allow preset configs to get parsed + extra = "allow" + @validator("connect_uri", "display_uri") def remove_trailing_slash(cls, v): return config_clean.remove_trailing_slashes(v) @@ -229,6 +270,28 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source: config = SupersetConfig.parse_obj(config_dict) return cls(ctx, config) + def paginate_entity_api_results(self, entity_type, page_size=100): + current_page = 0 + total_items = page_size + + while current_page * page_size < total_items: + response = self.session.get( + f"{self.config.connect_uri}/api/v1/{entity_type}/", + params={"q": f"(page:{current_page},page_size:{page_size})"}, + ) + + if response.status_code != 200: + logger.warning(f"Failed to get {entity_type} data: {response.text}") + + payload = response.json() + # Update total_items with the actual count from the response + total_items = payload.get("count", total_items) + # Yield each item in the result, this gets passed into the 
construct functions + for item in payload.get("result", []): + yield item + + current_page += 1 + @lru_cache(maxsize=None) def get_platform_from_database_id(self, database_id): database_response = self.session.get( @@ -250,11 +313,18 @@ def get_platform_from_database_id(self, database_id): return platform_name @lru_cache(maxsize=None) - def get_datasource_urn_from_id(self, datasource_id): + def get_dataset_info(self, dataset_id: int) -> dict: dataset_response = self.session.get( - f"{self.config.connect_uri}/api/v1/dataset/{datasource_id}" - ).json() - + f"{self.config.connect_uri}/api/v1/dataset/{dataset_id}", + ) + if dataset_response.status_code != 200: + logger.warning(f"Failed to get dataset info: {dataset_response.text}") + dataset_response.raise_for_status() + return dataset_response.json() + + def get_datasource_urn_from_id( + self, dataset_response: dict, platform_instance: str + ) -> str: schema_name = dataset_response.get("result", {}).get("schema") table_name = dataset_response.get("result", {}).get("table_name") database_id = dataset_response.get("result", {}).get("database", {}).get("id") @@ -283,9 +353,11 @@ def get_datasource_urn_from_id(self, datasource_id): ), env=self.config.env, ) - return None + raise ValueError("Could not construct dataset URN") - def construct_dashboard_from_api_data(self, dashboard_data): + def construct_dashboard_from_api_data( + self, dashboard_data: dict + ) -> DashboardSnapshot: dashboard_urn = make_dashboard_urn( platform=self.platform, name=dashboard_data["id"], @@ -340,7 +412,7 @@ def construct_dashboard_from_api_data(self, dashboard_data): } if dashboard_data.get("certified_by"): - custom_properties["CertifiedBy"] = dashboard_data.get("certified_by") + custom_properties["CertifiedBy"] = dashboard_data.get("certified_by", "") custom_properties["CertificationDetails"] = str( dashboard_data.get("certification_details") ) @@ -358,38 +430,25 @@ def construct_dashboard_from_api_data(self, dashboard_data): return 
dashboard_snapshot def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]: - current_dashboard_page = 0 - # we will set total dashboards to the actual number after we get the response - total_dashboards = PAGE_SIZE - - while current_dashboard_page * PAGE_SIZE <= total_dashboards: - dashboard_response = self.session.get( - f"{self.config.connect_uri}/api/v1/dashboard/", - params=f"q=(page:{current_dashboard_page},page_size:{PAGE_SIZE})", - ) - if dashboard_response.status_code != 200: - logger.warning( - f"Failed to get dashboard data: {dashboard_response.text}" - ) - dashboard_response.raise_for_status() - - payload = dashboard_response.json() - total_dashboards = payload.get("count") or 0 - - current_dashboard_page += 1 - - for dashboard_data in payload["result"]: + for dashboard_data in self.paginate_entity_api_results("dashboard", PAGE_SIZE): + try: dashboard_snapshot = self.construct_dashboard_from_api_data( dashboard_data ) - mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot) - yield MetadataWorkUnit(id=dashboard_snapshot.urn, mce=mce) - yield from self._get_domain_wu( - title=dashboard_data.get("dashboard_title", ""), - entity_urn=dashboard_snapshot.urn, + except Exception as e: + self.report.warning( + f"Failed to construct dashboard snapshot. Dashboard name: {dashboard_data.get('dashboard_title')}. 
Error: \n{e}" ) + continue + # Emit the dashboard + mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot) + yield MetadataWorkUnit(id=dashboard_snapshot.urn, mce=mce) + yield from self._get_domain_wu( + title=dashboard_data.get("dashboard_title", ""), + entity_urn=dashboard_snapshot.urn, + ) - def construct_chart_from_chart_data(self, chart_data): + def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: chart_urn = make_chart_urn( platform=self.platform, name=chart_data["id"], @@ -415,9 +474,12 @@ def construct_chart_from_chart_data(self, chart_data): chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}" datasource_id = chart_data.get("datasource_id") - datasource_urn = self.get_datasource_urn_from_id(datasource_id) + dataset_response = self.get_dataset_info(datasource_id) + datasource_urn = self.get_datasource_urn_from_id( + dataset_response, self.platform + ) - params = json.loads(chart_data.get("params")) + params = json.loads(chart_data.get("params", "{}")) metrics = [ get_metric_name(metric) for metric in (params.get("metrics", []) or [params.get("metric")]) @@ -467,36 +529,124 @@ def construct_chart_from_chart_data(self, chart_data): return chart_snapshot def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]: - current_chart_page = 0 - # we will set total charts to the actual number after we get the response - total_charts = PAGE_SIZE - - while current_chart_page * PAGE_SIZE <= total_charts: - chart_response = self.session.get( - f"{self.config.connect_uri}/api/v1/chart/", - params=f"q=(page:{current_chart_page},page_size:{PAGE_SIZE})", + for chart_data in self.paginate_entity_api_results("chart", PAGE_SIZE): + try: + chart_snapshot = self.construct_chart_from_chart_data(chart_data) + + mce = MetadataChangeEvent(proposedSnapshot=chart_snapshot) + except Exception as e: + self.report.warning( + f"Failed to construct chart snapshot. Chart name: {chart_data.get('table_name')}. 
Error: \n{e}" + ) + continue + # Emit the chart + yield MetadataWorkUnit(id=chart_snapshot.urn, mce=mce) + yield from self._get_domain_wu( + title=chart_data.get("slice_name", ""), + entity_urn=chart_snapshot.urn, ) - if chart_response.status_code != 200: - logger.warning(f"Failed to get chart data: {chart_response.text}") - chart_response.raise_for_status() - current_chart_page += 1 + def gen_schema_fields(self, column_data: List[Dict[str, str]]) -> List[SchemaField]: + schema_fields: List[SchemaField] = [] + for col in column_data: + col_type = (col.get("type") or "").lower() + data_type = resolve_sql_type(col_type) + if data_type is None: + data_type = NullType() + + field = SchemaField( + fieldPath=col.get("column_name", ""), + type=SchemaFieldDataType(data_type), + nativeDataType="", + description=col.get("column_name", ""), + nullable=True, + ) + schema_fields.append(field) + return schema_fields + + def gen_schema_metadata( + self, + dataset_response: dict, + ) -> SchemaMetadata: + dataset_response = dataset_response.get("result", {}) + column_data = dataset_response.get("columns", []) + schema_metadata = SchemaMetadata( + schemaName=dataset_response.get("table_name", ""), + platform=make_data_platform_urn(self.platform), + version=0, + hash="", + platformSchema=MySqlDDL(tableSchema=""), + fields=self.gen_schema_fields(column_data), + ) + return schema_metadata - payload = chart_response.json() - total_charts = payload["count"] - for chart_data in payload["result"]: - chart_snapshot = self.construct_chart_from_chart_data(chart_data) + def gen_dataset_urn(self, datahub_dataset_name: str) -> str: + return make_dataset_urn_with_platform_instance( + platform=self.platform, + name=datahub_dataset_name, + platform_instance=self.config.platform_instance, + env=self.config.env, + ) - mce = MetadataChangeEvent(proposedSnapshot=chart_snapshot) - yield MetadataWorkUnit(id=chart_snapshot.urn, mce=mce) - yield from self._get_domain_wu( - 
title=chart_data.get("slice_name", ""), - entity_urn=chart_snapshot.urn, + def construct_dataset_from_dataset_data( + self, dataset_data: dict + ) -> DatasetSnapshot: + dataset_response = self.get_dataset_info(dataset_data.get("id")) + dataset = SupersetDataset(**dataset_response["result"]) + datasource_urn = self.get_datasource_urn_from_id( + dataset_response, self.platform + ) + + dataset_url = f"{self.config.display_uri}{dataset.explore_url or ''}" + + dataset_info = DatasetPropertiesClass( + name=dataset.table_name, + description="", + lastModified=TimeStamp(time=dataset.modified_ts) + if dataset.modified_ts + else None, + externalUrl=dataset_url, + ) + aspects_items: List[Any] = [] + aspects_items.extend( + [ + self.gen_schema_metadata(dataset_response), + dataset_info, + ] + ) + + dataset_snapshot = DatasetSnapshot( + urn=datasource_urn, + aspects=aspects_items, + ) + return dataset_snapshot + + def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]: + for dataset_data in self.paginate_entity_api_results("dataset", PAGE_SIZE): + try: + dataset_snapshot = self.construct_dataset_from_dataset_data( + dataset_data ) + mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) + except Exception as e: + self.report.warning( + f"Failed to construct dataset snapshot. Dataset name: {dataset_data.get('table_name')}. 
Error: \n{e}" + ) + continue + # Emit the dataset + yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce) + yield from self._get_domain_wu( + title=dataset_data.get("table_name", ""), + entity_urn=dataset_snapshot.urn, + ) def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - yield from self.emit_dashboard_mces() - yield from self.emit_chart_mces() + if self.config.ingest_dashboards: + yield from self.emit_dashboard_mces() + if self.config.ingest_charts: + yield from self.emit_chart_mces() + if self.config.ingest_datasets: + yield from self.emit_dataset_mces() def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: return [ diff --git a/metadata-ingestion/tests/integration/superset/golden_test_ingest.json b/metadata-ingestion/tests/integration/superset/golden_test_ingest.json index 767b85a72b975..4801af9465f2c 100644 --- a/metadata-ingestion/tests/integration/superset/golden_test_ingest.json +++ b/metadata-ingestion/tests/integration/superset/golden_test_ingest.json @@ -26,6 +26,7 @@ "urn:li:chart:(superset,11)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 0, @@ -73,6 +74,7 @@ "urn:li:chart:(superset,13)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 0, diff --git a/metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json b/metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json index 0f20799017979..ac6a3b6942a32 100644 --- a/metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json +++ b/metadata-ingestion/tests/integration/superset/golden_test_stateful_ingest.json @@ -26,6 +26,7 @@ "urn:li:chart:(superset,11)" ], "datasets": [], + "dashboards": [], "lastModified": { "created": { "time": 0, @@ -44,7 +45,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", 
"pipelineName": "test_pipeline" } @@ -92,7 +93,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -140,7 +141,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -188,47 +189,413 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } }, { "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot": { - "urn": "urn:li:chart:(superset,13)", + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", "aspects": [ { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] } }, { - "com.linkedin.pegasus2avro.chart.ChartInfo": { - "customProperties": { - "Metrics": "", - "Filters": "", - "Dimensions": "" + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": 
"superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" }, - "title": "test_chart_title_4", + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, "lastModified": { - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 1586847600000, - "actor": "urn:li:corpuser:test_username_2" + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" } }, - "chartUrl": 
"mock://mock-domain.superset.com/explore/test_chart_url_13", - "inputs": [ - { - "string": "urn:li:dataset:(urn:li:dataPlatform:external,test_database_name.test_schema_name.test_table_name,PROD)" + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" } - ], - "type": "HISTOGRAM" + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test 
Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test 
Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "proposedSnapshot": { + 
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "Test Table 2", + "platform": "urn:li:dataPlatform:superset", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": {}, + "externalUrl": "mock://mock-domain.superset.com", + "name": "Test Table 2", + "description": "", + "tags": [] } } ] @@ -236,7 +603,41 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema2.Test Table 2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema1.Test Table 1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } @@ -253,7 +654,24 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - 
"runId": "superset-2020_04_14-07_00_00", + "runId": "superset-2020_04_14-07_00_00-83v7ts", + "lastRunId": "no-run-id-provided", + "pipelineName": "test_pipeline" + } +}, +{ + "entityType": "chart", + "entityUrn": "urn:li:chart:(superset,13)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": true + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "superset-2020_04_14-07_00_00-83v7ts", "lastRunId": "no-run-id-provided", "pipelineName": "test_pipeline" } diff --git a/metadata-ingestion/tests/integration/superset/test_superset.py b/metadata-ingestion/tests/integration/superset/test_superset.py index b3b5982016146..e8251e54a1f85 100644 --- a/metadata-ingestion/tests/integration/superset/test_superset.py +++ b/metadata-ingestion/tests/integration/superset/test_superset.py @@ -133,6 +133,222 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None: ], }, }, + "mock://mock-domain.superset.com/api/v1/dataset/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 215, + "description_columns": {}, + "ids": [1, 2, 3], + "result": [ + { + "changed_by": { + "first_name": "Test", + "id": 1, + "last_name": "User1", + "username": "test_username_1", + }, + "changed_by_name": "test_username_1", + "changed_on_delta_humanized": "10 months ago", + "changed_on_utc": "2024-01-05T21:10:15.650819+0000", + "database": {"database_name": "test_database1", "id": 1}, + "datasource_type": "table", + "default_endpoint": None, + "description": None, + "explore_url": "/explore/?datasource_type=table&datasource_id=1", + "extra": None, + "id": 1, + "kind": "virtual", + "owners": [ + { + "first_name": "Test", + "id": 1, + "last_name": "Owner1", + "username": "test_username_1", + } + ], + "schema": "test_schema1", + "sql": "SELECT * FROM test_table1", + "table_name": "Test Table 1", + }, + { + "changed_by": { + "first_name": "Test", + "id": 2, + "last_name": "User2", + "username": "test_username_2", + }, 
+ "changed_by_name": "test_username_2", + "changed_on_delta_humanized": "9 months ago", + "changed_on_utc": "2024-02-10T15:30:20.123456+0000", + "database": {"database_name": "test_database2", "id": 2}, + "datasource_type": "table", + "default_endpoint": None, + "description": "Sample description for dataset 2", + "explore_url": "/explore/?datasource_type=table&datasource_id=2", + "extra": None, + "id": 2, + "kind": "physical", + "owners": [ + { + "first_name": "Test", + "id": 2, + "last_name": "Owner2", + "username": "test_username_2", + } + ], + "schema": "test_schema2", + "sql": "SELECT * FROM test_table2", + "table_name": "Test Table 2", + }, + ], + }, + }, + "mock://mock-domain.superset.com/api/v1/dataset/1": { + "method": "GET", + "status_code": 200, + "json": { + "id": 1, + "result": { + "always_filter_main_dttm": False, + "cache_timeout": None, + "changed_by": {"first_name": "Test", "last_name": "User1"}, + "changed_on": "2024-01-05T21:10:15.650819+0000", + "changed_on_humanized": "10 months ago", + "created_by": {"first_name": "Test", "last_name": "User1"}, + "created_on": "2024-01-05T21:10:15.650819+0000", + "created_on_humanized": "10 months ago", + "currency_formats": {}, + "database": { + "backend": "postgresql", + "database_name": "test_database1", + "id": 1, + }, + "datasource_name": "Test Table 1", + "datasource_type": "table", + "default_endpoint": None, + "description": None, + "extra": None, + "fetch_values_predicate": None, + "filter_select_enabled": True, + "granularity_sqla": [ + ["created_at", "created_at"], + ["updated_at", "updated_at"], + ], + "id": 1, + "is_managed_externally": False, + "is_sqllab_view": False, + "kind": "virtual", + "main_dttm_col": None, + "metrics": [ + { + "changed_on": "2024-01-05T21:10:15.650819+0000", + "created_on": "2024-01-05T21:10:15.650819+0000", + "currency": None, + "d3format": None, + "description": None, + "expression": "count(*)", + "extra": None, + "id": 1, + "metric_name": "count", + "metric_type": 
None, + "rendered_expression": "count(*)", + "verbose_name": None, + "warning_text": None, + } + ], + "name": "Test Table 1", + "normalize_columns": True, + "offset": 0, + "owners": [{"first_name": "Test", "id": 1, "last_name": "Owner1"}], + "rendered_sql": "SELECT * FROM test_table1", + "schema": "test_schema1", + "select_star": "SELECT * FROM test_schema1.test_table1 LIMIT 100", + "sql": "SELECT * FROM test_table1", + "table_name": "Test Table 1", + "uid": "1__table", + "url": "/tablemodelview/edit/1", + "verbose_map": { + "__timestamp": "Time", + "id": "ID", + "name": "Name", + "created_at": "Created At", + "updated_at": "Updated At", + }, + }, + }, + }, + "mock://mock-domain.superset.com/api/v1/dataset/2": { + "method": "GET", + "status_code": 200, + "json": { + "id": 2, + "result": { + "always_filter_main_dttm": False, + "cache_timeout": None, + "changed_by": {"first_name": "Test", "last_name": "User2"}, + "changed_on": "2024-02-10T15:30:20.123456+0000", + "changed_on_humanized": "9 months ago", + "created_by": {"first_name": "Test", "last_name": "User2"}, + "created_on": "2024-02-10T15:30:20.123456+0000", + "created_on_humanized": "9 months ago", + "currency_formats": {}, + "database": { + "backend": "postgresql", + "database_name": "test_database1", + "id": 1, + }, + "datasource_name": "Test Table 2", + "datasource_type": "table", + "default_endpoint": None, + "description": "Sample description for dataset 2", + "extra": None, + "fetch_values_predicate": None, + "filter_select_enabled": True, + "granularity_sqla": [["date_column", "date_column"]], + "id": 2, + "is_managed_externally": False, + "is_sqllab_view": True, + "kind": "virtual", + "main_dttm_col": "date_column", + "metrics": [ + { + "changed_on": "2024-02-10T15:30:20.123456+0000", + "created_on": "2024-02-10T15:30:20.123456+0000", + "currency": None, + "d3format": None, + "description": None, + "expression": "sum(value)", + "extra": None, + "id": 2, + "metric_name": "total_value", + "metric_type": 
None, + "rendered_expression": "sum(value)", + "verbose_name": "Total Value", + "warning_text": None, + } + ], + "name": "Test Table 2", + "normalize_columns": True, + "offset": 0, + "owners": [{"first_name": "Test", "id": 2, "last_name": "Owner2"}], + "rendered_sql": "SELECT * FROM test_table2", + "schema": "test_schema2", + "select_star": "SELECT * FROM test_schema2.test_table2 LIMIT 100", + "sql": "SELECT * FROM test_table2", + "table_name": "Test Table 2", + "uid": "2__table", + "url": "/tablemodelview/edit/2", + "verbose_map": { + "__timestamp": "Time", + "id": "ID", + "name": "Name", + "value": "Value", + "date_column": "Date", + }, + }, + }, + }, "mock://mock-domain.superset.com/api/v1/dataset/20": { "method": "GET", "status_code": 200, @@ -147,6 +363,19 @@ def register_mock_api(request_mock: Any, override_data: dict = {}) -> None: }, }, }, + "mock://mock-domain.superset.com/api/v1/database/1": { + "method": "GET", + "status_code": 200, + "json": { + "id": 1, + "result": { + "configuration_method": "sqlalchemy_form", + "database_name": "test_database1", + "id": 1, + "sqlalchemy_uri": "postgresql://user:password@host:port/test_database1", + }, + }, + }, "mock://mock-domain.superset.com/api/v1/database/30": { "method": "GET", "status_code": 200, @@ -225,6 +454,8 @@ def test_superset_stateful_ingest( "username": "test_username", "password": "test_password", "provider": "db", + # Enable dataset ingestion + "ingest_datasets": True, # enable stateful ingestion "stateful_ingestion": { "enabled": True, @@ -244,7 +475,7 @@ def test_superset_stateful_ingest( "pipeline_name": "test_pipeline", } - dashboard_endpoint_override = { + asset_override = { "mock://mock-domain.superset.com/api/v1/dashboard/": { "method": "GET", "status_code": 200, @@ -276,6 +507,92 @@ def test_superset_stateful_ingest( ], }, }, + "mock://mock-domain.superset.com/api/v1/chart/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 3, + "result": [ + { + "id": "10", + "changed_by": { 
+ "username": "test_username_1", + }, + "changed_on_utc": "2020-04-14T07:00:00.000000+0000", + "slice_name": "test_chart_title_1", + "viz_type": "box_plot", + "url": "/explore/test_chart_url_10", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "11", + "changed_by": { + "username": "test_username_1", + }, + "changed_on_utc": "2020-04-14T07:00:00.000000+0000", + "slice_name": "test_chart_title_2", + "viz_type": "pie", + "url": "/explore/test_chart_url_11", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + { + "id": "12", + "changed_by": { + "username": "test_username_2", + }, + "changed_on_utc": "2020-04-14T07:00:00.000000+0000", + "slice_name": "test_chart_title_3", + "viz_type": "treemap", + "url": "/explore/test_chart_url_12", + "datasource_id": "20", + "params": '{"metrics": [], "adhoc_filters": []}', + }, + ], + }, + }, + "mock://mock-domain.superset.com/api/v1/dataset/": { + "method": "GET", + "status_code": 200, + "json": { + "count": 214, + "description_columns": {}, + "ids": [1, 2], + "result": [ + { + "changed_by": { + "first_name": "Test", + "id": 2, + "last_name": "User2", + "username": "test_username_2", + }, + "changed_by_name": "test_username_2", + "changed_on_delta_humanized": "9 months ago", + "changed_on_utc": "2024-02-10T15:30:20.123456+0000", + "database": {"database_name": "test_database1", "id": 1}, + "datasource_type": "table", + "default_endpoint": None, + "description": "Sample description for dataset 2", + "explore_url": "/explore/?datasource_type=table&datasource_id=2", + "extra": None, + "id": 2, + "kind": "physical", + "owners": [ + { + "first_name": "Test", + "id": 2, + "last_name": "Owner2", + "username": "test_username_2", + } + ], + "schema": "test_schema2", + "sql": "SELECT * FROM test_table2", + "table_name": "Test Table 2", + }, + ], + }, + }, } with patch( @@ -292,10 +609,8 @@ def test_superset_stateful_ingest( assert checkpoint1 assert 
checkpoint1.state - # Remove one dashboard from the superset config. - register_mock_api( - request_mock=requests_mock, override_data=dashboard_endpoint_override - ) + # Remove one dashboard, chart, dataset from the superset config. + register_mock_api(request_mock=requests_mock, override_data=asset_override) # Capture MCEs of second run to validate Status(removed=true) deleted_mces_path = f"{tmp_path}/superset_deleted_mces.json" @@ -313,15 +628,27 @@ def test_superset_stateful_ingest( # part of the second state state1 = checkpoint1.state state2 = checkpoint2.state - difference_urns = list( + dashboard_difference_urns = list( state1.get_urns_not_in(type="dashboard", other_checkpoint_state=state2) ) + chart_difference_urns = list( + state1.get_urns_not_in(type="chart", other_checkpoint_state=state2) + ) + dataset_difference_urns = list( + state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2) + ) - assert len(difference_urns) == 1 + assert len(dashboard_difference_urns) == 1 + assert len(chart_difference_urns) == 1 + assert len(dataset_difference_urns) == 1 urn1 = "urn:li:dashboard:(superset,2)" + urn2 = "urn:li:chart:(superset,13)" + urn3 = "urn:li:dataset:(urn:li:dataPlatform:postgres,test_database1.test_schema1.Test Table 1,PROD)" - assert urn1 in difference_urns + assert urn1 in dashboard_difference_urns + assert urn2 in chart_difference_urns + assert urn3 in dataset_difference_urns # Validate that all providers have committed successfully. 
validate_all_providers_have_committed_successfully( diff --git a/metadata-ingestion/tests/unit/test_preset_source.py b/metadata-ingestion/tests/unit/test_preset_source.py index d97db651f4c79..dc81f4c8284d5 100644 --- a/metadata-ingestion/tests/unit/test_preset_source.py +++ b/metadata-ingestion/tests/unit/test_preset_source.py @@ -20,3 +20,23 @@ def test_set_display_uri(): assert config.connect_uri == "" assert config.manager_uri == "https://api.app.preset.io" assert config.display_uri == display_uri + + +def test_preset_config_parsing(): + preset_config = { + "connect_uri": "https://preset.io", + "api_key": "dummy_api_key", + "api_secret": "dummy_api_secret", + "manager_uri": "https://api.app.preset.io", + } + + # Parse the dict into a PresetConfig; Preset-specific and regular Superset fields should both be populated + config = PresetConfig.parse_obj(preset_config) + + # Test Preset-specific fields + assert config.api_key == "dummy_api_key" + assert config.api_secret == "dummy_api_secret" + assert config.manager_uri == "https://api.app.preset.io" + + # Test that regular Superset fields are still parsed + assert config.connect_uri == "https://preset.io"