Skip to content

Commit

Permalink
fix(ingest): fwk - datahub_api should be initialized by datahub-rest … (
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka authored Apr 29, 2022
1 parent 91f166c commit d0eb772
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 8 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def datahub_api_should_use_rest_sink_as_default(
cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any
) -> Optional[DatahubClientConfig]:
if v is None:
if "sink" in values and "type" in values["sink"]:
if "sink" in values and hasattr(values["sink"], "type"):
sink_type = values["sink"].type
if sink_type == "datahub-rest":
sink_config = values["sink"].config
Expand Down
79 changes: 72 additions & 7 deletions metadata-ingestion/tests/unit/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@ def test_configure(self, mock_sink, mock_source):
mock_sink.assert_called_once()

@freeze_time(FROZEN_TIME)
@patch("datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection")
@patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True)
def test_configure_without_sink(self, mock_source, mock_test_connection):

mock_test_connection.return_value = {"noCode": True}
@patch(
"datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection",
return_value={"noCode": True},
)
@patch(
"datahub.ingestion.graph.client.DataHubGraph.get_config",
return_value={"noCode": True},
)
def test_configure_without_sink(self, mock_emitter, mock_graph):
pipeline = Pipeline.create(
{
"source": {
"type": "kafka",
"config": {"connection": {"bootstrap": "localhost:9092"}},
"type": "file",
"config": {"filename": "test_file.json"},
},
}
)
Expand All @@ -65,6 +69,67 @@ def test_configure_without_sink(self, mock_source, mock_test_connection):
"token": "",
}

@freeze_time(FROZEN_TIME)
@patch(
"datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection",
return_value={"noCode": True},
)
@patch(
"datahub.ingestion.graph.client.DataHubGraph.get_config",
return_value={"noCode": True},
)
def test_configure_with_rest_sink_initializes_graph(
self, mock_source, mock_test_connection
):
pipeline = Pipeline.create(
{
"source": {
"type": "file",
"config": {"filename": "test_events.json"},
},
"sink": {
"type": "datahub-rest",
"config": {
"server": "http://somehost.someplace.some:8080",
"token": "foo",
},
},
}
)
# assert that the default sink config is for a DatahubRestSink
assert isinstance(pipeline.config.sink, DynamicTypedConfig)
assert pipeline.config.sink.type == "datahub-rest"
assert pipeline.config.sink.config == {
"server": "http://somehost.someplace.some:8080",
"token": "foo",
}
assert pipeline.ctx.graph is not None, "DataHubGraph should be initialized"
assert pipeline.ctx.graph.config.server == pipeline.config.sink.config["server"]
assert pipeline.ctx.graph.config.token == pipeline.config.sink.config["token"]

@freeze_time(FROZEN_TIME)
@patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True)
def test_configure_with_file_sink_does_not_init_graph(self, mock_source):
pipeline = Pipeline.create(
{
"source": {
"type": "kafka",
"config": {"connection": {"bootstrap": "localhost:9092"}},
},
"sink": {
"type": "file",
"config": {
"filename": "test.json",
},
},
}
)
# assert that the default sink config is for a DatahubRestSink
assert isinstance(pipeline.config.sink, DynamicTypedConfig)
assert pipeline.config.sink.type == "file"
assert pipeline.config.sink.config == {"filename": "test.json"}
assert pipeline.ctx.graph is None, "DataHubGraph should not be initialized"

@freeze_time(FROZEN_TIME)
def test_run_including_fake_transformation(self):
pipeline = Pipeline.create(
Expand Down

0 comments on commit d0eb772

Please sign in to comment.