feat(cli): improve error reporting, make sink config optional (#4718)
shirshanka authored Apr 25, 2022
1 parent b1b1898 commit a518e3d
Showing 10 changed files with 166 additions and 66 deletions.
24 changes: 11 additions & 13 deletions docs-website/sidebars.js
@@ -77,14 +77,6 @@ module.exports = {
{
Sinks: list_ids_in_directory("metadata-ingestion/sink_docs"),
},
{
"Custom Integrations": [
"metadata-ingestion/as-a-library",
"metadata-integration/java/as-a-library",
"metadata-ingestion/integration_docs/great-expectations",
"metadata-integration/java/datahub-protobuf/README",
],
},
{
Scheduling: [
"metadata-ingestion/schedule_docs/intro",
@@ -94,14 +86,20 @@ module.exports = {
],
},
{
Lineage: [
"docs/lineage/intro",
"docs/lineage/airflow",
"docker/airflow/local_airflow",
"docs/lineage/sample_code",
"Push-Based Integrations": [
{
Airflow: ["docs/lineage/airflow", "docker/airflow/local_airflow"],
},
"metadata-integration/java/spark-lineage/README",
"metadata-ingestion/integration_docs/great-expectations",
"metadata-integration/java/datahub-protobuf/README",
"metadata-ingestion/as-a-library",
"metadata-integration/java/as-a-library",
],
},
{
Lineage: ["docs/lineage/intro", "docs/lineage/sample_code"],
},
{
Guides: [
"metadata-ingestion/adding-source",
16 changes: 11 additions & 5 deletions docs/lineage/airflow.md
@@ -1,6 +1,11 @@
# Lineage with Airflow
# Airflow Integration

There's a couple ways to get lineage information from Airflow into DataHub.
DataHub supports integration of:
- Airflow pipeline (DAG) metadata
- DAG and task run information
- lineage information, when present

There are a few ways to enable these integrations from Airflow into DataHub.


## Using Datahub's Airflow lineage backend (recommended)
@@ -11,9 +16,10 @@ The Airflow lineage backend is only supported in Airflow 1.10.15+ and 2.0.2+.

:::

## Running on Docker locally
:::note

If you are looking to run Airflow and DataHub using docker locally, follow the guide [here](../../docker/airflow/local_airflow.md). Otherwise proceed to follow the instructions below.
:::

## Setting up Airflow to use DataHub as Lineage Backend

@@ -49,7 +55,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
- `cluster` (defaults to "prod"): The "cluster" to associate Airflow DAGs and tasks with.
- `capture_ownership_info` (defaults to true): If true, the owners field of the DAG will be captured as a DataHub corpuser.
- `capture_tags_info` (defaults to true): If true, the tags field of the DAG will be captured as DataHub tags.
- `capture_executions` (defaults to false): If true, it captures task runs as DataHub DatprocessInstances. **This feature only works with Datahub GMS version v0.8.33 or greater.**
- `capture_executions` (defaults to false): If true, it captures task runs as DataHub DataProcessInstances. **This feature only works with Datahub GMS version v0.8.33 or greater.**
- `graceful_exceptions` (defaults to true): If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.
4. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py), or reference [`lineage_backend_taskflow_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py) if you're using the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html). A minimal sketch is shown after this list.
5. [optional] Learn more about [Airflow lineage](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html), including shorthand notation and some automation.
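For illustration only, here is a minimal sketch of step 4; the platform and table names are placeholders, and it assumes the lineage backend from the earlier steps is already configured:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_provider.entities import Dataset

# Illustrative DAG; the dataset platform and names below are placeholders.
with DAG(
    dag_id="datahub_lineage_sketch",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transform orders'",
        # inlets/outlets are read by the DataHub lineage backend when the task runs
        inlets=[Dataset("mssql", "DemoData.dbo.orders")],
        outlets=[Dataset("mssql", "DemoData.dbo.orders_summary")],
    )
```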
@@ -60,4 +66,4 @@ Take a look at this sample DAG:

- [`lineage_emission_dag.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_emission_dag.py) - emits lineage using the DatahubEmitterOperator.

In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See step 1 above for details.
In order to use this example, you must first configure the Datahub hook. Like in ingestion, we support a Datahub REST hook and a Kafka-based hook. See step 1 above for details.
94 changes: 68 additions & 26 deletions metadata-ingestion/README.md
@@ -1,8 +1,28 @@
# Introduction to Metadata Ingestion

![Python version 3.6+](https://img.shields.io/badge/python-3.6%2B-blue)
## Integration Options

### Metadata Ingestion Source Status
DataHub supports both **push-based** and **pull-based** metadata integration.

Push-based integrations allow you to emit metadata directly from your data systems when metadata changes, while pull-based integrations allow you to "crawl" or "ingest" metadata from the data systems by connecting to them and extracting metadata in a batch or incremental-batch manner. Supporting both mechanisms means that you can integrate with all your systems in the most flexible way possible.

Examples of push-based integrations include [Airflow](../docs/lineage/airflow.md), [Spark](../metadata-integration/java/spark-lineage/README.md), [Great Expectations](./integration_docs/great-expectations.md) and [Protobuf Schemas](../metadata-integration/java/datahub-protobuf/README.md). This allows you to get low-latency metadata integration from the "active" agents in your data ecosystem. Examples of pull-based integrations include BigQuery, Snowflake, Looker, Tableau and many others.

This document describes the pull-based metadata ingestion system that is built into DataHub for easy integration with a wide variety of sources in your data stack.

## Getting Started

### Prerequisites

Before running any metadata ingestion job, you should make sure that DataHub backend services are all running. You can either run ingestion via the [UI](../docs/ui-ingestion.md) or via the [CLI](../docs/cli.md). You can reference the CLI usage guide given there as you go through this page.
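If you are installing the CLI locally with pip, a quick sanity check might look like the following (a sketch; the exact install command depends on your environment):

```shell
# install (or upgrade) the DataHub CLI and verify the installation
python3 -m pip install --upgrade acryl-datahub
datahub version

# list the ingestion plugins that are currently enabled
datahub check plugins
```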

## Core Concepts

### Sources

Data systems that we are extracting metadata from are referred to as **Sources**. The `Sources` tab on the left in the sidebar shows you all the sources that are available for you to ingest metadata from. For example, we have sources for [BigQuery](./source_docs/bigquery.md), [Looker](./source_docs/looker.md), [Tableau](./source_docs/tableau.md) and many others.

#### Metadata Ingestion Source Status

We apply a Support Status to each Metadata Source to help you understand the integration reliability at a glance.

@@ -12,23 +32,24 @@ We apply a Support Status to each Metadata Source to help you understand the int

![Testing](https://img.shields.io/badge/support%20status-testing-lightgrey): Testing Sources are available for experimentation by DataHub Community members, but may change without notice.

## Getting Started
### Sinks

### Prerequisites
Sinks are destinations for metadata. When configuring ingestion for DataHub, you're likely to be sending the metadata to DataHub over either the [REST (datahub-rest)](./sink_docs/datahub.md#datahub-rest) or the [Kafka (datahub-kafka)](./sink_docs/datahub.md#datahub-kafka) sink. In some cases, the [File](./sink_docs/file.md) sink is also helpful to store a persistent offline copy of the metadata during debugging.

Before running any metadata ingestion job, you should make sure that DataHub backend services are all running. If you are trying this out locally check out the [CLI](../docs/cli.md) to install the CLI and understand the options available in the CLI. You can reference the CLI usage guide given there as you go through this page.
The default sink that most ingestion systems and guides assume is the `datahub-rest` sink, but every recipe can be adapted to use the other sinks as well.
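For example, a recipe writing to Kafka instead of REST would swap in a sink section like the sketch below; the broker and schema registry addresses are placeholders, and the sink docs linked above list the full set of options:

```yaml
# sketch of a datahub-kafka sink section; endpoints are illustrative
sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: "localhost:9092"
      schema_registry_url: "http://localhost:8081"
```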

### Core Concepts
### Recipes

## Recipes
A recipe is the main configuration file that puts it all together. It tells our ingestion scripts where to pull data from (source) and where to put it (sink).

A recipe is a configuration file that tells our ingestion scripts where to pull data from (source) and where to put it (sink).
Here's a simple example that pulls metadata from MSSQL (source) and puts it into datahub rest (sink).
Since `acryl-datahub` version `>=0.8.33.2`, the default sink is assumed to be a DataHub REST endpoint:
- hosted at "http://localhost:8080", or at the value of the environment variable `${DATAHUB_SERVER}` if present
- with an empty auth token, or the value of the environment variable `${DATAHUB_TOKEN}` if present.
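Spelled out explicitly, this implied default is roughly equivalent to the following sink section (shown for reference only; it mirrors the defaults added to the ingestion pipeline in this change):

```yaml
# implied default sink; you do not need to add this to your recipe
sink:
  type: "datahub-rest"
  config:
    server: "${DATAHUB_SERVER:-http://localhost:8080}"
    token: "${DATAHUB_TOKEN:-}"
```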

> Note that one recipe file can only have 1 source and 1 sink. If you want multiple sources then you will need multiple recipe files.
Here's a simple recipe that pulls metadata from MSSQL (source) and puts it into the default sink (datahub rest).

```yaml
# A sample recipe that pulls metadata from MSSQL and puts it into DataHub
# The simplest recipe that pulls metadata from MSSQL and puts it into DataHub
# using the Rest API.
source:
  type: mssql
@@ -37,20 +58,23 @@ source:
    password: ${MSSQL_PASSWORD}
    database: DemoData

transformers:
  - type: "fully-qualified-class-name-of-transformer"
    config:
      some_property: "some.value"
# sink section omitted as we want to use the default datahub-rest sink
```

Running this recipe is as simple as:
```shell
datahub ingest -c recipe.yaml
```

sink:
type: "datahub-rest"
config:
server: "http://localhost:8080"
Or, if you want to override the default endpoints, you can provide the environment variables as part of the command, as shown below:
```shell
DATAHUB_SERVER="https://my-datahub-server:8080" DATAHUB_TOKEN="my-datahub-token" datahub ingest -c recipe.yaml
```

A number of recipes are included in the [examples/recipes](./examples/recipes) directory. For full info and context on each source and sink, see the pages described in the [table of plugins](../docs/cli.md#installing-plugins).

> Note that one recipe file can only have 1 source and 1 sink. If you want multiple sources then you will need multiple recipe files.
### Handling sensitive information in recipes

We automatically expand environment variables in the config (e.g. `${MSSQL_PASSWORD}`),
@@ -64,8 +88,8 @@ pip install 'acryl-datahub[datahub-rest]' # install the required plugin
datahub ingest -c ./examples/recipes/mssql_to_datahub.yml
```

The `--dry-run` option of the `ingest` command performs all of the ingestion steps, except writing to the sink. This is useful to ensure that the
ingestion recipe is producing the desired workunits before ingesting them into datahub.
The `--dry-run` option of the `ingest` command performs all of the ingestion steps, except writing to the sink. This is useful to validate that the
ingestion recipe is producing the desired metadata events before ingesting them into datahub.

```shell
# Dry run
@@ -100,19 +124,37 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.yml --suppress-erro

## Transformations

If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer to write your own module and integrate it with DataHub.
If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer to write your own module and integrate it with DataHub. Transformers require extending the recipe with a new section to describe the transformers that you want to run.

For example, a pipeline that ingests metadata from MSSQL and applies a default "important" tag to all datasets is described below:
```yaml
# A recipe to ingest metadata from MSSQL and apply default tags to all tables
source:
  type: mssql
  config:
    username: sa
    password: ${MSSQL_PASSWORD}
    database: DemoData

transformers: # an array of transformers applied sequentially
  - type: simple_add_dataset_tags
    config:
      tag_urns:
        - "urn:li:tag:Important"

# default sink, no config needed
```

Check out the [transformers guide](./transformers.md) for more info!
Check out the [transformers guide](./transformers.md) to learn more about how you can create really flexible pipelines for processing metadata using Transformers!

## Using as a library
## Using as a library (SDK)

In some cases, you might want to construct Metadata events directly and use programmatic ways to emit that metadata to DataHub. In this case, take a look at the [Python emitter](./as-a-library.md) and the [Java emitter](../metadata-integration/java/as-a-library.md) libraries which can be called from your own code.
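As a minimal sketch of emitting metadata with the Python REST emitter (the server address, dataset name, and description below are placeholders; see the emitter docs linked above for the full API):

```python
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

# Placeholder endpoint; point this at your DataHub GMS instance.
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

dataset_urn = make_dataset_urn(platform="mssql", name="DemoData.dbo.orders", env="PROD")

# Emit a single aspect (a dataset description) for an illustrative dataset.
mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=dataset_urn,
    aspectName="datasetProperties",
    aspect=DatasetPropertiesClass(description="Orders table (example)"),
)
emitter.emit_mcp(mcp)
```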

### Programmatic Pipeline
In some cases, you might want to configure and run a pipeline entirely from within your custom python script. Here is an example of how to do it.
In some cases, you might want to configure and run a pipeline entirely from within your custom Python script. Here is an example of how to do it.
- [programmatic_pipeline.py](./examples/library/programatic_pipeline.py) - a basic mysql to REST programmatic pipeline.
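A hedged sketch of the same idea, with placeholder connection details:

```python
from datahub.ingestion.run.pipeline import Pipeline

# The recipe is passed as a plain dict; the credentials below are placeholders.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "mssql",
            "config": {
                "username": "sa",
                "password": "example-password",
                "database": "DemoData",
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```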


## Developing

See the guides on [developing](./developing.md), [adding a source](./adding-source.md) and [using transformers](./transformers.md).
7 changes: 3 additions & 4 deletions metadata-ingestion/integration_docs/great-expectations.md
@@ -1,7 +1,6 @@
# Data Quality with Great Expectations

This guide helps to setup and configure `DataHubValidationAction` in Great Expectations to send assertions(expectations) and their results to Datahub using Datahub's Python Rest emitter.
# Great Expectations

This guide helps you set up and configure `DataHubValidationAction` in Great Expectations to send assertions (expectations) and their results to DataHub using DataHub's Python REST emitter.

## Capabilities

@@ -51,4 +50,4 @@ This integration does not support
## Learn more
To see the Great Expectations in action, check out [this demo](https://www.loom.com/share/d781c9f0b270477fb5d6b0c26ef7f22d) from the Feb 2022 townhall.
To see the Great Expectations in action, check out [this demo](https://www.loom.com/share/d781c9f0b270477fb5d6b0c26ef7f22d) from the Feb 2022 townhall.
22 changes: 14 additions & 8 deletions metadata-ingestion/src/datahub/entrypoints.py
@@ -5,6 +5,7 @@

import click
import stackprinter
from pydantic import ValidationError

import datahub as datahub_package
from datahub.cli.check_cli import check
@@ -18,6 +19,7 @@
from datahub.cli.telemetry import telemetry as telemetry_cli
from datahub.cli.timeline_cli import timeline
from datahub.configuration import SensitiveError
from datahub.configuration.common import ConfigurationError
from datahub.telemetry import telemetry
from datahub.utilities.server_config_util import get_gms_config

@@ -149,15 +151,19 @@ def main(**kwargs):
kwargs = {"show_vals": None}
exc = sensitive_cause

logger.error(
stackprinter.format(
exc,
line_wrap=MAX_CONTENT_WIDTH,
truncate_vals=10 * MAX_CONTENT_WIDTH,
suppressed_paths=[r"lib/python.*/site-packages/click/"],
**kwargs,
# suppress stack printing for common configuration errors
if isinstance(exc, (ConfigurationError, ValidationError)):
logger.error(exc)
else:
logger.error(
stackprinter.format(
exc,
line_wrap=MAX_CONTENT_WIDTH,
truncate_vals=10 * MAX_CONTENT_WIDTH,
suppressed_paths=[r"lib/python.*/site-packages/click/"],
**kwargs,
)
)
)
logger.info(
f"DataHub CLI version: {datahub_package.__version__} at {datahub_package.__file__}"
)
29 changes: 24 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -6,8 +6,9 @@
from typing import Any, Dict, Iterable, List, Optional

import click
from pydantic import validator
from pydantic import root_validator, validator

from datahub.configuration import config_loader
from datahub.configuration.common import (
ConfigModel,
DynamicTypedConfig,
@@ -53,8 +54,8 @@ def run_id_should_be_semantic(
cls, v: Optional[str], values: Dict[str, Any], **kwargs: Any
) -> str:
if v == "__DEFAULT_RUN_ID":
if values["source"] is not None:
if values["source"].type is not None:
if "source" in values:
if hasattr(values["source"], "type"):
source_type = values["source"].type
current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
return f"{source_type}-{current_time}"
@@ -64,12 +65,30 @@ def run_id_should_be_semantic(
assert v is not None
return v

@root_validator(pre=True)
def default_sink_is_datahub_rest(cls, values: Dict[str, Any]) -> Any:
if "sink" not in values:
default_sink_config = {
"type": "datahub-rest",
"config": {
"server": "${DATAHUB_SERVER:-http://localhost:8080}",
"token": "${DATAHUB_TOKEN:-}",
},
}
# resolve env variables if present
default_sink_config = config_loader.resolve_env_variables(
default_sink_config
)
values["sink"] = default_sink_config

return values

@validator("datahub_api", always=True)
def datahub_api_should_use_rest_sink_as_default(
cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any
) -> Optional[DatahubClientConfig]:
if v is None:
if values["sink"].type is not None:
if "sink" in values and "type" in values["sink"]:
sink_type = values["sink"].type
if sink_type == "datahub-rest":
sink_config = values["sink"].config
@@ -123,7 +142,7 @@ def __init__(

sink_type = self.config.sink.type
sink_class = sink_registry.get(sink_type)
sink_config = self.config.sink.dict().get("config", {})
sink_config = self.config.sink.dict().get("config") or {}
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")

11 changes: 9 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -5,7 +5,7 @@
from typing import Union, cast

from datahub.cli.cli_utils import set_env_variables_override_config
from datahub.configuration.common import OperationalError
from datahub.configuration.common import ConfigurationError, OperationalError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
@@ -52,7 +52,14 @@ def __init__(self, ctx: PipelineContext, config: DatahubRestSinkConfig):
extra_headers=self.config.extra_headers,
ca_certificate_path=self.config.ca_certificate_path,
)
gms_config = self.emitter.test_connection()
try:
gms_config = self.emitter.test_connection()
except Exception as exc:
raise ConfigurationError(
f"💥 Failed to connect to DataHub@{self.config.server} (token:{'XXX-redacted' if self.config.token else 'empty'}) over REST",
exc,
)

self.report.gms_version = (
gms_config.get("versions", {})
.get("linkedin/datahub", {})