feat(ingest/dagster): Dagster source #10071

Merged · 49 commits · Mar 25, 2024
1d5b308
Dagster source metadata ingestion code added
shubhamjagtap639 Jul 5, 2023
92e3cea
Job failed and canceled execution metadata emit code added
shubhamjagtap639 Jul 6, 2023
e006275
Code changes as per review comment and dagster test cases added
shubhamjagtap639 Jul 7, 2023
d24e27f
Dagster source integration doc added
shubhamjagtap639 Jul 7, 2023
28fcad2
Dagster sensor renamed to datahub sensor
shubhamjagtap639 Jul 7, 2023
78220aa
Code and doc changes as per review comment
shubhamjagtap639 Jul 7, 2023
9b8b50a
File reformatted
shubhamjagtap639 Jul 10, 2023
f7c6b08
Lint error fixed
shubhamjagtap639 Jul 11, 2023
fbd73d8
Dagster package added
shubhamjagtap639 Jul 11, 2023
de5ad77
Temp changes
shubhamjagtap639 Jul 14, 2023
12bc920
Dagster version set to >1.3.3
shubhamjagtap639 Jul 14, 2023
a9c1a83
Datahub dagster plugin code added
shubhamjagtap639 Sep 5, 2023
29bb752
Extra init file removed
shubhamjagtap639 Sep 5, 2023
c419a61
cl failed error resolved
shubhamjagtap639 Sep 6, 2023
b996394
Revert doc changes
shubhamjagtap639 Sep 6, 2023
50e5724
Added missing command in lint task
shubhamjagtap639 Sep 6, 2023
e5e51fd
Temp changes
shubhamjagtap639 Sep 6, 2023
d134b74
Typo error resolved
shubhamjagtap639 Sep 6, 2023
c7a3d18
dataset entity added in datahub/api/entities. Dagster example files a…
shubhamjagtap639 Sep 7, 2023
00cb35e
lint error fixed
shubhamjagtap639 Sep 7, 2023
a7efd7c
Code changes as per review comment
shubhamjagtap639 Sep 11, 2023
9fad5c4
Initial commit of the reworked Dagster plugin
treff7es Mar 18, 2024
74dc9cf
Adding entities
treff7es Mar 18, 2024
5f373dc
Using java17 build
treff7es Mar 18, 2024
281e666
Black formatting
treff7es Mar 18, 2024
5ca6867
Silenting some mypy error temporary
treff7es Mar 18, 2024
7eb7a8e
Black formatting
treff7es Mar 18, 2024
059ec35
Fixing test
treff7es Mar 18, 2024
b7bb9e3
Add missing import
treff7es Mar 18, 2024
6efc54f
Fixing url generation
treff7es Mar 19, 2024
b48b061
Fixing linter issues
treff7es Mar 19, 2024
5d68929
Fixes
treff7es Mar 19, 2024
c1f7557
Fixing build
treff7es Mar 19, 2024
512824d
Not pinning datahub client version
treff7es Mar 19, 2024
f788ec5
Adding way to capture assets
treff7es Mar 20, 2024
508fcbc
Add way to test multiple dagster version
treff7es Mar 20, 2024
8ec8f4e
Adding way to bring your own lineage extractor
treff7es Mar 21, 2024
63df8c1
Pr review fixes
treff7es Mar 22, 2024
6bfe22d
Fixing imports
treff7es Mar 22, 2024
479c2cf
Fixing golden files and pydantic v2 issues with s3 tests
treff7es Mar 22, 2024
80d9a89
Fixing test
treff7es Mar 22, 2024
1450b83
Fixing tests
treff7es Mar 22, 2024
975b19d
Disabling Dagster tests
treff7es Mar 22, 2024
295b91a
Reverting accidentally downgraded package
treff7es Mar 22, 2024
39f23cc
- Moving examples out from the package
treff7es Mar 25, 2024
186069e
Merge branch 'master' into dagster_source-rebase
treff7es Mar 25, 2024
85cff86
Fixing doc links
treff7es Mar 25, 2024
d0f5365
Fixing doc build concurrency issue
treff7es Mar 25, 2024
39e2a7f
Fixing typo
treff7es Mar 25, 2024
51 changes: 29 additions & 22 deletions docs/lineage/dagster.md
@@ -27,35 +27,21 @@ pip install acryl_datahub_dagster_plugin
**Using Definitions class:**

```python
from dagster import Definitions
from datahub_dagster_plugin.sensors.datahub_sensors import DagsterSourceConfig, make_datahub_sensor

config = DagsterSourceConfig(
rest_sink_config={
"server": "https://your_datahub_url/gms",
"token": "your_datahub_token"
},
dagster_url = "https://my-dagster-cloud.dagster.cloud",
)

datahub_sensor = make_datahub_sensor(config=config)

defs = Definitions(
sensors=[datahub_sensor],
)
{{ inline /metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/example_jobs/basic_setup.py }}
```
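
   Since the inlined `basic_setup.py` is not reproduced in this diff, here is a minimal sketch of what the updated setup looks like using the renamed `DatahubDagsterSourceConfig` and its `datahub_client_config` field from this PR (the server URL and token are placeholder values):

   ```python
   from dagster import Definitions
   from datahub.ingestion.graph.client import DatahubClientConfig

   from datahub_dagster_plugin.sensors.datahub_sensors import (
       DatahubDagsterSourceConfig,
       make_datahub_sensor,
   )

   # Placeholder endpoints; point these at your own DataHub GMS and Dagster webserver.
   config = DatahubDagsterSourceConfig(
       datahub_client_config=DatahubClientConfig(
           server="https://your_datahub_url/gms",
           token="your_datahub_token",
       ),
       dagster_url="https://my-dagster-cloud.dagster.cloud",
   )

   datahub_sensor = make_datahub_sensor(config=config)
   defs = Definitions(sensors=[datahub_sensor])
   ```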

3. The sensor provided by the DataHub Dagster plugin uses the configuration options below. You can set them using environment variables; if a value is not set, the sensor falls back to its default.

**Configuration options:**

| Configuration Option          | Default value | Description                                                                                                                                                                                                                                                                                                                          |
|-------------------------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| datahub_client_config         |               | The DataHub client config                                                                                                                                                                                                                                                                                                            |
| dagster_url                   |               | The URL of your Dagster webserver.                                                                                                                                                                                                                                                                                                   |
| capture_asset_materialization | True          | Whether to capture asset keys as Datasets on AssetMaterialization events                                                                                                                                                                                                                                                             |
| capture_input_output          | True          | Whether to capture and try to parse inputs and outputs from HANDLED_OUTPUT and LOADED_INPUT events. Currently only [PathMetadataValue](https://github.com/dagster-io/dagster/blob/7e08c05dcecef9fd07f887c7846bd1c9a90e7d84/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py#L655) metadata is supported. (EXPERIMENTAL) |
| platform_instance             |               | The instance of the platform that all assets produced by this recipe belong to. It is optional                                                                                                                                                                                                                                       |
| asset_lineage_extractor       |               | You can implement your own logic to capture asset lineage information; see the example below for details                                                                                                                                                                                                                             |

4. Once the Dagster UI is up, turn on the provided sensor: open the Overview tab, then the Sensors tab. Each defined sensor has a toggle to turn it on or off.

Expand All @@ -75,6 +61,27 @@ datahub_sensor - Emitting metadata...
We can provide inputs and outputs to both assets and ops explicitly, using a dictionary of `Ins` and `Out` corresponding to the decorated function arguments. While providing inputs and outputs explicitly, we can attach metadata as well.
To create upstream and downstream dataset dependencies for assets and ops, provide `ins` and `out` dictionaries with metadata, as in the condensed sketch below. For reference, look at the sample jobs created using assets [`assets_job.py`](../../metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/example_jobs/assets_job.py), or ops [`ops_job.py`](../../metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/example_jobs/ops_job.py).
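
A condensed sketch of the ops pattern, mirroring `ops_job.py` from this PR (the Snowflake table urns are illustrative):

```python
from dagster import In, Out, PythonObjectDagsterType, op
from datahub.utilities.urns.dataset_urn import DatasetUrn


@op(
    ins={
        "data": In(
            dagster_type=PythonObjectDagsterType(list),
            # Upstream dataset this op reads from.
            metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn]},
        )
    },
    out={
        "result": Out(
            # Downstream dataset this op writes to.
            metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn]}
        )
    },
)
def transform(data):
    return [str(each) for each in data]
```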

## Define your own logic to capture asset lineage information
You can define your own logic to capture asset lineage information.

The output Tuple contains two dictionaries: one for input assets and one for output assets. Each dictionary maps an op key to the set of asset urns that are upstream or downstream of that op.

```python
from typing import Dict, Set, Tuple

from dagster import RunStatusSensorContext
from datahub.ingestion.graph.client import DataHubGraph

from datahub_dagster_plugin.client.dagster_generator import DagsterGenerator


def asset_lineage_extractor(
    context: RunStatusSensorContext,
    dagster_generator: DagsterGenerator,
    graph: DataHubGraph,
) -> Tuple[Dict[str, Set], Dict[str, Set]]:
    input_assets: Dict[str, Set] = {}
    output_assets: Dict[str, Set] = {}

    # Extract input and output assets from the run context here.
    return input_assets, output_assets
```

[See example job here](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/example_jobs/advanced_ops_jobs.py).
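
In that example, the extractor is registered on the config before the sensor is built:

```python
config.asset_lineage_extractor = asset_lineage_extractor
datahub_sensor = make_datahub_sensor(config=config)
defs = Definitions(jobs=[do_stuff], sensors=[datahub_sensor])
```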

## Debugging

### Connection error for Datahub Rest URL
1 change: 0 additions & 1 deletion metadata-ingestion-modules/dagster-plugin/build.gradle
@@ -55,7 +55,6 @@ task lint(type: Exec, dependsOn: installDev) {
"venv/lib/python3.8/site-packages/airflow/_vendor/connexion/spec.py:169: error: invalid syntax".
*/
commandLine 'bash', '-c',
"find ${venv_name}/lib -path *airflow/_vendor/connexion/spec.py -exec sed -i.bak -e '169,169s/ # type: List\\[str\\]//g' {} \\; && " +
"source ${venv_name}/bin/activate && set -x && " +
"black --check --diff src/ tests/ && " +
"isort --check --diff src/ tests/ && " +
@@ -1,10 +1,10 @@
from dataclasses import dataclass
from logging import Logger
from typing import Any, Dict, List, Optional, Set
from typing import Any, Callable, Dict, List, Optional, Sequence, Set, Tuple
from urllib.parse import urlsplit

import pydantic
from dagster import DagsterRunStatus, PathMetadataValue
from dagster import DagsterRunStatus, PathMetadataValue, RunStatusSensorContext
from dagster._core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus
from dagster._core.snap import JobSnapshot
from dagster._core.snap.node import OpDefSnap
@@ -14,9 +14,15 @@
DataProcessInstance,
InstanceRunResult,
)
from datahub.api.entities.dataset.dataset import Dataset
from datahub.configuration.source_common import DatasetSourceConfigMixin
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import DataPlatformInstanceClass, SubTypesClass
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
@@ -63,10 +69,10 @@ class Constant:
ATTEMPTS = "attempts"


class DagsterSourceConfig(DatasetSourceConfigMixin):
rest_sink_config: DatahubRestSinkConfig = pydantic.Field(
default=DatahubRestSinkConfig(),
description="Datahub rest sink config",
class DatahubDagsterSourceConfig(DatasetSourceConfigMixin):
datahub_client_config: DatahubClientConfig = pydantic.Field(
default=DatahubClientConfig(),
description="Datahub client config",
)

dagster_url: Optional[str] = pydantic.Field(
@@ -84,6 +90,16 @@ class DagsterSourceConfig(DatasetSourceConfigMixin):
description="Whether to capture and try to parse input and output from HANDLED_OUTPUT,.LOADED_INPUT event. (currently only filepathvalue metadata supported",
)

asset_lineage_extractor: Optional[
Callable[
[RunStatusSensorContext, "DagsterGenerator", DataHubGraph],
Tuple[Dict[str, set], Dict[str, set]],
]
] = pydantic.Field(
default=None,
description="Whether to capture and try to parse input and output from HANDLED_OUTPUT,.LOADED_INPUT event. (currently only filepathvalue metadata supported",
)


def _str_urn_to_dataset_urn(urns: List[str]) -> List[DatasetUrn]:
return [DatasetUrn.create_from_string(urn) for urn in urns]
@@ -114,7 +130,7 @@ class DagsterGenerator:
def __init__(
self,
logger: Logger,
config: DagsterSourceConfig,
config: DatahubDagsterSourceConfig,
dagster_environment: DagsterEnvironment,
):
self.logger = logger
@@ -290,14 +306,14 @@ def generate_datajob(

def emit_job_run(
self,
emitter: DatahubRestEmitter,
graph: DataHubGraph,
dataflow: DataFlow,
run: DagsterRun,
run_stats: DagsterRunStatsSnapshot,
) -> None:
"""
Emit a latest job run
:param emitter: DatahubRestEmitter
:param graph: DataHubGraph - DataHub graph client
:param dataflow: DataFlow - DataFlow object
:param run: DagsterRun - Dagster Run object
:param run_stats: DagsterRunStatsSnapshot - latest job run stats
@@ -348,27 +364,27 @@

if run_stats.start_time is not None:
dpi.emit_process_start(
emitter=emitter,
emitter=graph,
start_timestamp_millis=int(run_stats.start_time * 1000),
)

if run_stats.end_time is not None:
dpi.emit_process_end(
emitter=emitter,
emitter=graph,
end_timestamp_millis=int(run_stats.end_time * 1000),
result=status_result_map[run.status],
result_type=Constant.ORCHESTRATOR,
)

def emit_op_run(
self,
emitter: DatahubRestEmitter,
graph: DataHubGraph,
datajob: DataJob,
run_step_stats: RunStepKeyStatsSnapshot,
) -> None:
"""
Emit an op run
:param emitter: DatahubRestEmitter
:param graph: DataHubGraph
:param datajob: DataJob - DataJob object
:param run_step_stats: RunStepKeyStatsSnapshot - step(op) run stats
"""
@@ -415,14 +431,69 @@ def emit_op_run(

if run_step_stats.start_time is not None:
dpi.emit_process_start(
emitter=emitter,
emitter=graph,
start_timestamp_millis=int(run_step_stats.start_time * 1000),
)

if run_step_stats.end_time is not None:
dpi.emit_process_end(
emitter=emitter,
emitter=graph,
end_timestamp_millis=int(run_step_stats.end_time * 1000),
result=status_result_map[run_step_stats.status],
result_type=Constant.ORCHESTRATOR,
)

def dataset_urn_from_asset(self, asset_key: Sequence[str]) -> DatasetUrn:
"""
Generate dataset urn from asset key
"""
return DatasetUrn(
platform="dagster", env=self.config.env, name="/".join(asset_key)
)

def emit_asset(
self,
graph: DataHubGraph,
asset_key: Sequence[str],
description: Optional[str],
properties: Optional[Dict[str, str]],
) -> str:
"""
Emit asset to datahub
"""
dataset_urn = self.dataset_urn_from_asset(asset_key)
dataset = Dataset(
id=None,
urn=dataset_urn.urn(),
platform="dagster",
name=asset_key[-1],
schema=None,
downstreams=None,
subtype="Asset",
subtypes=None,
description=description,
env=self.config.env,
properties=properties,
)
for mcp in dataset.generate_mcp():
graph.emit_mcp(mcp)

mcp = MetadataChangeProposalWrapper(
entityUrn=dataset_urn.urn(),
aspect=SubTypesClass(typeNames=["Asset"]),
)
graph.emit_mcp(mcp)

if self.config.platform_instance:
mcp = MetadataChangeProposalWrapper(
entityUrn=dataset_urn.urn(),
aspect=DataPlatformInstanceClass(
instance=make_dataplatform_instance_urn(
instance=self.config.platform_instance,
platform="dagster",
),
platform=make_data_platform_urn("dagster"),
),
)
graph.emit_mcp(mcp)
return dataset_urn.urn()
@@ -0,0 +1,107 @@
from typing import Dict, Set, Tuple

from dagster import (
Definitions,
In,
Out,
PythonObjectDagsterType,
RunStatusSensorContext,
job,
op,
)
from datahub.ingestion.graph.client import DataHubGraph
from datahub.utilities.urns.dataset_urn import DatasetUrn

from datahub_dagster_plugin.client.dagster_generator import DagsterGenerator
from datahub_dagster_plugin.sensors.datahub_sensors import (
DatahubDagsterSourceConfig,
make_datahub_sensor,
)


@op
def extract():
results = [1, 2, 3, 4]
return results


@op(
ins={
"data": In(
dagster_type=PythonObjectDagsterType(list),
metadata={"datahub.inputs": [DatasetUrn("snowflake", "tableA").urn]},
)
},
out={
"result": Out(
metadata={"datahub.outputs": [DatasetUrn("snowflake", "tableB").urn]}
)
},
)
def transform(data):
results = []
for each in data:
results.append(str(each))
return results


@job
def do_stuff():
transform(extract())


config = DatahubDagsterSourceConfig.parse_obj(
{
"rest_sink_config": {
"server": "http://localhost:8080",
},
"dagster_url": "http://localhost:3000",
}
)


def asset_lineage_extractor(
context: RunStatusSensorContext,
dagster_generator: DagsterGenerator,
graph: DataHubGraph,
) -> Tuple[Dict[str, Set], Dict[str, Set]]:
from dagster._core.events import DagsterEventType

logs = context.instance.all_logs(
context.dagster_run.run_id,
{
DagsterEventType.ASSET_MATERIALIZATION,
DagsterEventType.ASSET_OBSERVATION,
DagsterEventType.HANDLED_OUTPUT,
DagsterEventType.LOADED_INPUT,
},
)
dataset_inputs: Dict[str, Set] = {}
dataset_outputs: Dict[str, Set] = {}

for log in logs:
if not log.dagster_event or not log.step_key:
continue

if log.dagster_event.event_type == DagsterEventType.ASSET_MATERIALIZATION:
if log.step_key not in dataset_outputs:
dataset_outputs[log.step_key] = set()

materialization = log.asset_materialization
if not materialization:
continue

properties = {
key: str(value) for (key, value) in materialization.metadata.items()
}
asset_key = materialization.asset_key.path
dataset_urn = dagster_generator.emit_asset(
graph, asset_key, materialization.description, properties
)
dataset_outputs[log.step_key].add(dataset_urn)
return dataset_inputs, dataset_outputs


config.asset_lineage_extractor = asset_lineage_extractor
datahub_sensor = make_datahub_sensor(config=config)
defs = Definitions(jobs=[do_stuff], sensors=[datahub_sensor])
@@ -10,7 +10,7 @@
from datahub.utilities.urns.dataset_urn import DatasetUrn

from datahub_dagster_plugin.sensors.datahub_sensors import (
DagsterSourceConfig,
DatahubDagsterSourceConfig,
make_datahub_sensor,
)

@@ -47,7 +47,7 @@ def transform(extract):

assets_job = define_asset_job(name="assets_job")

config = DagsterSourceConfig.parse_obj(
config = DatahubDagsterSourceConfig.parse_obj(
{
"rest_sink_config": {
"server": "http://localhost:8080",