Skip to content

Commit

Permalink
feat(metabase): add stateful ingestion
Browse files Browse the repository at this point in the history
Modify the metabase source to support stateful ingestion

Signed-off-by: Pablo Osinaga <[email protected]>
  • Loading branch information
paguos committed Apr 23, 2024
1 parent 934ab03 commit 54f40b9
Show file tree
Hide file tree
Showing 7 changed files with 1,052 additions and 36 deletions.
31 changes: 25 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from functools import lru_cache
from typing import Dict, Iterable, List, Optional, Tuple, Union
Expand All @@ -21,8 +22,10 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import Source, SourceReport, MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionConfigBase, StatefulIngestionReport, \
StatefulIngestionSourceBase
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
Expand All @@ -44,13 +47,15 @@
)
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities import config_clean
# Import via the top-level ``datahub`` package, like every other first-party
# import in this module (this file lives under ``src/datahub/ingestion/source``).
from datahub.ingestion.source.state.stale_entity_removal_handler import (
    StaleEntityRemovalHandler,
    StaleEntityRemovalSourceReport,
    StatefulStaleMetadataRemovalConfig,
)

logger = logging.getLogger(__name__)

DATASOURCE_URN_RECURSION_LIMIT = 5


class MetabaseConfig(DatasetLineageProviderConfigBase):
class MetabaseConfig(DatasetLineageProviderConfigBase, StatefulIngestionConfigBase):
# See the Metabase /api/session endpoint for details
# https://www.metabase.com/docs/latest/api-documentation.html#post-apisession
connect_uri: str = Field(default="localhost:3000", description="Metabase host URL.")
Expand Down Expand Up @@ -84,6 +89,7 @@ class MetabaseConfig(DatasetLineageProviderConfigBase):
default=False,
description="Flag that if true, exclude other user collections",
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

@validator("connect_uri", "display_uri")
def remove_trailing_slash(cls, v):
Expand All @@ -97,12 +103,16 @@ def default_display_uri_to_connect_uri(cls, values):
return values


@dataclass
class MetabaseReport(StaleEntityRemovalSourceReport):
    """Report type used by the Metabase source.

    Declares no fields of its own; everything comes from
    ``StaleEntityRemovalSourceReport``.
    """

@platform_name("Metabase")
@config_class(MetabaseConfig)
@support_status(SupportStatus.CERTIFIED)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.LINEAGE_COARSE, "Supported by default")
class MetabaseSource(Source):
class MetabaseSource(StatefulIngestionSourceBase):
"""
This plugin extracts charts, dashboards, and associated metadata. This plugin is in beta and has only been tested
on PostgreSQL and H2 databases.
Expand Down Expand Up @@ -147,17 +157,18 @@ class MetabaseSource(Source):
"""

config: MetabaseConfig
report: SourceReport
report: MetabaseReport
platform = "metabase"

def __hash__(self):
return id(self)

def __init__(self, ctx: PipelineContext, config: MetabaseConfig):
super().__init__(ctx)
super().__init__(config, ctx)
self.config = config
self.report = SourceReport()
self.report = MetabaseReport()
self.setup_session()
self.source_config: MetabaseConfig = config

def setup_session(self) -> None:
login_response = requests.post(
Expand Down Expand Up @@ -739,6 +750,14 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = MetabaseConfig.parse_obj(config_dict)
return cls(ctx, config)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
    """Return the work-unit processors applied to this source's output.

    The list is the parent class's processors, in their original order,
    followed by the ``workunit_processor`` of a ``StaleEntityRemovalHandler``
    created from this source, its ``source_config`` and its pipeline context.
    """
    # Resolve the inherited processors first, then build the handler, so the
    # evaluation order matches the list-literal form.
    inherited_processors = super().get_workunit_processors()
    stale_entity_handler = StaleEntityRemovalHandler.create(
        self, self.source_config, self.ctx
    )
    return [*inherited_processors, stale_entity_handler.workunit_processor]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield from self.emit_card_mces()
yield from self.emit_dashboard_mces()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": {
"urn": "urn:li:dashboard:(metabase,1)",
"urn": "urn:li:dashboard:(metabase,10)",
"aspects": [
{
"com.linkedin.pegasus2avro.dashboard.DashboardInfo": {
Expand Down Expand Up @@ -234,7 +234,59 @@
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": {
"urn": "urn:li:dashboard:(metabase,1)",
"urn": "urn:li:dashboard:(metabase,20)",
"aspects": [
{
"com.linkedin.pegasus2avro.dashboard.DashboardInfo": {
"customProperties": {},
"title": "Dashboard 2",
"description": "",
"charts": [
"urn:li:chart:(metabase,1)",
"urn:li:chart:(metabase,2)",
"urn:li:chart:(metabase,3)"
],
"datasets": [],
"lastModified": {
"created": {
"time": 1705398694904,
"actor": "urn:li:corpuser:[email protected]"
},
"lastModified": {
"time": 1705398694904,
"actor": "urn:li:corpuser:[email protected]"
}
},
"dashboardUrl": "http://localhost:3000/dashboard/20"
}
},
{
"com.linkedin.pegasus2avro.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:[email protected]",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
}
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1636614000000,
"runId": "metabase-test",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot": {
"urn": "urn:li:dashboard:(metabase,10)",
"aspects": [
{
"com.linkedin.pegasus2avro.dashboard.DashboardInfo": {
Expand Down Expand Up @@ -333,7 +385,23 @@
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(metabase,1)",
"entityUrn": "urn:li:dashboard:(metabase,10)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1636614000000,
"runId": "metabase-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(metabase,20)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"total": 1, "data": [{"description": null, "collection_position": null, "database_id": null, "name": "This is a test", "id": 10, "entity_id": "Q4gEaOmoBkfQX3_gXiH9g", "last-edit-info": {"id": 14, "last_name": "Doe", "first_name": "John", "email": "[email protected]", "timestamp": "2024-01-12T14:55:38.43304Z"}, "model": "dashboard"}], "models": ["dashboard"], "limit": null, "offset": null}
{"total":2,"data":[{"description":null,"collection_position":null,"database_id":null,"name":"This is a test","id":10,"entity_id":"Q4gEaOmoBkfQX3_gXiH9g","last-edit-info":{"id":14,"last_name":"Doe","first_name":"John","email":"[email protected]","timestamp":"2024-01-12T14:55:38.43304Z"},"model":"dashboard"},{"description":null,"collection_position":null,"database_id":null,"name":"This is a test","id":20,"entity_id":"R5jSaUsuDkqFK9_gTiH2x","last-edit-info":{"id":14,"last_name":"Doe","first_name":"John","email":"[email protected]","timestamp":"2024-01-12T14:55:38.43304Z"},"model":"dashboard"}],"models":["dashboard"],"limit":null,"offset":null}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"total": 1, "data": [{"description": null, "collection_position": null, "database_id": null, "name": "This is a test", "id": 10, "entity_id": "Q4gEaOmoBkfQX3_gXiH9g", "last-edit-info": {"id": 14, "last_name": "Doe", "first_name": "John", "email": "[email protected]", "timestamp": "2024-01-12T14:55:38.43304Z"}, "model": "dashboard"}], "models": ["dashboard"], "limit": null, "offset": null}
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@
"entity_id": "lXypX5aa14HjkN_Im82C2",
"visualization_settings": {},
"size_y": 6,
"dashboard_id": 1,
"dashboard_id": 10,
"created_at": "2024-01-16T09:50:34.394488Z",
"row": 0
},
Expand Down Expand Up @@ -802,7 +802,7 @@
"entity_id": "iVOtiEPgX-a90Qh3rJWui",
"visualization_settings": {},
"size_y": 6,
"dashboard_id": 1,
"dashboard_id": 20,
"created_at": "2024-01-16T09:51:34.833525Z",
"row": 6
}
Expand All @@ -821,7 +821,7 @@
"made_public_by_id": null,
"embedding_params": null,
"cache_ttl": null,
"id": 1,
"id": 10,
"position": null,
"entity_id": "Z6B2yiCTEMiwZFe4x5jPT",
"param_fields": null,
Expand Down
Loading

0 comments on commit 54f40b9

Please sign in to comment.