Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sphinx Docstrings to Airflow Modules #2562

Merged
merged 2 commits into from
May 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions metadata-ingestion/src/datahub_provider/hooks/datahub.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@


class DatahubRestHook(BaseHook):
"""
Creates a DataHub Rest API connection used to send metadata to DataHub.
Takes the endpoint for your DataHub Rest API in the Server Endpoint (host) field.

URI example: ::

AIRFLOW_CONN_DATAHUB_REST_DEFAULT='datahub-rest://rest-endpoint'

:param datahub_rest_conn_id: Reference to the DataHub Rest connection.
:type datahub_rest_conn_id: str
"""

conn_name_attr = "datahub_rest_conn_id"
default_conn_name = "datahub_rest_default"
conn_type = "datahub_rest"
Expand Down Expand Up @@ -63,6 +75,18 @@ def emit_mces(self, mces: List[MetadataChangeEvent]) -> None:


class DatahubKafkaHook(BaseHook):
"""
Creates a DataHub Kafka connection used to send metadata to DataHub.
Takes your Kafka broker in the Kafka Broker (host) field.

URI example: ::

AIRFLOW_CONN_DATAHUB_KAFKA_DEFAULT='datahub-kafka://kafka-broker'

:param datahub_kafka_conn_id: Reference to the DataHub Kafka connection.
:type datahub_kafka_conn_id: str
"""

conn_name_attr = "datahub_kafka_conn_id"
default_conn_name = "datahub_kafka_default"
conn_type = "datahub_kafka"
Expand Down Expand Up @@ -121,6 +145,14 @@ def callback(exc, msg):


class DatahubGenericHook(BaseHook):
"""
Emits Metadata Change Events using either the DatahubRestHook or the
DatahubKafkaHook. Set up a DataHub Rest or Kafka connection to use.

:param datahub_conn_id: Reference to the DataHub connection.
:type datahub_conn_id: str
"""

def __init__(self, datahub_conn_id: str) -> None:
super().__init__(*_default_hook_args)
self.datahub_conn_id = datahub_conn_id
Expand Down
20 changes: 20 additions & 0 deletions metadata-ingestion/src/datahub_provider/lineage/datahub.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ def get_lineage_config() -> DatahubLineageConfig:


class DatahubLineageBackend(LineageBackend):
"""
Sends lineage data from tasks to DataHub.

Configurable via ``airflow.cfg`` as follows: ::

# For REST-based:
airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080'
# For Kafka-based (standard Kafka sink config can be passed via extras):
airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'

[lineage]
backend = datahub_provider.lineage.datahub.DatahubLineageBackend
datahub_kwargs = {
"datahub_conn_id": "datahub_rest_default",
"capture_ownership_info": true,
"capture_tags_info": true,
"graceful_exceptions": true }
# The above indentation is important!
"""

def __init__(self) -> None:
super().__init__()

Expand Down
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub_provider/operators/datahub.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@


class DatahubBaseOperator(BaseOperator):
"""
The DatahubBaseOperator is used as a base operator for all DataHub operators.
"""

ui_color = "#4398c8"

hook: Union[DatahubRestHook, DatahubKafkaHook]
Expand All @@ -33,6 +37,14 @@ def __init__( # type: ignore[no-untyped-def]


class DatahubEmitterOperator(DatahubBaseOperator):
"""
Emits a Metadata Change Event to DataHub using either a DataHub
Rest or Kafka connection.

:param datahub_conn_id: Reference to the DataHub Rest or Kafka Connection.
:type datahub_conn_id: str
"""

# See above for why these mypy type issues are ignored here.
@apply_defaults # type: ignore[misc]
def __init__( # type: ignore[no-untyped-def]
Expand Down