feat(ingest/superset): initial support for superset datasets #11972

Merged · 17 commits · Dec 6, 2024
Changes from 2 commits
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
@@ -85,6 +85,7 @@ def __init__(self, ctx: PipelineContext, config: PresetConfig):
super().__init__(ctx, config)
self.config = config
self.report = StaleEntityRemovalSourceReport()
self.platform = "preset"

def login(self):
try:
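The one-line change above routes the Preset source's inherited Superset emit paths to the preset platform. A minimal sketch of the effect (the dashboard id "42" is a made-up example):

from datahub.emitter.mce_builder import make_dashboard_urn

# With self.platform = "preset", inherited URN construction now yields:
make_dashboard_urn(platform="preset", name="42")
# -> "urn:li:dashboard:(preset,42)"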
231 changes: 217 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -1,7 +1,7 @@
import json
import logging
from functools import lru_cache
from typing import Dict, Iterable, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Type, Union

import dateutil.parser as dp
import requests
@@ -16,7 +16,9 @@
from datahub.emitter.mce_builder import (
make_chart_urn,
make_dashboard_urn,
make_data_platform_urn,
make_dataset_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
)
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
@@ -47,16 +49,33 @@
AuditStamp,
ChangeAuditStamps,
Status,
TimeStamp,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
ChartSnapshot,
DashboardSnapshot,
DatasetSnapshot,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
ArrayType,
BooleanType,
BytesType,
MySqlDDL,
NullType,
NumberType,
RecordType,
SchemaField,
SchemaFieldDataType,
SchemaMetadata,
StringType,
TimeType,
)
from datahub.metadata.schema_classes import (
ChartInfoClass,
ChartTypeClass,
DashboardInfoClass,
DatasetPropertiesClass,
)
from datahub.utilities import config_clean
from datahub.utilities.registries.domain_registry import DomainRegistry
@@ -82,6 +101,61 @@
"box_plot": ChartTypeClass.BAR,
}

SUPERSET_FIELD_TYPE_MAPPINGS: Dict[
str,
Type[
Union[
ArrayType,
BytesType,
BooleanType,
NumberType,
RecordType,
StringType,
TimeType,
NullType,
]
],
] = {
"BYTES": BytesType,
"BOOL": BooleanType,
"BOOLEAN": BooleanType,
"DOUBLE": NumberType,
"DOUBLE PRECISION": NumberType,
"DECIMAL": NumberType,
"NUMERIC": NumberType,
"BIGNUMERIC": NumberType,
"BIGDECIMAL": NumberType,
"FLOAT64": NumberType,
"INT": NumberType,
"INT64": NumberType,
"SMALLINT": NumberType,
"INTEGER": NumberType,
"BIGINT": NumberType,
"TINYINT": NumberType,
"BYTEINT": NumberType,
"STRING": StringType,
"TIME": TimeType,
"TIMESTAMP": TimeType,
"DATE": TimeType,
"DATETIME": TimeType,
"GEOGRAPHY": NullType,
"JSON": NullType,
"INTERVAL": NullType,
"ARRAY": ArrayType,
"STRUCT": RecordType,
"CHARACTER VARYING": StringType,
"CHARACTER": StringType,
"CHAR": StringType,
"TIMESTAMP WITHOUT TIME ZONE": TimeType,
"REAL": NumberType,
"VARCHAR": StringType,
"TIMESTAMPTZ": TimeType,
"GEOMETRY": NullType,
"HLLSKETCH": NullType,
"TIMETZ": TimeType,
"VARBYTE": StringType,
}
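A hedged sketch of how this mapping is consumed (it mirrors gen_schema_fields further down; "VARCHAR2" is a made-up type used only to show the fallback):

data_type = SUPERSET_FIELD_TYPE_MAPPINGS.get("VARCHAR")  # StringType
missing = SUPERSET_FIELD_TYPE_MAPPINGS.get("VARCHAR2")  # None
field_type = SchemaFieldDataType(data_type() if data_type else NullType())
# Unmapped native types degrade to NullType rather than failing ingestion.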

platform_without_databases = ["druid"]


Expand All @@ -105,8 +179,8 @@ class SupersetConfig(
password: Optional[str] = Field(default=None, description="Superset password.")
api_key: Optional[str] = Field(default=None, description="Preset.io API key.")
api_secret: Optional[str] = Field(default=None, description="Preset.io API secret.")
manager_uri: str = Field(
default="https://api.app.preset.io", description="Preset.io API URL"
manager_uri: Optional[str] = Field(
default=None, description="Preset.io API URL."
)
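Because manager_uri is now optional, a plain Superset recipe needs no Preset-specific fields. A hypothetical configuration (all values are placeholders):

config = SupersetConfig(
connect_uri="http://localhost:8088",
username="admin",
password="admin",
)
assert config.manager_uri is None  # only Preset ingestion supplies this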
# Configuration for stateful ingestion
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
@@ -250,11 +324,18 @@ def get_platform_from_database_id(self, database_id):
return platform_name

@lru_cache(maxsize=None)
def get_datasource_urn_from_id(self, datasource_id):
def get_dataset_info(self, dataset_id: int) -> dict:
dataset_response = self.session.get(
f"{self.config.connect_uri}/api/v1/dataset/{datasource_id}"
).json()

f"{self.config.connect_uri}/api/v1/dataset/{dataset_id}",
)
if dataset_response.status_code != 200:
logger.warning(f"Failed to get dataset info: {dataset_response.text}")
dataset_response.raise_for_status()
return dataset_response.json()
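For orientation, a sketch of the response shape this helper returns, abridged to the keys the methods below actually read (values are illustrative):

# GET {connect_uri}/api/v1/dataset/{id}
# {
#   "result": {
#     "schema": "public",
#     "table_name": "orders",
#     "database": {"id": 1, "database_name": "examples"},
#     "columns": [...],
#     "metrics": [...],
#     "owners": [...]
#   }
# }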

def get_datasource_urn_from_id(
self, dataset_response: dict, platform_instance: str
) -> str:
schema_name = dataset_response.get("result", {}).get("schema")
table_name = dataset_response.get("result", {}).get("table_name")
database_id = dataset_response.get("result", {}).get("database", {}).get("id")
@@ -277,15 +358,17 @@ def get_datasource_urn_from_id(self, datasource_id):

if database_id and table_name:
return make_dataset_urn(
platform=platform,
platform=platform_instance,
name=".".join(
name for name in [database_name, schema_name, table_name] if name
),
env=self.config.env,
)
return None
raise ValueError("Could not construct dataset URN")
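Illustrative only: despite its name, the platform_instance parameter is passed straight through as the URN's platform (both call sites pass self.platform). With the sample payload sketched above, the happy path resolves to:

# make_dataset_urn(platform="superset", name="examples.public.orders", env="PROD")
# -> "urn:li:dataset:(urn:li:dataPlatform:superset,examples.public.orders,PROD)"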

def construct_dashboard_from_api_data(self, dashboard_data):
def construct_dashboard_from_api_data(
self, dashboard_data: dict
) -> DashboardSnapshot:
dashboard_urn = make_dashboard_urn(
platform=self.platform,
name=dashboard_data["id"],
@@ -340,7 +423,7 @@ def construct_dashboard_from_api_data(self, dashboard_data):
}

if dashboard_data.get("certified_by"):
custom_properties["CertifiedBy"] = dashboard_data.get("certified_by")
custom_properties["CertifiedBy"] = dashboard_data.get("certified_by", "")
custom_properties["CertificationDetails"] = str(
dashboard_data.get("certification_details")
)
@@ -389,7 +472,7 @@ def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
entity_urn=dashboard_snapshot.urn,
)

def construct_chart_from_chart_data(self, chart_data):
def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
chart_urn = make_chart_urn(
platform=self.platform,
name=chart_data["id"],
@@ -415,9 +498,12 @@ def construct_chart_from_chart_data(self, chart_data):
chart_url = f"{self.config.display_uri}{chart_data.get('url', '')}"

datasource_id = chart_data.get("datasource_id")
datasource_urn = self.get_datasource_urn_from_id(datasource_id)
dataset_response = self.get_dataset_info(datasource_id)
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)

params = json.loads(chart_data.get("params"))
params = json.loads(chart_data.get("params", "{}"))
metrics = [
get_metric_name(metric)
for metric in (params.get("metrics", []) or [params.get("metric")])
@@ -494,9 +580,126 @@ def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
entity_urn=chart_snapshot.urn,
)

def gen_schema_fields(self, column_data: List[Dict[str, str]]) -> List[SchemaField]:
schema_fields: List[SchemaField] = []
for col in column_data:
data_type = SUPERSET_FIELD_TYPE_MAPPINGS.get(col.get("type", ""))
field = SchemaField(
fieldPath=col.get("column_name", ""),
type=SchemaFieldDataType(data_type() if data_type else NullType()),
nativeDataType="",
description=col.get("column_name", ""),
nullable=True,
)
schema_fields.append(field)
return schema_fields
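A hypothetical input/output pair for gen_schema_fields, to make the field shape concrete:

# in:  {"column_name": "order_id", "type": "BIGINT"}
# out: SchemaField(fieldPath="order_id",
#                  type=SchemaFieldDataType(NumberType()),
#                  nativeDataType="",
#                  description="order_id",
#                  nullable=True)
# Note: the raw Superset type string is not preserved in nativeDataType, and
# the column name doubles as the description in this initial version.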

def gen_schema_metadata(
self,
dataset_response: dict,
) -> SchemaMetadata:
dataset_response = dataset_response.get("result", {})
column_data = dataset_response.get("columns", [])
schema_metadata = SchemaMetadata(
schemaName=dataset_response.get("table_name", ""),
platform=make_data_platform_urn(self.platform),
version=0,
hash="",
platformSchema=MySqlDDL(tableSchema=""),
fields=self.gen_schema_fields(column_data),
)
return schema_metadata

def gen_dataset_urn(self, datahub_dataset_name: str) -> str:
return make_dataset_urn_with_platform_instance(
platform=self.platform,
name=datahub_dataset_name,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
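A hedged example of the resulting URN, assuming a hypothetical platform_instance of "prod" and the default env "PROD":

# gen_dataset_urn("examples.public.orders")
# -> "urn:li:dataset:(urn:li:dataPlatform:superset,prod.examples.public.orders,PROD)"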

def construct_dataset_from_dataset_data(
self, dataset_data: dict
) -> DatasetSnapshot:
dataset_response = self.get_dataset_info(dataset_data.get("id"))
datasource_urn = self.get_datasource_urn_from_id(
dataset_response, self.platform
)
# Check API format for dataset
modified_ts = int(
dp.parse(dataset_data.get("changed_on_utc", "now")).timestamp() * 1000
)
table_name = dataset_data.get("table_name", "")
dataset_url = f"{self.config.display_uri}{dataset_data.get('explore_url', '')}"
metrics = [
metric.get("metric_name")
for metric in (dataset_response.get("result", {}).get("metrics", []))
]

owners = [
owner.get("first_name") + "_" + str(owner.get("id"))
for owner in (dataset_response.get("result", {}).get("owners", []))
]
custom_properties = {
"Metrics": ", ".join(metrics),
"Owners": ", ".join(owners),
}

dataset_info = DatasetPropertiesClass(
name=table_name,
description="",
lastModified=TimeStamp(time=modified_ts),
externalUrl=dataset_url,
customProperties=custom_properties,
)
aspects_items: List[Any] = []
aspects_items.extend(
[
self.gen_schema_metadata(dataset_response),
dataset_info,
]
)

dataset_snapshot = DatasetSnapshot(
urn=datasource_urn,
aspects=aspects_items,
)
return dataset_snapshot

def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]:
current_dataset_page = 0
total_datasets = PAGE_SIZE
while current_dataset_page * PAGE_SIZE <= total_datasets:
full_dataset_response = self.session.get(
f"{self.config.connect_uri}/api/v1/dataset/",
params=f"q=(page:{current_dataset_page},page_size:{PAGE_SIZE})",
)
if full_dataset_response.status_code != 200:
logger.warning(
f"Failed to get dataset data: {full_dataset_response.text}"
)
full_dataset_response.raise_for_status()

current_dataset_page += 1

payload = full_dataset_response.json()
total_datasets = payload["count"]

for dataset_data in payload["result"]:
dataset_snapshot = self.construct_dataset_from_dataset_data(
dataset_data
)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce)
yield from self._get_domain_wu(
title=dataset_data.get("table_name", ""),
entity_urn=dataset_snapshot.urn,
)
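The loop above pages through Superset's Rison-encoded list endpoint, re-reading the total count from each response. An illustrative request sequence, assuming PAGE_SIZE is 25 (its value is defined elsewhere in this module) and a hypothetical instance with 60 datasets:

# GET /api/v1/dataset/?q=(page:0,page_size:25)  -> count=60, results 1-25
# GET /api/v1/dataset/?q=(page:1,page_size:25)  -> count=60, results 26-50
# GET /api/v1/dataset/?q=(page:2,page_size:25)  -> count=60, results 51-60
# The while condition (current_dataset_page * PAGE_SIZE <= count) then fails
# at 75 > 60, ending pagination.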

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield from self.emit_dashboard_mces()
yield from self.emit_chart_mces()
yield from self.emit_dataset_mces()

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [