feat(ingest): add Airflow lineage backend #2368

Merged · 25 commits · Apr 13, 2021

49 changes: 40 additions & 9 deletions metadata-ingestion/README.md
@@ -18,9 +18,9 @@ The folks over at [Acryl](https://www.acryl.io/) maintain a PyPI package for Dat

```sh
# Requires Python 3.6+
python3 -m pip install --upgrade pip==20.2.4 wheel setuptools
python3 -m pip install --upgrade pip wheel setuptools
python3 -m pip uninstall datahub acryl-datahub || true # sanity check - ok if it fails
python3 -m pip install acryl-datahub
python3 -m pip install --upgrade acryl-datahub
datahub version
# If you see "command not found", try running this instead: python3 -m datahub version
```
@@ -91,13 +91,6 @@ _Limitation: the datahub_docker.sh convenience script assumes that the recipe an

If you'd like to install from source, see the [developer guide](./developing.md).

### Usage within Airflow

We have also included a couple [sample DAGs](./examples/airflow) that can be used with [Airflow](https://airflow.apache.org/).

- `generic_recipe_sample_dag.py` - a simple Airflow DAG that picks up a DataHub ingestion recipe configuration and runs it.
- `mysql_sample_dag.py` - an Airflow DAG that runs a MySQL metadata ingestion pipeline using an inlined configuration.

## Recipes

A recipe is a configuration file that tells our ingestion scripts where to pull data from (source) and where to put it (sink).
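
As a rough sketch, a recipe is just a `source` block plus a `sink` block. The field names below are illustrative and depend on which source and sink plugins you have installed; the structure is shown here as a Python dict, which is the same shape you would write in a YAML recipe file (or pass programmatically, as the sample Airflow DAGs later in this README do).

```python
# Illustrative recipe structure: the same keys you'd put in a YAML recipe file.
# Exact config fields depend on the source/sink plugins you install.
recipe = {
    "source": {
        "type": "mysql",
        "config": {
            "username": "datahub",
            "password": "datahub",
            "database": "mydb",
            "host_port": "localhost:3306",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {"server": "http://localhost:8080"},
    },
}
```

The equivalent YAML file can then be run with something like `datahub ingest -c ./my_recipe.yml`.
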
@@ -506,6 +499,44 @@ In some cases, you might want to construct the MetadataChangeEvents yourself but

For a basic usage example, see the [lineage_emitter.py](./examples/library/lineage_emitter.py) example.

## Usage with Airflow

There are a couple of ways to integrate DataHub with Airflow.

### Running ingestion on a schedule

Take a look at these sample DAGs:

- [`generic_recipe_sample_dag.py`](./examples/airflow/generic_recipe_sample_dag.py) - a simple Airflow DAG that picks up a DataHub ingestion recipe configuration and runs it.
- [`mysql_sample_dag.py`](./examples/airflow/mysql_sample_dag.py) - an Airflow DAG that runs a MySQL metadata ingestion pipeline using an inlined configuration.
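
Both DAGs follow the same basic pattern: a small Python callable builds a `Pipeline` from the recipe configuration and runs it, wrapped in a `PythonOperator`. A condensed sketch of that pattern (the recipe path, DAG id, and schedule are illustrative):

```python
from datetime import timedelta

import yaml
from airflow import DAG
from airflow.utils.dates import days_ago

try:  # Airflow 2.x
    from airflow.operators.python import PythonOperator
except ImportError:  # Airflow 1.10.x
    from airflow.operators.python_operator import PythonOperator

from datahub.ingestion.run.pipeline import Pipeline


def run_recipe():
    # Illustrative path; point this at your own recipe file.
    with open("/opt/airflow/recipes/mysql_to_datahub.yml") as f:
        config = yaml.safe_load(f)
    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()


with DAG(
    "datahub_ingest_sketch",
    description="Sketch of a scheduled DataHub ingestion DAG",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_from_recipe",
        python_callable=run_recipe,
    )
```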

### Emitting lineage via a separate operator

Take a look at this sample DAG:

- [`lineage_emission_dag.py`](./examples/airflow/lineage_emission_dag.py) - emits lineage using the `DatahubEmitterOperator`.

In order to use this example, you must first configure the DataHub hook. As with ingestion, we support a DataHub REST hook and a Kafka-based hook.

```sh
# For REST-based:
airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080'
# For Kafka-based (standard Kafka sink config can be passed via extras):
airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'
```
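
Once the connection exists, the DAG itself boils down to a `DatahubEmitterOperator` that takes pre-built MCEs. The following is a condensed sketch based on the `lineage_emission_dag.py` changes in this PR; the operator's import path is an assumption here, so check that file for the exact imports.

```python
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

import datahub.emitter.mce_builder as builder

# Import path assumed for illustration; see lineage_emission_dag.py for the exact import.
from datahub.integrations.airflow.operators import DatahubEmitterOperator

with DAG(
    "datahub_lineage_emission_sketch",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    emit_lineage_task = DatahubEmitterOperator(
        task_id="emit_lineage",
        datahub_conn_id="datahub_rest_default",  # the connection created above
        mces=[
            builder.make_lineage_mce(
                upstream_urns=[
                    builder.make_dataset_urn("snowflake", "mydb.schema.tableA"),
                    builder.make_dataset_urn("snowflake", "mydb.schema.tableB"),
                ],
                downstream_urn=builder.make_dataset_urn(
                    "snowflake", "mydb.schema.tableC"
                ),
            )
        ],
    )
```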

### Using Datahub's Airflow lineage backend

1. First, configure the DataHub hook connection in Airflow as described in the previous section.
2. Add the following lines to your `airflow.cfg` file.
```ini
[lineage]
backend = datahub.integrations.airflow.DatahubAirflowLineageBackend
datahub_conn_id = datahub_rest_default # or datahub_kafka_default - whatever you named the connection in step 1
```
3. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](./examples/airflow/lineage_backend_demo.py).
4. [optional] Learn more about [Airflow lineage](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html), including shorthand notation and some automation.
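
This PR also adds DataFlow/DataJob URN helpers to `mce_builder.py`, which give DAGs and tasks their identities in DataHub. As a rough illustration (assuming `airflow` as the orchestrator name and the default `prod` cluster), the URNs involved look like this:

```python
import datahub.emitter.mce_builder as builder

# An Airflow DAG maps to a DataFlow URN: (orchestrator, flow_id, cluster).
flow_urn = builder.make_data_flow_urn("airflow", "datahub_lineage_backend_demo")
# -> urn:li:dataFlow:(airflow,datahub_lineage_backend_demo,prod)

# A task within that DAG maps to a DataJob URN nested under the flow.
job_urn = builder.make_data_job_urn(
    "airflow", "datahub_lineage_backend_demo", "run_data_task"
)
# -> urn:li:dataJob:(urn:li:dataFlow:(airflow,datahub_lineage_backend_demo,prod),run_data_task)

# Datasets declared in inlets/outlets resolve to ordinary dataset URNs.
dataset_urn = builder.make_dataset_urn("snowflake", "mydb.schema.tableA")
# -> urn:li:dataset:(urn:li:dataPlatform:snowflake,mydb.schema.tableA,PROD)
```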

## Developing

See the [developing guide](./developing.md).
6 changes: 3 additions & 3 deletions metadata-ingestion/developing.md
@@ -27,7 +27,7 @@ The architecture of this metadata ingestion framework is heavily inspired by [Ap
```sh
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip==20.2.4 wheel setuptools
pip install --upgrade pip wheel setuptools
pip uninstall datahub || true ; rm -r src/*.egg-info || true
pip install -e .
./scripts/codegen.sh
@@ -45,7 +45,7 @@ If you've already run the pip install, but running `datahub` in your command lin
The easiest way to circumvent this is to install and run via Python, and use `python3 -m datahub` in place of `datahub`.

```sh
python3 -m pip install acryl-datahub
python3 -m pip install --upgrade acryl-datahub
python3 -m datahub --help
```

@@ -57,7 +57,7 @@
This means Python's `wheel` is not installed. Try running the following commands and then retry.

```sh
pip install --upgrade pip==20.2.4 wheel setuptools
pip install --upgrade pip wheel setuptools
pip cache purge
```

metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py
@@ -8,7 +8,11 @@

import yaml
from airflow import DAG
from airflow.operators.python import PythonOperator

try:
from airflow.operators.python import PythonOperator
except ImportError:
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from datahub.ingestion.run.pipeline import Pipeline
@@ -40,7 +44,6 @@ def datahub_recipe():
description="An example DAG which runs a DataHub ingestion recipe",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["datahub-ingest"],
catchup=False,
) as dag:
ingest_task = PythonOperator(
46 changes: 46 additions & 0 deletions metadata-ingestion/examples/airflow/lineage_backend_demo.py
@@ -0,0 +1,46 @@
"""Lineage Backend

An example DAG demonstrating the usage of DataHub's Airflow lineage backend.
"""

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

try:
from airflow.operators.bash import BashOperator
except ImportError:
from airflow.operators.bash_operator import BashOperator

from datahub.integrations.airflow.entities import Dataset

default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"execution_timeout": timedelta(minutes=5),
}


with DAG(
"datahub_lineage_backend_demo",
default_args=default_args,
description="An example DAG demonstrating the usage of DataHub's Airflow lineage backend.",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
catchup=False,
) as dag:
task1 = BashOperator(
task_id="run_data_task",
dag=dag,
bash_command="echo 'This is where you might run your data tooling.'",
inlets={
"datasets": [
Dataset("snowflake", "mydb.schema.tableA"),
Dataset("snowflake", "mydb.schema.tableB"),
],
},
outlets={"datasets": [Dataset("snowflake", "mydb.schema.tableC")]},
)
9 changes: 5 additions & 4 deletions metadata-ingestion/examples/airflow/lineage_emission_dag.py
@@ -30,7 +30,6 @@
description="An example DAG demonstrating lineage emission within an Airflow DAG.",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["datahub-ingest"],
catchup=False,
) as dag:
# This example shows a SnowflakeOperator followed by a lineage emission. However, the
@@ -54,14 +53,16 @@

emit_lineage_task = DatahubEmitterOperator(
task_id="emit_lineage",
datahub_rest_conn_id="datahub_rest_default",
datahub_conn_id="datahub_rest_default",
mces=[
builder.make_lineage_mce(
[
upstream_urns=[
builder.make_dataset_urn("snowflake", "mydb.schema.tableA"),
builder.make_dataset_urn("snowflake", "mydb.schema.tableB"),
],
builder.make_dataset_urn("snowflake", "mydb.schema.tableC"),
downstream_urn=builder.make_dataset_urn(
"snowflake", "mydb.schema.tableC"
),
)
],
)
7 changes: 5 additions & 2 deletions metadata-ingestion/examples/airflow/mysql_sample_dag.py
@@ -8,9 +8,13 @@
from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

try:
from airflow.operators.python import PythonOperator
except ImportError:
from airflow.operators.python_operator import PythonOperator

from datahub.ingestion.run.pipeline import Pipeline

default_args = {
@@ -53,7 +57,6 @@ def ingest_from_mysql():
description="An example DAG which ingests metadata from MySQL to DataHub",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["datahub-ingest"],
catchup=False,
) as dag:
ingest_task = PythonOperator(
6 changes: 4 additions & 2 deletions metadata-ingestion/scripts/ci.sh
@@ -12,8 +12,10 @@ else
ldap-utils
fi

python -m pip install --upgrade pip==20.2.4 wheel setuptools
pip install -e ".[dev]"
python -m pip install --upgrade pip wheel setuptools
pip install --upgrade apache-airflow==1.10.15 -c https://raw.githubusercontent.com/apache/airflow/constraints-1.10.15/constraints-3.6.txt
airflow db init
python -m pip install -e ".[dev]"

./scripts/codegen.sh

1 change: 1 addition & 0 deletions metadata-ingestion/setup.cfg
@@ -48,6 +48,7 @@ show_missing = true
exclude_lines =
pragma: no cover
@abstract
if TYPE_CHECKING:
omit =
# omit codegen
src/datahub/metadata/*
23 changes: 14 additions & 9 deletions metadata-ingestion/setup.py
@@ -27,13 +27,14 @@ def get_long_description():
}

framework_common = {
"click>=7.1.1",
"pyyaml>=5.4.1",
"click>=6.0.0",
"PyYAML",
"toml>=0.10.0",
"docker>=4.4",
"docker",
"expandvars>=0.6.5",
"avro-gen3==0.3.8",
"avro-gen3==0.3.9",
"avro-python3>=1.8.2",
"python-dateutil",
}

kafka_common = {
@@ -43,21 +44,21 @@ def get_long_description():
# fastavro for serialization. We do not use confluent_kafka[avro], since it
# is incompatible with its own dep on avro-python3.
"confluent_kafka>=1.5.0",
"fastavro>=1.3.0",
"fastavro>=1.2.0",
}

sql_common = {
# Required for all SQL sources.
"sqlalchemy>=1.3.23",
"sqlalchemy>=1.3.24",
}

# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
"datahub-kafka": kafka_common,
"datahub-rest": {"requests>=2.25.1"},
"datahub-rest": {"requests"},
# Integrations.
"airflow": {"apache-airflow >= 1.10.3"},
"airflow": {"apache-airflow >= 1.10.2"},
# Source plugins
"kafka": kafka_common,
"athena": sql_common | {"PyAthena[SQLAlchemy]"},
@@ -91,6 +92,8 @@ def get_long_description():
"build",
"twine",
# Also add the plugins which are used for tests.
"apache-airflow==1.10.15", # Airflow 2.x does not have LineageBackend packaged yet.
"apache-airflow-backport-providers-snowflake", # Used in the example DAGs.
*list(
dependency
for plugin in [
Expand All @@ -106,7 +109,6 @@ def get_long_description():
]
for dependency in plugins[plugin]
),
"apache-airflow-providers-snowflake", # Used in the example DAGs.
}


@@ -183,6 +185,9 @@ def get_long_description():
"apache_airflow_provider": [
"provider_info=datahub.integrations.airflow.get_provider_info:get_provider_info"
],
"airflow.plugins": [
"datahub = datahub.integrations.airflow.get_provider_info:DatahubAirflowPlugin"
],
},
# Dependencies.
install_requires=list(base_requirements | framework_common),
23 changes: 22 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -12,15 +12,36 @@
UpstreamLineageClass,
)

DEFAULT_ENV = "PROD"
DEFAULT_FLOW_CLUSTER = "prod"

def make_dataset_urn(platform: str, name: str, env: str = "PROD"):

def make_dataset_urn(platform: str, name: str, env: str = DEFAULT_ENV):
return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


def make_user_urn(username: str):
return f"urn:li:corpuser:{username}"


def make_data_flow_urn(
orchestrator: str, flow_id: str, cluster: str = DEFAULT_FLOW_CLUSTER
):
return f"urn:li:dataFlow:({orchestrator},{flow_id},{cluster})"


def make_data_job_urn_with_flow(flow_urn: str, job_id: str):
return f"urn:li:dataJob:({flow_urn},{job_id})"


def make_data_job_urn(
orchestrator: str, flow_id: str, job_id: str, cluster: str = DEFAULT_FLOW_CLUSTER
):
return make_data_job_urn_with_flow(
make_data_flow_urn(orchestrator, flow_id, cluster), job_id
)


def make_lineage_mce(
upstream_urns: Union[str, List[str]],
downstream_urn: str,
4 changes: 4 additions & 0 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
@@ -11,6 +11,8 @@
CorpGroupSnapshotClass,
CorpUserSnapshotClass,
DashboardSnapshotClass,
DataFlowSnapshotClass,
DataJobSnapshotClass,
DataProcessSnapshotClass,
DatasetSnapshotClass,
MLModelSnapshotClass,
Expand All @@ -24,6 +26,8 @@
CorpGroupSnapshotClass: "corpGroups",
DatasetSnapshotClass: "datasets",
DataProcessSnapshotClass: "dataProcesses",
DataFlowSnapshotClass: "dataFlows",
DataJobSnapshotClass: "dataJobs",
MLModelSnapshotClass: "mlModels",
TagSnapshotClass: "tags",
}
metadata-ingestion/src/datahub/integrations/airflow/__init__.py
@@ -0,0 +1,7 @@
try:
from datahub.integrations.airflow.lineage_backend import (
DatahubAirflowLineageBackend,
)
except ImportError:
# Compat for Airflow 2.x.
pass
24 changes: 24 additions & 0 deletions metadata-ingestion/src/datahub/integrations/airflow/entities.py
@@ -0,0 +1,24 @@
import attr

import datahub.emitter.mce_builder as builder


class _Entity:
def set_context(self, context):
# Required for compat with Airflow 1.10.x
pass

def as_dict(self):
# Required for compat with Airflow 1.10.x
return attr.asdict(self)


@attr.s(auto_attribs=True)
class Dataset(_Entity):
platform: str
name: str
env: str = builder.DEFAULT_ENV

@property
def urn(self):
return builder.make_dataset_urn(self.platform, self.name, self.env)