Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/airflow): support disabling iolet materialization #10305

Merged
merged 2 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/lineage/airflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ enabled = True # default
| capture_ownership_info | true | Extract DAG ownership. |
| capture_tags_info | true | Extract DAG tags. |
| capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| enable_extractors | true | Enable automatic lineage extraction. |
| disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. |
| log_level | _no change_ | [debug] Set the log level for the plugin. |
Expand Down Expand Up @@ -135,8 +136,9 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be the taskinstance link on airflow. It can also be grid. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |

#### Validate that the plugin is working
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class DatahubLineageConfig(ConfigModel):
# If true, the tags field of the DAG will be captured as DataHub tags.
capture_tags_info: bool = True

# If true (default), we'll materialize and un-soft-delete any urns
# referenced by inlets or outlets.
materialize_iolets: bool = True

capture_executions: bool = False

enable_extractors: bool = True
Expand Down Expand Up @@ -67,6 +71,7 @@ def get_lineage_config() -> DatahubLineageConfig:
"datahub", "capture_ownership_info", fallback=True
)
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True)
enable_extractors = conf.get("datahub", "enable_extractors", fallback=True)
log_level = conf.get("datahub", "log_level", fallback=None)
debug_emitter = conf.get("datahub", "debug_emitter", fallback=False)
Expand All @@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig:
capture_ownership_info=capture_ownership_info,
capture_tags_info=capture_tags_info,
capture_executions=capture_executions,
materialize_iolets=materialize_iolets,
enable_extractors=enable_extractors,
log_level=log_level,
debug_emitter=debug_emitter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,10 @@ def on_task_instance_running(

# TODO: Add handling for Airflow mapped tasks using task_instance.map_index

datajob.emit(self.emitter, callback=self._make_emit_callback())
for mcp in datajob.generate_mcp(
materialize_iolets=self.config.materialize_iolets
):
self.emitter.emit(mcp, self._make_emit_callback())
logger.debug(f"Emitted DataHub Datajob start: {datajob}")

if self.config.capture_executions:
Expand Down Expand Up @@ -430,7 +433,10 @@ def on_task_instance_finish(
# Add lineage info.
self._extract_lineage(datajob, dagrun, task, task_instance, complete=True)

datajob.emit(self.emitter, callback=self._make_emit_callback())
for mcp in datajob.generate_mcp(
materialize_iolets=self.config.materialize_iolets
):
self.emitter.emit(mcp, self._make_emit_callback())
logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}")

if self.config.capture_executions:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ def datahub_task_status_callback(context, status):
)

task.log.info(f"Emitting Datahub Datajob: {datajob}")
datajob.emit(emitter, callback=_make_emit_callback(task.log))
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
emitter.emit(mcp, _make_emit_callback(task.log))

if config.capture_executions:
dpi = AirflowGenerator.run_datajob(
Expand Down Expand Up @@ -200,7 +201,8 @@ def datahub_pre_execution(context):
)

task.log.info(f"Emitting Datahub dataJob {datajob}")
datajob.emit(emitter, callback=_make_emit_callback(task.log))
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
emitter.emit(mcp, _make_emit_callback(task.log))

if config.capture_executions:
dpi = AirflowGenerator.run_datajob(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ def send_lineage_to_datahub(
entities_to_datajob_urn_list([let.urn for let in inlets])
)

datajob.emit(emitter)
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
emitter.emit(mcp)
operator.log.info(f"Emitted from Lineage: {datajob}")

if config.capture_executions:
Expand Down
Loading