diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 72626372758cd..e5e77724867b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -88,7 +88,7 @@ def datahub_api_should_use_rest_sink_as_default( cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any ) -> Optional[DatahubClientConfig]: if v is None: - if "sink" in values and "type" in values["sink"]: + if "sink" in values and hasattr(values["sink"], "type"): sink_type = values["sink"].type if sink_type == "datahub-rest": sink_config = values["sink"].config diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py index eb1061384cfaf..827adc8686ce6 100644 --- a/metadata-ingestion/tests/unit/test_pipeline.py +++ b/metadata-ingestion/tests/unit/test_pipeline.py @@ -44,16 +44,20 @@ def test_configure(self, mock_sink, mock_source): mock_sink.assert_called_once() @freeze_time(FROZEN_TIME) - @patch("datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection") - @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True) - def test_configure_without_sink(self, mock_source, mock_test_connection): - - mock_test_connection.return_value = {"noCode": True} + @patch( + "datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection", + return_value={"noCode": True}, + ) + @patch( + "datahub.ingestion.graph.client.DataHubGraph.get_config", + return_value={"noCode": True}, + ) + def test_configure_without_sink(self, mock_emitter, mock_graph): pipeline = Pipeline.create( { "source": { - "type": "kafka", - "config": {"connection": {"bootstrap": "localhost:9092"}}, + "type": "file", + "config": {"filename": "test_file.json"}, }, } ) @@ -65,6 +69,67 @@ def test_configure_without_sink(self, mock_source, mock_test_connection): "token": "", } + @freeze_time(FROZEN_TIME) + @patch( + "datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection", + return_value={"noCode": True}, + ) + @patch( + "datahub.ingestion.graph.client.DataHubGraph.get_config", + return_value={"noCode": True}, + ) + def test_configure_with_rest_sink_initializes_graph( + self, mock_source, mock_test_connection + ): + pipeline = Pipeline.create( + { + "source": { + "type": "file", + "config": {"filename": "test_events.json"}, + }, + "sink": { + "type": "datahub-rest", + "config": { + "server": "http://somehost.someplace.some:8080", + "token": "foo", + }, + }, + } + ) + # assert that the default sink config is for a DatahubRestSink + assert isinstance(pipeline.config.sink, DynamicTypedConfig) + assert pipeline.config.sink.type == "datahub-rest" + assert pipeline.config.sink.config == { + "server": "http://somehost.someplace.some:8080", + "token": "foo", + } + assert pipeline.ctx.graph is not None, "DataHubGraph should be initialized" + assert pipeline.ctx.graph.config.server == pipeline.config.sink.config["server"] + assert pipeline.ctx.graph.config.token == pipeline.config.sink.config["token"] + + @freeze_time(FROZEN_TIME) + @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True) + def test_configure_with_file_sink_does_not_init_graph(self, mock_source): + pipeline = Pipeline.create( + { + "source": { + "type": "kafka", + "config": {"connection": {"bootstrap": "localhost:9092"}}, + }, + "sink": { + "type": "file", + "config": { + "filename": "test.json", + }, + }, + } + ) + # assert that the default sink config is for a DatahubRestSink + assert isinstance(pipeline.config.sink, DynamicTypedConfig) + assert pipeline.config.sink.type == "file" + assert pipeline.config.sink.config == {"filename": "test.json"} + assert pipeline.ctx.graph is None, "DataHubGraph should not be initialized" + @freeze_time(FROZEN_TIME) def test_run_including_fake_transformation(self): pipeline = Pipeline.create(