feat(ingest/dagster): Dagster source #10071

Merged: 49 commits, Mar 25, 2024
Changes from 37 commits
1d5b308
Dagster source metadata ingestion code added
shubhamjagtap639 Jul 5, 2023
92e3cea
Job failed and canceled execution metadata emit code added
shubhamjagtap639 Jul 6, 2023
e006275
Code changes as per review comment and dagster test cases added
shubhamjagtap639 Jul 7, 2023
d24e27f
Dagster source integration doc added
shubhamjagtap639 Jul 7, 2023
28fcad2
Dagster sensor renamed to datahub sensor
shubhamjagtap639 Jul 7, 2023
78220aa
Code and doc changes as per review comment
shubhamjagtap639 Jul 7, 2023
9b8b50a
File reformatted
shubhamjagtap639 Jul 10, 2023
f7c6b08
Lint error fixed
shubhamjagtap639 Jul 11, 2023
fbd73d8
Dagster package added
shubhamjagtap639 Jul 11, 2023
de5ad77
Temp changes
shubhamjagtap639 Jul 14, 2023
12bc920
Dagster version set to >1.3.3
shubhamjagtap639 Jul 14, 2023
a9c1a83
Datahub dagster plugin code added
shubhamjagtap639 Sep 5, 2023
29bb752
Extra init file removed
shubhamjagtap639 Sep 5, 2023
c419a61
cl failed error resolved
shubhamjagtap639 Sep 6, 2023
b996394
Revert doc changes
shubhamjagtap639 Sep 6, 2023
50e5724
Added missing command in lint task
shubhamjagtap639 Sep 6, 2023
e5e51fd
Temp changes
shubhamjagtap639 Sep 6, 2023
d134b74
Typo error resolved
shubhamjagtap639 Sep 6, 2023
c7a3d18
dataset entity added in datahub/api/entities. Dagster example files a…
shubhamjagtap639 Sep 7, 2023
00cb35e
lint error fixed
shubhamjagtap639 Sep 7, 2023
a7efd7c
Code changes as per review comment
shubhamjagtap639 Sep 11, 2023
9fad5c4
Initial commit of the reworked Dagster plugin
treff7es Mar 18, 2024
74dc9cf
Adding entities
treff7es Mar 18, 2024
5f373dc
Using java17 build
treff7es Mar 18, 2024
281e666
Black formatting
treff7es Mar 18, 2024
5ca6867
Silenting some mypy error temporary
treff7es Mar 18, 2024
7eb7a8e
Black formatting
treff7es Mar 18, 2024
059ec35
Fixing test
treff7es Mar 18, 2024
b7bb9e3
Add missing import
treff7es Mar 18, 2024
6efc54f
Fixing url generation
treff7es Mar 19, 2024
b48b061
Fixing linter issues
treff7es Mar 19, 2024
5d68929
Fixes
treff7es Mar 19, 2024
c1f7557
Fixing build
treff7es Mar 19, 2024
512824d
Not pinning datahub client version
treff7es Mar 19, 2024
f788ec5
Adding way to capture assets
treff7es Mar 20, 2024
508fcbc
Add way to test multiple dagster version
treff7es Mar 20, 2024
8ec8f4e
Adding way to bring your own lineage extractor
treff7es Mar 21, 2024
63df8c1
Pr review fixes
treff7es Mar 22, 2024
6bfe22d
Fixing imports
treff7es Mar 22, 2024
479c2cf
Fixing golden files and pydantic v2 issues with s3 tests
treff7es Mar 22, 2024
80d9a89
Fixing test
treff7es Mar 22, 2024
1450b83
Fixing tests
treff7es Mar 22, 2024
975b19d
Disabling Dagster tests
treff7es Mar 22, 2024
295b91a
Reverting accidentally downgraded package
treff7es Mar 22, 2024
39f23cc
- Moving examples out from the package
treff7es Mar 25, 2024
186069e
Merge branch 'master' into dagster_source-rebase
treff7es Mar 25, 2024
85cff86
Fixing doc links
treff7es Mar 25, 2024
d0f5365
Fixing doc build concurrency issue
treff7es Mar 25, 2024
39e2a7f
Fixing typo
treff7es Mar 25, 2024
2 changes: 2 additions & 0 deletions .github/workflows/build-and-test.yml
@@ -84,6 +84,8 @@ jobs:
-x :metadata-io:test \
-x :metadata-ingestion-modules:airflow-plugin:build \
-x :metadata-ingestion-modules:airflow-plugin:check \
-x :metadata-ingestion-modules:dagster-plugin:build \
-x :metadata-ingestion-modules:dagster-plugin:check \
-x :datahub-frontend:build \
-x :datahub-web-react:build \
--parallel
85 changes: 85 additions & 0 deletions .github/workflows/dagster-plugin.yml
@@ -0,0 +1,85 @@
name: Dagster Plugin
on:
  push:
    branches:
      - master
    paths:
      - ".github/workflows/dagster-plugin.yml"
      - "metadata-ingestion-modules/dagster-plugin/**"
      - "metadata-ingestion/**"
      - "metadata-models/**"
  pull_request:
    branches:
      - master
    paths:
      - ".github/**"
      - "metadata-ingestion-modules/dagster-plugin/**"
      - "metadata-ingestion/**"
      - "metadata-models/**"
  release:
    types: [published]

concurrency:
  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
  cancel-in-progress: true

jobs:
  dagster-plugin:
    runs-on: ubuntu-latest
    env:
      SPARK_VERSION: 3.0.3
      DATAHUB_TELEMETRY_ENABLED: false
    strategy:
      matrix:
        python-version: ["3.8", "3.10"]
        include:
          - python-version: "3.8"
            extraPythonRequirement: "dagster>=1.3.3"
          - python-version: "3.10"
            extraPythonRequirement: "dagster>=1.3.3"
      fail-fast: false
    steps:
      - name: Set up JDK 17
        uses: actions/setup-java@v3
        with:
          distribution: "zulu"
          java-version: 17
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
          cache: "pip"
      - name: Install dependencies
        run: ./metadata-ingestion/scripts/install_deps.sh
      - name: Install dagster package and test (extras ${{ matrix.extraPythonRequirement }})

[actionlint] reported by reviewdog 🐶
property "extrapythonrequirement" is not defined in object type {python-version: number} [expression]

        run: ./gradlew -Pextra_pip_requirements='${{ matrix.extraPythonRequirement }}' :metadata-ingestion-modules:dagster-plugin:lint :metadata-ingestion-modules:dagster-plugin:testQuick

[actionlint] reported by reviewdog 🐶
property "extrapythonrequirement" is not defined in object type {python-version: number} [expression]

      - name: pip freeze show list installed
        if: always()
        run: source metadata-ingestion-modules/dagster-plugin/venv/bin/activate && pip freeze
      - uses: actions/upload-artifact@v3
        if: ${{ always() && matrix.python-version == '3.10' && matrix.extraPythonRequirement == 'dagster>=1.3.3' }}

[actionlint] reported by reviewdog 🐶
property "extrapythonrequirement" is not defined in object type {python-version: number} [expression]

        with:
          name: Test Results (dagster Plugin ${{ matrix.python-version}})
          path: |
            **/build/reports/tests/test/**
            **/build/test-results/test/**
            **/junit.*.xml
      - name: Upload coverage to Codecov
        if: always()
        uses: codecov/codecov-action@v3
        with:
          token: ${{ secrets.CODECOV_TOKEN }}
          directory: .
          fail_ci_if_error: false
          flags: dagster-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }}

[actionlint] reported by reviewdog 🐶
property "extrapythonrequirement" is not defined in object type {python-version: number} [expression]

          name: pytest-dagster
          verbose: true

  event-file:
    runs-on: ubuntu-latest
    steps:
      - name: Upload
        uses: actions/upload-artifact@v3
        with:
          name: Event File
          path: ${{ github.event_path }}
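
The `include` entries in the matrix above attach `extraPythonRequirement` to each `python-version` value rather than creating extra jobs. The following is a simplified sketch of that expansion rule, written from the documented GitHub Actions behaviour and not taken from this PR; the function name `expand_matrix` is hypothetical.

```python
from itertools import product
from typing import Any, Dict, List


def expand_matrix(
    axes: Dict[str, List[Any]], include: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Simplified model of how a job matrix with `include` is expanded."""
    keys = list(axes)
    # Start with the cross product of all declared axes.
    combos = [dict(zip(keys, values)) for values in product(*(axes[k] for k in keys))]
    originals = [dict(combo) for combo in combos]

    for entry in include:
        matched = False
        for combo, original in zip(combos, originals):
            # An include entry is merged into a combination only when it
            # does not contradict any of that combination's original values.
            if all(original.get(key, value) == value for key, value in entry.items()):
                combo.update(entry)
                matched = True
        if not matched:
            # Entries that fit no combination become a new combination.
            combos.append(dict(entry))
    return combos


jobs = expand_matrix(
    {"python-version": ["3.8", "3.10"]},
    [
        {"python-version": "3.8", "extraPythonRequirement": "dagster>=1.3.3"},
        {"python-version": "3.10", "extraPythonRequirement": "dagster>=1.3.3"},
    ],
)
```

Under this model the workflow runs two jobs, one per Python version, and each job carries the same `dagster>=1.3.3` requirement.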
2 changes: 1 addition & 1 deletion .github/workflows/test-results.yml
@@ -2,7 +2,7 @@ name: Test Results

on:
workflow_run:
workflows: ["build & test", "metadata ingestion", "Airflow Plugin"]
workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin"]
types:
- completed

3 changes: 2 additions & 1 deletion docs-website/build.gradle
@@ -83,7 +83,8 @@ task yarnInstall(type: YarnTask) {
task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall,
generateGraphQLSchema, generateJsonSchema,
':metadata-ingestion:modelDocGen', ':metadata-ingestion:docGen',
':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel'] ) {
':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel',
':metadata-ingestion-modules:dagster-plugin:buildWheel'] ) {
inputs.files(projectMdFiles)
outputs.cacheIf { true }
args = ['run', 'generate']
1 change: 1 addition & 0 deletions docs-website/generateDocsDir.ts
@@ -572,6 +572,7 @@ function copy_python_wheels(): void {
const wheel_dirs = [
"../metadata-ingestion/dist",
"../metadata-ingestion-modules/airflow-plugin/dist",
"../metadata-ingestion-modules/dagster-plugin/dist",
];

const wheel_output_directory = path.join(STATIC_DIRECTORY, "wheels");
6 changes: 6 additions & 0 deletions docs-website/sidebars.js
@@ -295,6 +295,11 @@ module.exports = {
id: "docs/lineage/airflow",
label: "Airflow",
},
{
type: "doc",
id: "docs/lineage/dagster",
label: "Dagster",
},
//"docker/airflow/local_airflow",
"metadata-integration/java/spark-lineage/README",
"metadata-ingestion/integration_docs/great-expectations",
@@ -751,6 +756,7 @@ module.exports = {
// "metadata-integration/java/spark-lineage-beta/README.md
// "metadata-integration/java/openlineage-converter/README"
//"metadata-ingestion-modules/airflow-plugin/README"
//"metadata-ingestion-modules/dagster-plugin/README"
// "metadata-ingestion/schedule_docs/datahub", // we can delete this
// TODO: change the titles of these, removing the "What is..." portion from the sidebar"
// "docs/what/entity",
89 changes: 89 additions & 0 deletions docs/lineage/dagster.md
@@ -0,0 +1,89 @@
# Dagster Integration
DataHub supports the integration of:

- Dagster pipeline metadata
- Job and op run information
- Lineage information, when present

## Using DataHub's Dagster Sensor

Dagster sensors let you run actions in response to a state change. The sensor defined by DataHub emits metadata after every Dagster pipeline run, covering both successful and failed runs. For more details about Dagster sensors, see [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors).

### Prerequisites

1. Create a new Dagster project. See <https://docs.dagster.io/getting-started/create-new-project>.
2. There are two ways to define Dagster definitions before starting the Dagster UI: the [Definitions](https://docs.dagster.io/_apidocs/definitions#dagster.Definitions) class (recommended) or [Repositories](https://docs.dagster.io/concepts/repositories-workspaces/repositories#repositories).
3. A newly created Dagster project uses the Definitions class by default.

### Setup

1. Install the required dependency:

```shell
pip install acryl_datahub_dagster_plugin
```

2. Import the sensor definition provided by the DataHub Dagster plugin and add it to your Dagster definitions or Dagster repository before starting the Dagster UI, as shown below:
**Using Definitions class:**

```python
{{ inline /metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/example_jobs/basic_setup.py }}
```

3. The sensor provided by the DataHub Dagster plugin uses the configuration options below. You can set them using environment variables. If an option is not set, the sensor uses its default value.

**Configuration options:**

| Configuration Option          | Default value | Description |
|-------------------------------|---------------|-------------|
| datahub_client_config         |               | The DataHub client config. |
| dagster_url                   |               | The URL of your Dagster webserver. |
| capture_asset_materialization | True          | Whether to capture asset keys as Datasets on AssetMaterialization events. |
| capture_input_output          | True          | Whether to capture and try to parse inputs and outputs from HANDLED_OUTPUT and LOADED_INPUT events. Currently only [PathMetadataValue](https://github.com/dagster-io/dagster/blob/7e08c05dcecef9fd07f887c7846bd1c9a90e7d84/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py#L655) metadata is supported (EXPERIMENTAL). |
| platform_instance             |               | Optional. The instance of the platform that all assets produced by this recipe belong to. |
| asset_lineage_extractor       |               | A custom function that implements your own logic to capture asset lineage information. See the example for details. |

4. Once the Dagster UI is up, turn on the provided sensor. Click the Overview tab, then the Sensors tab. Each defined sensor has a toggle button to turn it on or off.

5. The sensor provided by the DataHub Dagster plugin is now ready to emit metadata after every Dagster pipeline run.
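
To make the "environment variable or default" behaviour concrete, here is a minimal standard-library sketch. It is not the plugin's implementation: the class `SensorOptionsSketch` and the `DATAHUB_DAGSTER_*` variable names are hypothetical, chosen only to mirror the option names and defaults in the table above.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


def _parse_bool(raw: Optional[str], default: bool) -> bool:
    """Interpret common truthy strings; fall back to the default when unset."""
    if raw is None or raw.strip() == "":
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class SensorOptionsSketch:
    """Hypothetical stand-in for the options listed in the table."""

    dagster_url: Optional[str] = None
    platform_instance: Optional[str] = None
    capture_asset_materialization: bool = True
    capture_input_output: bool = True

    @classmethod
    def from_env(cls, env: Mapping[str, str]) -> "SensorOptionsSketch":
        # The DATAHUB_DAGSTER_* names are illustrative, not documented variables.
        return cls(
            dagster_url=env.get("DATAHUB_DAGSTER_URL"),
            platform_instance=env.get("DATAHUB_DAGSTER_PLATFORM_INSTANCE"),
            capture_asset_materialization=_parse_bool(
                env.get("DATAHUB_DAGSTER_CAPTURE_ASSET_MATERIALIZATION"), True
            ),
            capture_input_output=_parse_bool(
                env.get("DATAHUB_DAGSTER_CAPTURE_INPUT_OUTPUT"), True
            ),
        )


defaults = SensorOptionsSketch.from_env({})
overridden = SensorOptionsSketch.from_env(
    {
        "DATAHUB_DAGSTER_URL": "http://localhost:3000",
        "DATAHUB_DAGSTER_CAPTURE_INPUT_OUTPUT": "false",
    }
)
```

Passing a mapping instead of reading `os.environ` directly keeps the sketch easy to test; in real use you would call `SensorOptionsSketch.from_env(os.environ)`.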

### How to validate installation

1. In the Dagster UI, go to Overview -> Sensors and check that 'datahub_sensor' is listed.
2. Run a Dagster job. In the Dagster daemon logs, you should see DataHub-related log messages like:

```
datahub_sensor - Emitting metadata...
```
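
If you save the daemon logs to a file, the check can be automated. The sketch below assumes only the log message shown above; the helper name `sensor_emitted` is illustrative and not part of the plugin.

```python
import re
from typing import Iterable

# Matches the message shown above, allowing for variable spacing around the dash.
EMIT_PATTERN = re.compile(r"datahub_sensor\s+-\s+Emitting metadata")


def sensor_emitted(log_lines: Iterable[str]) -> bool:
    """Return True if any log line shows the DataHub sensor emitting metadata."""
    return any(EMIT_PATTERN.search(line) for line in log_lines)


sample_log = [
    "dagster.daemon - Instance is configured with the following daemons",
    "datahub_sensor - Emitting metadata...",
]
found = sensor_emitted(sample_log)
```

For a real log file, pass the open file object, since iterating over it yields one line at a time.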

## Dagster Ins and Out

You can declare the inputs and outputs of both assets and ops explicitly using a dictionary of `Ins` and `Out` entries that correspond to the decorated function's arguments. Explicitly declared inputs and outputs can also carry metadata.
To create upstream and downstream dataset dependencies for assets and ops, supply the ins and out dictionaries with metadata. For reference, see the sample jobs created using assets [`assets_job.py`](../../metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/example_jobs/assets_job.py), or ops [`ops_job.py`](../../metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/example_jobs/ops_job.py).

## Define your custom logic to capture asset lineage information
You can define your own logic to capture asset lineage information.

The returned tuple contains two dictionaries, one for input assets and one for output assets. The key of each dictionary is the op key, and the value is the set of asset URNs that are upstream or downstream of that op.

```python
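from typing import Dict, Set, Tuple

from dagster import RunStatusSensorContext
from datahub.ingestion.graph.client import DataHubGraph

# Import path assumed from the plugin's package layout.
from datahub_dagster_plugin.client.dagster_generator import DagsterGenerator


def asset_lineage_extractor(
    context: RunStatusSensorContext,
    dagster_generator: DagsterGenerator,
    graph: DataHubGraph,
) -> Tuple[Dict[str, Set], Dict[str, Set]]:
    # Both dictionaries are keyed by op key; values are sets of asset URNs.
    input_assets: Dict[str, Set] = {}
    output_assets: Dict[str, Set] = {}

    # Extract input and output assets from the context here.
    return input_assets, output_assets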
```

[See example job here](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/example_jobs/advanced_ops_jobs.py).
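
To illustrate the shape of the two returned dictionaries without any Dagster or DataHub imports, here is a standard-library sketch. The helpers `make_dataset_urn` and `collect_lineage`, the op key, and the table names are made up for illustration; only the dataset URN layout follows DataHub's usual convention.

```python
from typing import Dict, Iterable, Set, Tuple


def make_dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    """Build a dataset URN string in DataHub's usual layout."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


def collect_lineage(
    op_io: Dict[str, Tuple[Iterable[str], Iterable[str]]], platform: str
) -> Tuple[Dict[str, Set[str]], Dict[str, Set[str]]]:
    """Turn {op_key: (input names, output names)} into the two URN dictionaries."""
    input_assets: Dict[str, Set[str]] = {}
    output_assets: Dict[str, Set[str]] = {}
    for op_key, (inputs, outputs) in op_io.items():
        input_assets[op_key] = {make_dataset_urn(platform, n) for n in inputs}
        output_assets[op_key] = {make_dataset_urn(platform, n) for n in outputs}
    return input_assets, output_assets


# Hypothetical op that reads two tables and writes one.
inputs_by_op, outputs_by_op = collect_lineage(
    {"transform": (["db.raw_orders", "db.raw_users"], ["db.orders_enriched"])},
    platform="snowflake",
)
```

A real extractor would derive the op keys and asset names from the `context` argument instead of a hard-coded dictionary.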

## Debugging

### Connection error for DataHub REST URL

If you get `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, your DataHub GMS service is not up.
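
As a quick first check, a sketch like the following tests whether anything is listening on the GMS host and port. It only opens a TCP connection and does not verify that the service is healthy; the function name `gms_reachable` is illustrative, and the defaults are taken from the error message above.

```python
import socket


def gms_reachable(host: str = "localhost", port: int = 8080, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    status = "reachable" if gms_reachable() else "not reachable"
    print(f"DataHub GMS at localhost:8080 is {status}")
```

If the port is not reachable, start or restart the DataHub GMS service before re-running the Dagster job.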
143 changes: 143 additions & 0 deletions metadata-ingestion-modules/dagster-plugin/.gitignore
@@ -0,0 +1,143 @@
.envrc
src/datahub_dagster_plugin/__init__.py.bak
.vscode/
output
pvenv36/
bq_credentials.json
/tmp
*.bak

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# Generated classes
src/datahub/metadata/
wheels/
junit.quick.xml
4 changes: 4 additions & 0 deletions metadata-ingestion-modules/dagster-plugin/README.md
@@ -0,0 +1,4 @@
# DataHub Dagster Plugin

See the DataHub Dagster docs for details.
