
feat(ingest): grafana connector #10891

Merged · 15 commits · Jul 15, 2024
Changes from 12 commits
2 changes: 2 additions & 0 deletions metadata-ingestion/setup.py
@@ -345,6 +345,7 @@
"flask-openid>=1.3.0",
"dask[dataframe]<2024.7.0",
},
"grafana": {"requests"},
"glue": aws_common,
# hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported
"hana": sql_common
@@ -634,6 +635,7 @@
"dynamodb = datahub.ingestion.source.dynamodb.dynamodb:DynamoDBSource",
"elasticsearch = datahub.ingestion.source.elastic_search:ElasticsearchSource",
"feast = datahub.ingestion.source.feast:FeastRepositorySource",
"grafana = datahub.ingestion.source.grafana.grafana_source:GrafanaSource",
"glue = datahub.ingestion.source.aws.glue:GlueSource",
"sagemaker = datahub.ingestion.source.aws.sagemaker:SagemakerSource",
"hana = datahub.ingestion.source.sql.hana:HanaSource",
18 changes: 14 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -436,6 +436,8 @@ def run(self) -> None:
)
)

stack.enter_context(self.sink)

self.final_status = PipelineStatus.UNKNOWN
self._notify_reporters_on_ingestion_start()
callback = None
@@ -460,7 +462,17 @@ def run(self) -> None:
if not self.dry_run:
self.sink.handle_work_unit_start(wu)
try:
record_envelopes = self.extractor.get_records(wu)
# Most of this code is designed to be fully stream-based rather than materializing all records in memory.
# However, the extractor never produces a particularly large list, and we want exception reporting to be
# associated with the source rather than the transformer. As such, we need to materialize the generator
# returned by get_records().
record_envelopes = list(self.extractor.get_records(wu))
except Exception as e:
self.source.get_report().failure(
"Source produced bad metadata", context=wu.id, exc=e
)
continue
try:
for record_envelope in self.transform(record_envelopes):
if not self.dry_run:
try:
@@ -482,9 +494,9 @@
)
# TODO: Transformer errors should cause the pipeline to fail.

self.extractor.close()
if not self.dry_run:
self.sink.handle_work_unit_end(wu)
self.extractor.close()
self.source.close()
# no more data is coming, we need to let the transformers produce any additional records if they are holding on to state
for record_envelope in self.transform(
@@ -518,8 +530,6 @@

self._notify_reporters_on_ingestion_completion()

self.sink.close()

def transform(self, records: Iterable[RecordEnvelope]) -> Iterable[RecordEnvelope]:
"""
Transforms the given sequence of records by passing the records through the transformers
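Alongside the extractor change above, this diff registers the sink on the pipeline's ExitStack (stack.enter_context(self.sink)) instead of calling self.sink.close() at the end of run(), so the sink is closed when the stack unwinds even on error paths. A minimal sketch of that pattern, with DummySink as a hypothetical stand-in for a real sink:

```python
import contextlib


class DummySink:
    """Hypothetical stand-in: anything passed to stack.enter_context()
    must implement the context manager protocol."""

    def __enter__(self):
        print("sink opened")
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs when the ExitStack unwinds, even if the body raised,
        # which is what makes this safer than a trailing .close() call.
        print("sink closed")
        return False


with contextlib.ExitStack() as stack:
    sink = stack.enter_context(DummySink())
    # ... pipeline work happens here; no explicit sink.close() needed ...
```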
metadata-ingestion/src/datahub/ingestion/source/grafana/__init__.py
Empty file.
131 changes: 131 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/grafana/grafana_source.py
@@ -0,0 +1,131 @@
from typing import Iterable, List, Optional

import requests
from pydantic import Field, SecretStr

import datahub.emitter.mce_builder as builder
from datahub.configuration.source_common import PlatformInstanceConfigMixin
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulIngestionConfigBase,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionReport,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import ChangeAuditStamps
from datahub.metadata.schema_classes import DashboardInfoClass, StatusClass


class GrafanaSourceConfig(StatefulIngestionConfigBase, PlatformInstanceConfigMixin):
url: str = Field(
default="",
description="Grafana URL in the format http://your-grafana-instance with no trailing slash",
)
service_account_token: SecretStr = Field(
description="Service account token for Grafana"
)


class GrafanaReport(StaleEntityRemovalSourceReport):
pass


@platform_name("Grafana")
@config_class(GrafanaSourceConfig)
@support_status(SupportStatus.TESTING)
class GrafanaSource(StatefulIngestionSourceBase):
"""
This is an experimental source for Grafana.
Currently only ingests dashboards (no charts)
"""

def __init__(self, config: GrafanaSourceConfig, ctx: PipelineContext):
super().__init__(config, ctx)
self.source_config = config
self.report = GrafanaReport()
self.platform = "grafana"

@classmethod
def create(cls, config_dict, ctx):
config = GrafanaSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
StaleEntityRemovalHandler.create(
self, self.source_config, self.ctx
).workunit_processor,
]

def get_report(self) -> StatefulIngestionReport:
return self.report

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
headers = {
"Authorization": f"Bearer {self.source_config.service_account_token.get_secret_value()}",
"Content-Type": "application/json",
}
try:
response = requests.get(
f"{self.source_config.url}/api/search", headers=headers
)
Collaborator comment: general best practice is to create a requests.Session and then use that everywhere; should be OK here though since it only makes one request.
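For reference, a minimal sketch of the suggested requests.Session pattern; the base URL and token are placeholders, not values from this PR:

```python
import requests

# One shared Session reuses the underlying HTTP connection and carries
# common headers, instead of re-sending them on every request.
session = requests.Session()
session.headers.update(
    {
        "Authorization": "Bearer <service-account-token>",  # placeholder
        "Content-Type": "application/json",
    }
)

response = session.get("http://localhost:3000/api/search")  # placeholder URL
response.raise_for_status()
dashboards = response.json()
```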

response.raise_for_status()
except requests.exceptions.RequestException as e:
self.report.report_failure(f"Failed to fetch dashboards: {str(e)}")
Collaborator comment:

Suggested change
-            self.report.report_failure(f"Failed to fetch dashboards: {str(e)}")
+            self.report.failure("Failed to fetch dashboards", exc=e)
return
res_json = response.json()
for item in res_json:
uid = item["uid"]
title = item["title"]
url_path = item["url"]
full_url = f"{self.source_config.url}{url_path}"
dashboard_urn = builder.make_dashboard_urn(
platform=self.platform,
name=uid,
platform_instance=self.source_config.platform_instance,
)

yield from auto_workunit(
MetadataChangeProposalWrapper.construct_many(
entityUrn=dashboard_urn,
aspects=[
DashboardInfoClass(
description="",
title=title,
charts=[],
lastModified=ChangeAuditStamps(),
externalUrl=full_url,
customProperties={
key: str(value)
for key, value in {
"displayName": title,
"id": item["id"],
"uid": uid,
"title": title,
"uri": item["uri"],
"type": item["type"],
"folderId": item.get("folderId"),
"folderUid": item.get("folderUid"),
"folderTitle": item.get("folderTitle"),
}.items()
if value is not None
},
),
StatusClass(removed=False),
],
)
)
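For context, a minimal sketch of how the registered source could be exercised once this lands; the URL and token are placeholders, and the console sink is only for inspection:

```python
from datahub.ingestion.run.pipeline import Pipeline

# Placeholder config: point `url` at a real Grafana instance (no trailing
# slash, per the field description) and supply a real service account token.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "grafana",
            "config": {
                "url": "http://localhost:3000",
                "service_account_token": "<service-account-token>",
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()
```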
25 changes: 25 additions & 0 deletions metadata-ingestion/tests/integration/grafana/default-dashboard.json
@@ -0,0 +1,25 @@
{
"id": null,
"uid": "default",
"title": "Default Dashboard",
"tags": [],
"timezone": "browser",
"schemaVersion": 16,
"version": 0,
"panels": [
{
"type": "text",
"title": "Welcome",
"gridPos": {
"x": 0,
"y": 0,
"w": 24,
"h": 5
},
"options": {
"content": "Welcome to your Grafana dashboard!",
"mode": "markdown"
}
}
]
}
32 changes: 32 additions & 0 deletions metadata-ingestion/tests/integration/grafana/docker-compose.yml
@@ -0,0 +1,32 @@
version: '3.7'

services:
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_SECURITY_ADMIN_USER=admin
- GF_PATHS_PROVISIONING=/etc/grafana/provisioning
volumes:
- grafana-storage:/var/lib/grafana
- ./provisioning:/etc/grafana/provisioning
- ./default-dashboard.json:/var/lib/grafana/dashboards/default-dashboard.json
depends_on:
- postgres

postgres:
image: postgres:13
container_name: grafana-postgres
environment:
POSTGRES_DB: grafana
POSTGRES_USER: grafana
POSTGRES_PASSWORD: grafana
volumes:
- postgres-storage:/var/lib/postgresql/data

volumes:
grafana-storage:
postgres-storage:
@@ -0,0 +1,18 @@
[
{
"entityType": "dashboard",
"entityUrn": "urn:li:dashboard:(grafana,default)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1720785600000,
"runId": "grafana-test-simple",
"lastRunId": "no-run-id-provided"
}
}
]
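For context, this golden file would typically be exercised by an integration test that runs the pipeline against the dockerized Grafana below and diffs the emitted output; a rough sketch, where the golden-file path and exact helper usage are assumptions:

```python
from datahub.ingestion.run.pipeline import Pipeline
from tests.test_helpers import mce_helpers  # helper used by other ingestion tests


def test_grafana_ingest_simple(pytestconfig, tmp_path):
    output_path = tmp_path / "grafana_mcps.json"

    pipeline = Pipeline.create(
        {
            "run_id": "grafana-test-simple",  # matches the runId in the golden file
            "source": {
                "type": "grafana",
                "config": {
                    "url": "http://localhost:3000",
                    "service_account_token": "<token>",  # placeholder
                },
            },
            "sink": {"type": "file", "config": {"filename": str(output_path)}},
        }
    )
    pipeline.run()
    pipeline.raise_from_status()

    # Golden-file path is an assumption based on this PR's test directory.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=str(output_path),
        golden_path="tests/integration/grafana/grafana_mcps_golden.json",
    )
```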
@@ -0,0 +1,3 @@
api_keys:
- name: 'example-api-key'
role: 'Admin'
@@ -0,0 +1,11 @@
apiVersion: 1

providers:
- name: 'default'
orgId: 1
folder: ''
type: file
disableDeletion: false
updateIntervalSeconds: 10
options:
path: /var/lib/grafana/dashboards
@@ -0,0 +1,12 @@
apiVersion: 1

datasources:
- name: PostgreSQL
type: postgres
access: proxy
url: postgres:5432
database: grafana
user: grafana
password: grafana
jsonData:
sslmode: disable
@@ -0,0 +1,6 @@
service_accounts:
- name: 'example-service-account'
role: 'Admin'
apiKeys:
- keyName: 'example-api-key'
role: 'Admin'