Commit
feat(ingest): framework - client side changes for monitoring and reporting (#3807)
rslanka authored Feb 2, 2022
1 parent 78d35f9 commit f20382f
Showing 32 changed files with 2,100 additions and 559 deletions.
7 changes: 5 additions & 2 deletions metadata-ingestion/setup.py
@@ -312,8 +312,11 @@ def get_long_description():
"datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink",
"datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink",
],
"datahub.ingestion.state_provider.plugins": [
"datahub = datahub.ingestion.source.state_provider.datahub_ingestion_state_provider:DatahubIngestionStateProvider",
"datahub.ingestion.checkpointing_provider.plugins": [
"datahub = datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:DatahubIngestionCheckpointingProvider",
],
"datahub.ingestion.reporting_provider.plugins": [
"datahub = datahub.ingestion.reporting.datahub_ingestion_reporting_provider:DatahubIngestionReportingProvider",
],
"apache_airflow_provider": ["provider_info=datahub_provider:get_provider_info"],
}
95 changes: 95 additions & 0 deletions metadata-ingestion/source_docs/reporting_telemetry.md
@@ -0,0 +1,95 @@
# Datahub's Reporting Framework for Ingestion Job Telemetry
Datahub's reporting framework allows reporting providers to be configured on ingestion pipelines so that
telemetry about ingestion job runs can be sent to external systems for monitoring. It is powered by Datahub's
stateful ingestion framework. The `datahub` reporting provider ships with the standard client installation
and reports ingestion job telemetry to the datahub backend as the destination.

**_NOTE_**: This feature requires the server to be `statefulIngestion` capable,
which is available in metadata service version `0.8.20` and above.

To check if you are running a stateful ingestion capable server:
```console
curl http://<datahub-gms-endpoint>/config

{
  models: { },
  statefulIngestionCapable: true, # <-- this should be present and true
  retention: "true",
  noCode: "true"
}
```

## Config details
The ingestion reporting providers are specified as a list of reporting provider configurations under the `reporting` config
param of the pipeline, with each entry being a type and config pair. Telemetry data is sent to all the reporting
providers in this list.

Note that a `.` is used to denote nested fields, and `[idx]` is used to denote an element of an array of objects in the YAML recipe.

| Field | Required | Default | Description |
|---|---|---|---|
| `reporting[idx].type` | ✅ | `datahub` | The type of the ingestion reporting provider registered with datahub. |
| `reporting[idx].config` | | The `datahub_api` config if set at pipeline level. Otherwise, the default `DatahubClientConfig`. See the [defaults](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19) here. | The configuration required for initializing the datahub reporting provider. |
| `pipeline_name` | ✅ | | The name of the ingestion pipeline. This is used as a part of the identifying key for the telemetry data reported by each job in the ingestion pipeline. |

#### Supported sources
* All SQL-based sources.
* `snowflake_usage`.
#### Sample configuration
```yaml
source:
  type: "snowflake"
  config:
    username: <user_name>
    password: <password>
    role: <role>
    host_port: <host_port>
    warehouse: <ware_house>
    # Rest of the source specific params ...

# This is mandatory. Changing it will cause old telemetry correlation to be lost.
pipeline_name: "my_snowflake_pipeline_1"

# Pipeline-level datahub_api configuration.
datahub_api: # Optional. But if provided, this config will be used by the "datahub" ingestion state provider.
  server: "http://localhost:8080"

sink:
  type: "datahub-rest"
  config:
    server: 'http://localhost:8080'

reporting:
  - type: "datahub" # Required
    config: # Optional.
      datahub_api: # default value
        server: "http://localhost:8080"
```
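
With the recipe above saved to a file (the name `recipe.yaml` here is just an example), the pipeline can be run with the
standard CLI command, and the configured reporting providers will receive the run telemetry:

```console
datahub ingest -c recipe.yaml
```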
## Reporting Ingestion State Provider (Developer Guide)
An ingestion reporting state provider is responsible for saving and retrieving the ingestion telemetry
associated with the ingestion runs of various jobs inside the source connector of the ingestion pipeline.
The data model used for capturing the telemetry is [DatahubIngestionRunSummary](https://github.com/linkedin/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/datajob/datahub/DatahubIngestionRunSummary.pdl).
A reporting ingestion state provider needs to implement the [IngestionReportingProviderBase](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_reporting_provider_base.py)
interface and register itself with datahub by adding an entry under the `datahub.ingestion.reporting_provider.plugins`
key of the entry_points section in [setup.py](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/setup.py)
with its type and implementation class as shown below.
```python
entry_points = {
    # <snip other keys>
    "datahub.ingestion.reporting_provider.plugins": [
        "datahub = datahub.ingestion.reporting.datahub_ingestion_reporting_provider:DatahubIngestionReportingProvider",
    ],
}
```
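
For illustration only, a custom reporting provider might look like the following minimal sketch. The class name, module
placement, and logging behavior are hypothetical, and it assumes that `state_to_commit` holds a map of job names to run
summary aspects, as suggested by the `ReportingJobStatesMap` type in the base module.

```python
from typing import Any, Dict, List, Optional

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_reporting_provider_base import (
    IngestionReportingProviderBase,
    JobStateFilterType,
    JobStateKey,
    ReportingJobStatesMap,
)


class LoggingReportingProvider(IngestionReportingProviderBase):
    """Hypothetical provider that prints each job's run summary instead of persisting it."""

    @classmethod
    def create(
        cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
    ) -> "LoggingReportingProvider":
        return cls(name)

    def get_previous_states(
        self,
        state_key: JobStateKey,
        last_only: bool = True,
        filter_opt: Optional[JobStateFilterType] = None,
    ) -> List[ReportingJobStatesMap]:
        # Nothing is persisted by this provider, so there is no history to return.
        return []

    def commit(self) -> None:
        # Assumption: state_to_commit maps job names to run summary aspects.
        for job_name, run_summary in self.state_to_commit.items():
            print(f"[{self.name}] job={job_name} summary={run_summary}")
        self.committed = True
```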

### Datahub Reporting Ingestion State Provider
This is the reporting state provider implementation that is available out of the box in datahub. Its type is `datahub` and it is implemented on top
of the `datahub_api` client and the timeseries aspect capabilities of the datahub-backend.
#### Config details

Note that a `.` is used to denote nested fields in the YAML recipe.

| Field | Required | Default | Description |
|---|---|---|---|
| `type` | ✅ | `datahub` | The type of the ingestion reporting provider registered with datahub. |
| `config` | | The `datahub_api` config if set at pipeline level. Otherwise, the default `DatahubClientConfig`. See the [defaults](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/graph/client.py#L19) here. | The configuration required for initializing the datahub reporting provider. |
25 changes: 14 additions & 11 deletions metadata-ingestion/source_docs/stateful_ingestion.md
@@ -35,8 +35,11 @@ NOTE: If either `dry-run` or `preview` mode are set, stateful ingestion will be
## Use-cases powered by stateful ingestion.
Following is the list of current use-cases powered by stateful ingestion in datahub.
### Removal of stale tables and views.
Stateful ingestion can be used to automatically soft delete the tables and views that are seen in a previous run
Stateful ingestion can be used to automatically soft-delete the tables and views that are seen in a previous run
but absent in the current run (they are either deleted or no longer desired).

![Stale Metadata Deletion](./images/stale_metadata_deletion.png)

#### Supported sources
* All sql based sources.
#### Additional config details
@@ -124,22 +127,22 @@ sink:
server: 'http://localhost:8080'
```

## The Ingestion State Provider
The ingestion state provider is responsible for saving and retrieving the ingestion state associated with the ingestion runs
of various jobs inside the source connector of the ingestion pipeline. An ingestion state provider needs to implement the
[IngestionStateProvider](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/api/ingestion_state_provider.py) interface and
register itself with datahub by adding an entry under `datahub.ingestion.state_provider.plugins` key of the entry_points section in [setup.py](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/setup.py) with its type and implementation class as shown below.
## The Checkpointing Ingestion State Provider (Developer Guide)
The ingestion checkpointing state provider is responsible for saving and retrieving the ingestion checkpoint state associated with the ingestion runs
of various jobs inside the source connector of the ingestion pipeline. The checkpointing data model is [DatahubIngestionCheckpoint](https://github.com/linkedin/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/datajob/datahub/DatahubIngestionCheckpoint.pdl) and it supports any custom state to be stored using the [IngestionCheckpointState](https://github.com/linkedin/datahub/blob/master/metadata-models/src/main/pegasus/com/linkedin/datajob/datahub/IngestionCheckpointState.pdl#L9). A checkpointing ingestion state provider needs to implement the
[IngestionCheckpointingProviderBase](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py) interface and
register itself with datahub by adding an entry under `datahub.ingestion.checkpointing_provider.plugins` key of the entry_points section in [setup.py](https://github.com/linkedin/datahub/blob/master/metadata-ingestion/setup.py) with its type and implementation class as shown below.
```python
entry_points = {
# <snip other keys>"
"datahub.ingestion.state_provider.plugins": [
"datahub = datahub.ingestion.source.state_provider.datahub_ingestion_state_provider:DatahubIngestionStateProvider",
]
"datahub.ingestion.checkpointing_provider.plugins": [
"datahub = datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider:DatahubIngestionCheckpointingProvider",
],
}
```
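
As a purely illustrative sketch, an in-memory checkpointing provider could be written as follows. The class name is
hypothetical, and it assumes that `state_to_commit` is a job-name to checkpoint map, as suggested by
`CheckpointJobStatesMap` in the base module.

```python
from typing import Any, Dict, List, Optional

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
    CheckpointJobStatesMap,
    IngestionCheckpointingProviderBase,
    JobStateFilterType,
    JobStateKey,
)


class InMemoryCheckpointingProvider(IngestionCheckpointingProviderBase):
    """Hypothetical provider that keeps checkpoints in process memory (lost on restart)."""

    _history: List[CheckpointJobStatesMap] = []

    @classmethod
    def create(
        cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
    ) -> "InMemoryCheckpointingProvider":
        return cls(name)

    def get_previous_states(
        self,
        state_key: JobStateKey,
        last_only: bool = True,
        filter_opt: Optional[JobStateFilterType] = None,
    ) -> List[CheckpointJobStatesMap]:
        return self._history[-1:] if last_only else list(self._history)

    def commit(self) -> None:
        # Assumption: state_to_commit maps job names to DatahubIngestionCheckpoint aspects.
        self._history.append(dict(self.state_to_commit))
        self.committed = True
```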

### Datahub Ingestion State Provider
This is the state provider implementation that is avialble out of the box. It's type is `datahub` and it is implemented on top
### Datahub Checkpointing Ingestion State Provider
This is the state provider implementation that is available out of the box. Its type is `datahub` and it is implemented on top
of the `datahub_api` client and the timeseries aspect capabilities of the datahub-backend.
#### Config details

67 changes: 67 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/committable.py
@@ -0,0 +1,67 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum, auto
from typing import Generic, List, Optional, TypeVar


class CommitPolicy(Enum):
    ALWAYS = auto()
    ON_NO_ERRORS = auto()
    ON_NO_ERRORS_AND_NO_WARNINGS = auto()


@dataclass
class _CommittableConcrete:
    name: str
    commit_policy: CommitPolicy
    committed: bool


# The concrete portion of Committable is separated from the abstract portion due to
# https://github.com/python/mypy/issues/5374#issuecomment-568335302.
class Committable(_CommittableConcrete, ABC):
    def __init__(self, name: str, commit_policy: CommitPolicy):
        super(Committable, self).__init__(name, commit_policy, committed=False)

    @abstractmethod
    def commit(self) -> None:
        pass


StateKeyType = TypeVar("StateKeyType")
StateType = TypeVar("StateType")
# TODO: Add a better alternative to a string for the filter.
FilterType = TypeVar("FilterType")


@dataclass
class _StatefulCommittableConcrete(Generic[StateType]):
    state_to_commit: StateType


class StatefulCommittable(
    Committable,
    _StatefulCommittableConcrete[StateType],
    Generic[StateKeyType, StateType, FilterType],
):
    def __init__(
        self, name: str, commit_policy: CommitPolicy, state_to_commit: StateType
    ):
        # _CommittableConcrete is the first concrete base in the __mro__ from this class.
        super(StatefulCommittable, self).__init__(
            name=name, commit_policy=commit_policy
        )
        # _StatefulCommittableConcrete comes after _CommittableConcrete in the __mro__.
        super(_CommittableConcrete, self).__init__(state_to_commit=state_to_commit)

    def has_successfully_committed(self) -> bool:
        return True if not self.state_to_commit or self.committed else False

    @abstractmethod
    def get_previous_states(
        self,
        state_key: StateKeyType,
        last_only: bool = True,
        filter_opt: Optional[FilterType] = None,
    ) -> List[StateType]:
        pass
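
To make the `StatefulCommittable` contract concrete, here is a small illustrative subclass (not part of this commit)
whose "backend" is just an in-memory list; it shows how `state_to_commit`, `committed`, and
`has_successfully_committed()` interact.

```python
from typing import List, Optional

from datahub.ingestion.api.committable import CommitPolicy, StatefulCommittable

# Illustrative stand-in for a real persistence backend.
_fake_backend: List[List[str]] = []


class ListCommittable(StatefulCommittable[str, List[str], str]):
    def commit(self) -> None:
        _fake_backend.append(list(self.state_to_commit))
        self.committed = True

    def get_previous_states(
        self,
        state_key: str,
        last_only: bool = True,
        filter_opt: Optional[str] = None,
    ) -> List[List[str]]:
        return _fake_backend[-1:] if last_only else list(_fake_backend)


c = ListCommittable(
    name="demo", commit_policy=CommitPolicy.ALWAYS, state_to_commit=["urn:li:x"]
)
assert not c.has_successfully_committed()  # pending state, not yet committed
c.commit()
assert c.has_successfully_committed()
```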
29 changes: 28 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/common.py
@@ -1,7 +1,8 @@
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar
from typing import Dict, Generic, Iterable, Optional, Tuple, TypeVar

from datahub.ingestion.api.committable import Committable
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

T = TypeVar("T")
@@ -41,3 +42,29 @@ def __init__(
        self.pipeline_name = pipeline_name
        self.dry_run_mode = dry_run
        self.preview_mode = preview_mode
        self.reporters: Dict[str, Committable] = dict()
        self.checkpointers: Dict[str, Committable] = dict()

    def register_checkpointer(self, committable: Committable) -> None:
        if committable.name in self.checkpointers:
            raise IndexError(
                f"Checkpointing provider {committable.name} already registered."
            )
        self.checkpointers[committable.name] = committable

    def register_reporter(self, committable: Committable) -> None:
        if committable.name in self.reporters:
            raise IndexError(
                f"Reporting provider {committable.name} already registered."
            )
        self.reporters[committable.name] = committable

    def get_reporters(self) -> Iterable[Committable]:
        for committable in self.reporters.values():
            yield committable

    def get_committables(self) -> Iterable[Tuple[str, Committable]]:
        for reporting_item_commitable in self.reporters.items():
            yield reporting_item_commitable
        for checkpointing_item_commitable in self.checkpointers.items():
            yield checkpointing_item_commitable
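
To see how these registries and the `CommitPolicy` enum are meant to interact, here is a hypothetical end-of-run
helper; the function name and the error/warning flags are illustrative and not part of this commit. It walks every
registered committable and commits only those whose policy allows it.

```python
from datahub.ingestion.api.committable import CommitPolicy
from datahub.ingestion.api.common import PipelineContext


def commit_registered_committables(
    ctx: PipelineContext, had_errors: bool, had_warnings: bool
) -> None:
    # Iterate over both reporters and checkpointers registered on the context.
    for name, committable in ctx.get_committables():
        policy = committable.commit_policy
        if policy == CommitPolicy.ON_NO_ERRORS and had_errors:
            continue
        if policy == CommitPolicy.ON_NO_ERRORS_AND_NO_WARNINGS and (
            had_errors or had_warnings
        ):
            continue
        committable.commit()
```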
64 changes: 64 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py
@@ -0,0 +1,64 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from datahub.ingestion.api.committable import CommitPolicy
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_state_provider import (
    IngestionJobStateProvider,
    IngestionJobStateProviderConfig,
    JobId,
    JobStateFilterType,
    JobStateKey,
    JobStatesMap,
)
from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass

#
# Common type exports
#
JobId = JobId
JobStateKey = JobStateKey
JobStateFilterType = JobStateFilterType

#
# Checkpoint state specific types
#
CheckpointJobStateType = DatahubIngestionCheckpointClass
CheckpointJobStatesMap = JobStatesMap[CheckpointJobStateType]


class IngestionCheckpointingProviderConfig(IngestionJobStateProviderConfig):
    pass


@dataclass()
class IngestionCheckpointingProviderBase(
    IngestionJobStateProvider[CheckpointJobStateType]
):
    """
    The base class (non-abstract) for all checkpointing state provider implementations.
    This class is implemented this way as a concrete class is needed to work with the registry,
    but we don't want to implement any of the functionality yet.
    """

    def __init__(
        self, name: str, commit_policy: CommitPolicy = CommitPolicy.ON_NO_ERRORS
    ):
        super(IngestionCheckpointingProviderBase, self).__init__(name, commit_policy)

    @classmethod
    def create(
        cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
    ) -> "IngestionJobStateProvider":
        raise NotImplementedError("Sub-classes must override this method.")

    def get_previous_states(
        self,
        state_key: JobStateKey,
        last_only: bool = True,
        filter_opt: Optional[JobStateFilterType] = None,
    ) -> List[CheckpointJobStatesMap]:
        raise NotImplementedError("Sub-classes must override this method.")

    def commit(self) -> None:
        raise NotImplementedError("Sub-classes must override this method.")
60 changes: 60 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/ingestion_job_reporting_provider_base.py
@@ -0,0 +1,60 @@
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

from datahub.ingestion.api.committable import CommitPolicy
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_state_provider import (
    IngestionJobStateProvider,
    IngestionJobStateProviderConfig,
    JobId,
    JobStateFilterType,
    JobStateKey,
    JobStatesMap,
)
from datahub.metadata.schema_classes import DatahubIngestionRunSummaryClass

#
# Common type exports
#
JobId = JobId
JobStateKey = JobStateKey
JobStateFilterType = JobStateFilterType

#
# Reporting state specific types
#
ReportingJobStateType = DatahubIngestionRunSummaryClass
ReportingJobStatesMap = JobStatesMap[ReportingJobStateType]


class IngestionReportingProviderConfig(IngestionJobStateProviderConfig):
    pass


@dataclass()
class IngestionReportingProviderBase(IngestionJobStateProvider[ReportingJobStateType]):
    """
    The base class (non-abstract) for all reporting state provider implementations.
    This class is implemented this way as a concrete class is needed to work with the registry,
    but we don't want to implement any of the functionality yet.
    """

    def __init__(self, name: str, commit_policy: CommitPolicy = CommitPolicy.ALWAYS):
        super(IngestionReportingProviderBase, self).__init__(name, commit_policy)

    @classmethod
    def create(
        cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
    ) -> "IngestionJobStateProvider":
        raise NotImplementedError("Sub-classes must override this method.")

    def get_previous_states(
        self,
        state_key: JobStateKey,
        last_only: bool = True,
        filter_opt: Optional[JobStateFilterType] = None,
    ) -> List[ReportingJobStatesMap]:
        raise NotImplementedError("Sub-classes must override this method.")

    def commit(self) -> None:
        raise NotImplementedError("Sub-classes must override this method.")