
feat(airflow): Airflow lineage ingestion plugin #4833

Merged · 13 commits · May 25, 2022
1 change: 1 addition & 0 deletions docs-website/sidebars.js
@@ -95,6 +95,7 @@ module.exports = {
"metadata-integration/java/datahub-protobuf/README",
"metadata-ingestion/as-a-library",
"metadata-integration/java/as-a-library",
//"metadata-ingestion-modules/airflow-plugin/README"
Collaborator

Do you plan to revert this?

Contributor Author

I added the doc about this plugin to the original Airflow docs, but because there is a README here as well, doc generation complained about a markdown file that is missing from sidebars.js.
As far as I know, this is the way to let doc generation know to ignore a file.

],
},
{
64 changes: 60 additions & 4 deletions docs/lineage/airflow.md
@@ -1,18 +1,72 @@
# Airflow Integration

DataHub supports integration of
Collaborator

Do you mind filling out the PR description to discuss what exactly the scope of this PR is?

Contributor Author

Sure, I will do that.


- Airflow Pipeline (DAG) metadata
- DAG and Task run information as well as
- Lineage information when present

There are a few ways to enable these integrations from Airflow into DataHub.

## Using Datahub's Airflow lineage plugin (new)

:::note

We recommend the lineage plugin if you are on Airflow version >= 2.0.2, or on MWAA with an Airflow version >= 2.0.2.
:::

1. You need to install the required dependency in your Airflow environment.

```shell
pip install acryl-datahub-airflow-plugin
```

2. Disable lazy plugin load in your airflow.cfg.
On MWAA you should add this config to your [Apache Airflow configuration options](https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-env-variables.html#configuring-2.0-airflow-override).

```yaml
core.lazy_load_plugins : False
```
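
The `core.lazy_load_plugins : False` notation above is the MWAA-style `section.key` form. On a self-managed Airflow, the same setting lives in `airflow.cfg`; a minimal sketch of the equivalent entry:

```ini
# Equivalent airflow.cfg entry for disabling lazy plugin loading
[core]
lazy_load_plugins = False
```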

3. You must configure an Airflow hook for Datahub. We support both a Datahub REST hook and a Kafka-based hook, but you only need one.

```shell
# For REST-based:
airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080'
# For Kafka-based (standard Kafka sink config can be passed via extras):
airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'
```
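
If you want to double-check that the hook's connection was created, the standard Airflow 2.x CLI can list it (the connection names below match the commands above):

```shell
# The datahub_rest_default / datahub_kafka_default connection should appear here
airflow connections list
```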

4. Add your `datahub_conn_id` and/or `cluster` to your `airflow.cfg` file if they do not align with the default values. See the configuration parameters below; an illustrative `airflow.cfg` fragment follows the table.

**Configuration options:**

|Name | Default value | Description |
|---|---|---|
| datahub.datahub_conn_id | datahub_rest_default | The name of the datahub connection you set in step 3. |
| datahub.cluster | prod | The name of the Airflow cluster. |
| datahub.capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
| datahub.capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| datahub.graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.|
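
As an illustration only, and assuming the `section.key` names in the table map directly onto an `airflow.cfg` section (an assumption; adjust to the plugin's actual configuration keys if they differ), the overrides could look like:

```ini
# Illustrative sketch: section/key names assumed from the table above
[datahub]
datahub_conn_id = datahub_rest_default
cluster = prod
capture_ownership_info = true
capture_tags_info = true
graceful_exceptions = true
```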

5. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py), or reference [`lineage_backend_taskflow_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py) if you're using the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html). A minimal sketch follows this list.
6. [optional] Learn more about [Airflow lineage](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html), including shorthand notation and some automation.
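
As a minimal sketch of step 5 with a classic operator (assuming the `Dataset` entity from `datahub_provider.entities` and hypothetical Snowflake table names):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

from datahub_provider.entities import Dataset

with DAG(
    dag_id="datahub_lineage_sketch",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # inlets/outlets are picked up by the lineage integration and emitted to DataHub
    transform = BashOperator(
        task_id="transform",
        bash_command="echo 'run transformation'",
        inlets=[Dataset("snowflake", "mydb.schema.source_table")],
        outlets=[Dataset("snowflake", "mydb.schema.target_table")],
    )
```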

### How to validate installation

1. In Airflow, go to the Admin -> Plugins menu and check that the Datahub plugin is listed.
2. Run an Airflow DAG, and you should see Datahub-related log messages in the task logs, like:

```
Emitting Datahub ...
```

## Using Datahub's Airflow lineage backend

:::caution

The Airflow lineage backend is only supported in Airflow 1.10.15+ and 2.0.2+.
For managed services like MWAA, you should use the Datahub Airflow plugin, as the lineage backend is not supported there.

:::

@@ -23,7 +77,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g

## Setting up Airflow to use DataHub as Lineage Backend

1. You need to install the required dependency in your Airflow environment. See <https://registry.astronomer.io/providers/datahub/modules/datahublineagebackend>

```shell
pip install acryl-datahub[airflow]
@@ -39,6 +93,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
```

3. Add the following lines to your `airflow.cfg` file.

```ini
[lineage]
backend = datahub_provider.lineage.datahub.DatahubLineageBackend
@@ -50,6 +105,7 @@ If you are looking to run Airflow and DataHub using docker locally, follow the g
"graceful_exceptions": true }
# The above indentation is important!
```

**Configuration options:**
- `datahub_conn_id` (required): Usually `datahub_rest_default` or `datahub_kafka_default`, depending on what you named the connection in step 1.
- `cluster` (defaults to "prod"): The "cluster" to associate Airflow DAGs and tasks with.
141 changes: 141 additions & 0 deletions metadata-ingestion-modules/airflow-plugin/.gitignore
@@ -0,0 +1,141 @@
.envrc
.vscode/
output
pvenv36/
bq_credentials.json
/tmp

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# Generated classes
src/datahub/metadata/
wheels/
junit.quick.xml
67 changes: 67 additions & 0 deletions metadata-ingestion-modules/airflow-plugin/README.md
@@ -0,0 +1,67 @@
# Datahub Airflow Plugin

## Capabilities

DataHub supports integration of

- Airflow Pipeline (DAG) metadata
- DAG and Task run information
- Lineage information when present

## Installation

1. You need to install the required dependency in your Airflow environment.

```shell
pip install acryl-datahub-airflow-plugin
```

:::note

We recommend the lineage plugin if you are on Airflow version >= 2.0.2, or on MWAA with an Airflow version >= 2.0.2.
:::

2. Disable lazy plugin load in your `airflow.cfg`.

```yaml
core.lazy_load_plugins : False
```

3. You must configure an Airflow hook for Datahub. We support both a Datahub REST hook and a Kafka-based hook, but you only need one.

```shell
# For REST-based:
airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080'
# For Kafka-based (standard Kafka sink config can be passed via extras):
airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'
```

4. Add your `datahub_conn_id` and/or `cluster` to your `airflow.cfg` file if they do not align with the default values. See the configuration parameters below.

**Configuration options:**

|Name | Default value | Description |
|---|---|---|
| datahub.datahub_conn_id | datahub_rest_default | The name of the datahub connection you set in step 3. |
| datahub.cluster | prod | The name of the Airflow cluster. |
| datahub.capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
| datahub.capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| datahub.graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions.|

5. Configure `inlets` and `outlets` for your Airflow operators. For reference, look at the sample DAG in [`lineage_backend_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_demo.py), or reference [`lineage_backend_taskflow_demo.py`](../../metadata-ingestion/src/datahub_provider/example_dags/lineage_backend_taskflow_demo.py) if you're using the [TaskFlow API](https://airflow.apache.org/docs/apache-airflow/stable/concepts/taskflow.html). A TaskFlow-style sketch follows this list.
6. [optional] Learn more about [Airflow lineage](https://airflow.apache.org/docs/apache-airflow/stable/lineage.html), including shorthand notation and some automation.
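
As a minimal TaskFlow-style sketch of step 5 (assuming the `Dataset` entity from `datahub_provider.entities` and hypothetical Postgres table names; `inlets`/`outlets` are forwarded to the underlying operator):

```python
from datetime import datetime

from airflow.decorators import dag, task

from datahub_provider.entities import Dataset


@dag(start_date=datetime(2022, 1, 1), schedule_interval=None, catchup=False)
def datahub_lineage_taskflow_sketch():
    @task(
        inlets=[Dataset("postgres", "mydb.public.orders")],
        outlets=[Dataset("postgres", "mydb.public.orders_summary")],
    )
    def transform():
        # Real transformation logic goes here; lineage comes from the inlets/outlets above
        pass

    transform()


datahub_lineage_taskflow_sketch()
```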

## How to validate installation

1. In Airflow, go to the Admin -> Plugins menu and check that the Datahub plugin is listed.
2. Run an Airflow DAG, and you should see Datahub-related log messages in the task logs, like:

```
Emitting Datahub ...
```

## Additional references

Related Datahub videos:

- [Airflow Lineage](https://www.youtube.com/watch?v=3wiaqhb8UR0)
- [Airflow Run History in DataHub](https://www.youtube.com/watch?v=YpUOqDU5ZYg)