From f63e8a2e8cd929c2c173a783c866799e583b840c Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Thu, 28 Apr 2022 20:59:50 -0700 Subject: [PATCH 1/2] fix(ingest): fwk - datahub_api should be initialized by datahub-rest sink config --- .../src/datahub/ingestion/run/pipeline.py | 2 +- .../tests/unit/test_pipeline.py | 56 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 72626372758cd..e5e77724867b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -88,7 +88,7 @@ def datahub_api_should_use_rest_sink_as_default( cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any ) -> Optional[DatahubClientConfig]: if v is None: - if "sink" in values and "type" in values["sink"]: + if "sink" in values and hasattr(values["sink"], "type"): sink_type = values["sink"].type if sink_type == "datahub-rest": sink_config = values["sink"].config diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py index eb1061384cfaf..2a88a76d652fa 100644 --- a/metadata-ingestion/tests/unit/test_pipeline.py +++ b/metadata-ingestion/tests/unit/test_pipeline.py @@ -65,6 +65,62 @@ def test_configure_without_sink(self, mock_source, mock_test_connection): "token": "", } + @freeze_time(FROZEN_TIME) + @patch("datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection") + @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True) + def test_configure_with_rest_sink_initializes_graph( + self, mock_source, mock_test_connection + ): + mock_test_connection.return_value = {"nodeCode": True} + pipeline = Pipeline.create( + { + "source": { + "type": "kafka", + "config": {"connection": {"bootstrap": "localhost:9092"}}, + }, + "sink": { + "type": "datahub-rest", + "config": { + "server": "http://localhost:8081", + "token": "foo", + }, + }, + } + ) + # assert that the default sink config is for a DatahubRestSink + assert isinstance(pipeline.config.sink, DynamicTypedConfig) + assert pipeline.config.sink.type == "datahub-rest" + assert pipeline.config.sink.config == { + "server": "http://localhost:8081", + "token": "foo", + } + assert pipeline.ctx.graph is not None, "DataHubGraph should be initialized" + assert pipeline.ctx.graph.config.server == pipeline.config.sink.config["server"] + assert pipeline.ctx.graph.config.token == pipeline.config.sink.config["token"] + + @freeze_time(FROZEN_TIME) + @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True) + def test_configure_with_file_sink_does_not_init_graph(self, mock_source): + pipeline = Pipeline.create( + { + "source": { + "type": "kafka", + "config": {"connection": {"bootstrap": "localhost:9092"}}, + }, + "sink": { + "type": "file", + "config": { + "filename": "test.json", + }, + }, + } + ) + # assert that the default sink config is for a DatahubRestSink + assert isinstance(pipeline.config.sink, DynamicTypedConfig) + assert pipeline.config.sink.type == "file" + assert pipeline.config.sink.config == {"filename": "test.json"} + assert pipeline.ctx.graph is None, "DataHubGraph should not be initialized" + @freeze_time(FROZEN_TIME) def test_run_including_fake_transformation(self): pipeline = Pipeline.create( From 0f441343105440e21c720521177bc177731eaaf8 Mon Sep 17 00:00:00 2001 From: Shirshanka Das Date: Thu, 28 Apr 2022 21:52:12 -0700 Subject: [PATCH 2/2] fixing mocks --- .../tests/unit/test_pipeline.py | 37 ++++++++++++------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py index 2a88a76d652fa..827adc8686ce6 100644 --- a/metadata-ingestion/tests/unit/test_pipeline.py +++ b/metadata-ingestion/tests/unit/test_pipeline.py @@ -44,16 +44,20 @@ def test_configure(self, mock_sink, mock_source): mock_sink.assert_called_once() @freeze_time(FROZEN_TIME) - @patch("datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection") - @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True) - def test_configure_without_sink(self, mock_source, mock_test_connection): - - mock_test_connection.return_value = {"noCode": True} + @patch( + "datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection", + return_value={"noCode": True}, + ) + @patch( + "datahub.ingestion.graph.client.DataHubGraph.get_config", + return_value={"noCode": True}, + ) + def test_configure_without_sink(self, mock_emitter, mock_graph): pipeline = Pipeline.create( { "source": { - "type": "kafka", - "config": {"connection": {"bootstrap": "localhost:9092"}}, + "type": "file", + "config": {"filename": "test_file.json"}, }, } ) @@ -66,22 +70,27 @@ def test_configure_without_sink(self, mock_source, mock_test_connection): } @freeze_time(FROZEN_TIME) - @patch("datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection") - @patch("datahub.ingestion.source.kafka.KafkaSource.get_workunits", autospec=True) + @patch( + "datahub.emitter.rest_emitter.DatahubRestEmitter.test_connection", + return_value={"noCode": True}, + ) + @patch( + "datahub.ingestion.graph.client.DataHubGraph.get_config", + return_value={"noCode": True}, + ) def test_configure_with_rest_sink_initializes_graph( self, mock_source, mock_test_connection ): - mock_test_connection.return_value = {"nodeCode": True} pipeline = Pipeline.create( { "source": { - "type": "kafka", - "config": {"connection": {"bootstrap": "localhost:9092"}}, + "type": "file", + "config": {"filename": "test_events.json"}, }, "sink": { "type": "datahub-rest", "config": { - "server": "http://localhost:8081", + "server": "http://somehost.someplace.some:8080", "token": "foo", }, }, @@ -91,7 +100,7 @@ def test_configure_with_rest_sink_initializes_graph( assert isinstance(pipeline.config.sink, DynamicTypedConfig) assert pipeline.config.sink.type == "datahub-rest" assert pipeline.config.sink.config == { - "server": "http://localhost:8081", + "server": "http://somehost.someplace.some:8080", "token": "foo", } assert pipeline.ctx.graph is not None, "DataHubGraph should be initialized"