feat(ingest/airflow): support disabling iolet materialization (#10305)
hsheth2 authored Apr 18, 2024
1 parent 529710a commit 76b5783
Showing 5 changed files with 24 additions and 7 deletions.
6 changes: 4 additions & 2 deletions docs/lineage/airflow.md
@@ -71,6 +71,7 @@ enabled = True # default
| capture_ownership_info | true | Extract DAG ownership. |
| capture_tags_info | true | Extract DAG tags. |
| capture_executions | true | Extract task runs and success/failure statuses. These show up in the DataHub "Runs" tab. |
+| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| enable_extractors | true | Enable automatic lineage extraction. |
| disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. |
| log_level | _no change_ | [debug] Set the log level for the plugin. |
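
For example, to turn the new option off, a minimal airflow.cfg sketch (illustrative; it assumes the plugin reads these keys from the [datahub] section, consistent with the conf.get("datahub", ...) calls later in this commit):

[datahub]
enabled = True
materialize_iolets = False  # keep emitting lineage edges, but don't create or un-soft-delete iolet entities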
@@ -135,8 +136,9 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_ownership_info | true | If true, the owners field of the DAG will be captured as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
+| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be the taskinstance link on Airflow. It can also be grid. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |

#### Validate that the plugin is working

@@ -34,6 +34,10 @@ class DatahubLineageConfig(ConfigModel):
    # If true, the tags field of the DAG will be captured as DataHub tags.
    capture_tags_info: bool = True

+    # If true (default), we'll materialize and un-soft-delete any urns
+    # referenced by inlets or outlets.
+    materialize_iolets: bool = True

    capture_executions: bool = False

    enable_extractors: bool = True
@@ -67,6 +71,7 @@ def get_lineage_config() -> DatahubLineageConfig:
        "datahub", "capture_ownership_info", fallback=True
    )
    capture_executions = conf.get("datahub", "capture_executions", fallback=True)
+    materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True)
    enable_extractors = conf.get("datahub", "enable_extractors", fallback=True)
    log_level = conf.get("datahub", "log_level", fallback=None)
    debug_emitter = conf.get("datahub", "debug_emitter", fallback=False)
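
One subtlety: Airflow's conf.get returns raw strings from airflow.cfg, so a value like "false" reaches DatahubLineageConfig as a string and relies on pydantic's boolean coercion. A minimal sketch of that behavior, using plain pydantic rather than DataHub's ConfigModel (illustrative only):

from pydantic import BaseModel

class SketchConfig(BaseModel):
    # Mirrors the new field; pydantic coerces "false"/"False"/"0" to False.
    materialize_iolets: bool = True

assert SketchConfig(materialize_iolets="false").materialize_iolets is False
assert SketchConfig().materialize_iolets is True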
@@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig:
        capture_ownership_info=capture_ownership_info,
        capture_tags_info=capture_tags_info,
        capture_executions=capture_executions,
+        materialize_iolets=materialize_iolets,
        enable_extractors=enable_extractors,
        log_level=log_level,
        debug_emitter=debug_emitter,

@@ -389,7 +389,10 @@ def on_task_instance_running(

        # TODO: Add handling for Airflow mapped tasks using task_instance.map_index

-        datajob.emit(self.emitter, callback=self._make_emit_callback())
+        for mcp in datajob.generate_mcp(
+            materialize_iolets=self.config.materialize_iolets
+        ):
+            self.emitter.emit(mcp, self._make_emit_callback())
        logger.debug(f"Emitted DataHub Datajob start: {datajob}")

        if self.config.capture_executions:
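
Conceptually, "materializing" an iolet means emitting a Status(removed=False) aspect for each inlet/outlet urn, which creates an entity stub if it is missing and reverses any soft deletion. A hedged sketch of that idea (not the SDK's actual generate_mcp implementation):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import StatusClass

def sketch_materialize_iolets(iolet_urns):
    # One Status(removed=False) proposal per referenced urn; skipping these
    # (materialize_iolets=False) keeps lineage edges but never creates or
    # revives the referenced entities.
    for urn in iolet_urns:
        yield MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=StatusClass(removed=False),
        )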
@@ -430,7 +433,10 @@ def on_task_instance_finish(
        # Add lineage info.
        self._extract_lineage(datajob, dagrun, task, task_instance, complete=True)

-        datajob.emit(self.emitter, callback=self._make_emit_callback())
+        for mcp in datajob.generate_mcp(
+            materialize_iolets=self.config.materialize_iolets
+        ):
+            self.emitter.emit(mcp, self._make_emit_callback())
        logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}")

        if self.config.capture_executions:

@@ -133,7 +133,8 @@ def datahub_task_status_callback(context, status):
    )

    task.log.info(f"Emitting Datahub Datajob: {datajob}")
-    datajob.emit(emitter, callback=_make_emit_callback(task.log))
+    for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
+        emitter.emit(mcp, _make_emit_callback(task.log))

    if config.capture_executions:
        dpi = AirflowGenerator.run_datajob(
@@ -200,7 +201,8 @@ def datahub_pre_execution(context):
    )

    task.log.info(f"Emitting Datahub dataJob {datajob}")
-    datajob.emit(emitter, callback=_make_emit_callback(task.log))
+    for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
+        emitter.emit(mcp, _make_emit_callback(task.log))

    if config.capture_executions:
        dpi = AirflowGenerator.run_datajob(

@@ -59,7 +59,8 @@ def send_lineage_to_datahub(
        entities_to_datajob_urn_list([let.urn for let in inlets])
    )

-    datajob.emit(emitter)
+    for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
+        emitter.emit(mcp)
    operator.log.info(f"Emitted from Lineage: {datajob}")

    if config.capture_executions:
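
As a usage illustration (a hypothetical task definition, not part of this diff): lineage declared via inlets/outlets is emitted regardless of the flag, but with materialize_iolets disabled the referenced datasets are only visible in DataHub if they already exist there.

from airflow.operators.bash import BashOperator
from datahub_airflow_plugin.entities import Dataset

# Hypothetical task; platform and table names are placeholders.
transform = BashOperator(
    task_id="transform",
    bash_command="echo transform",
    inlets=[Dataset("snowflake", "db.schema.src_table")],
    outlets=[Dataset("snowflake", "db.schema.dst_table")],
)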
