From dcec7f9043b1d97d19176636f3ca63bc6554e4f8 Mon Sep 17 00:00:00 2001 From: Emmanuel Silva Date: Wed, 20 Dec 2023 15:10:14 -0300 Subject: [PATCH 1/2] support read bootstrap server config when it is defined as list --- .../kafka/receiver/ReceiverOptions.java | 10 ++- .../reactor/kafka/sender/SenderOptions.java | 11 +++- .../kafka/receiver/ReceiverOptionsTest.java | 66 +++++++++++++++++++ .../kafka/sender/SenderOptionsTest.java | 38 +++++++++++ 4 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java diff --git a/src/main/java/reactor/kafka/receiver/ReceiverOptions.java b/src/main/java/reactor/kafka/receiver/ReceiverOptions.java index 6ee4dcfd..b443e975 100644 --- a/src/main/java/reactor/kafka/receiver/ReceiverOptions.java +++ b/src/main/java/reactor/kafka/receiver/ReceiverOptions.java @@ -570,7 +570,15 @@ default String clientId() { */ @NonNull default String bootstrapServers() { - return (String) Objects.requireNonNull(consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + Object bootstrapServers = Objects.requireNonNull(consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + if (bootstrapServers instanceof List) { + @SuppressWarnings("unchecked") + List listOfBootstrapServers = (List) bootstrapServers; + return String.join(",", listOfBootstrapServers); + } + + return (String) bootstrapServers; } /** diff --git a/src/main/java/reactor/kafka/sender/SenderOptions.java b/src/main/java/reactor/kafka/sender/SenderOptions.java index e72bc706..fbe75952 100644 --- a/src/main/java/reactor/kafka/sender/SenderOptions.java +++ b/src/main/java/reactor/kafka/sender/SenderOptions.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Map; import java.util.Objects; +import java.util.List; import java.util.Properties; import javax.naming.AuthenticationException; @@ -286,7 +287,15 @@ default String transactionalId() { */ @NonNull default String bootstrapServers() { - return (String) Objects.requireNonNull(producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + Object bootstrapServers = Objects.requireNonNull(producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + + if (bootstrapServers instanceof List) { + @SuppressWarnings("unchecked") + List listOfBootstrapServers = (List) bootstrapServers; + return String.join(",", listOfBootstrapServers); + } + + return (String) bootstrapServers; } @NonNull diff --git a/src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java b/src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java new file mode 100644 index 00000000..6191dc08 --- /dev/null +++ b/src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.kafka.receiver; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class ReceiverOptionsTest { + + @Test + public void getBootstrapServersFromSingleServerList() { + Map producerProperties = new HashMap<>(); + producerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("localhost:9092")); + + ReceiverOptions senderOptions = ReceiverOptions.create(producerProperties); + String bootstrapServers = senderOptions.bootstrapServers(); + + assertEquals("localhost:9092", bootstrapServers); + } + + @Test + public void getBootstrapServersFromMultipleServersList() { + Map producerProperties = new HashMap<>(); + List serverList = Arrays.asList("localhost:9092", "localhost:9093", "localhost:9094"); + producerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList); + + ReceiverOptions senderOptions = ReceiverOptions.create(producerProperties); + String bootstrapServers = senderOptions.bootstrapServers(); + + assertEquals("localhost:9092,localhost:9093,localhost:9094", bootstrapServers); + } + + @Test + public void getBootstrapServersFromString() { + Map producerProperties = new HashMap<>(); + producerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + + ReceiverOptions senderOptions = ReceiverOptions.create(producerProperties); + String bootstrapServers = senderOptions.bootstrapServers(); + + assertEquals("localhost:9092", bootstrapServers); + } + +} diff --git a/src/test/java/reactor/kafka/sender/SenderOptionsTest.java b/src/test/java/reactor/kafka/sender/SenderOptionsTest.java index a72a62bd..3a13133c 100644 --- a/src/test/java/reactor/kafka/sender/SenderOptionsTest.java +++ b/src/test/java/reactor/kafka/sender/SenderOptionsTest.java @@ -16,10 +16,14 @@ package reactor.kafka.sender; +import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.Test; import java.time.Duration; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -33,4 +37,38 @@ public void senderOptionsCloseTimeout() { assertEquals(Duration.ofMillis(100), senderOptions.closeTimeout(Duration.ofMillis(100)).closeTimeout()); } + @Test + public void getBootstrapServersFromSingleServerList() { + Map producerProperties = new HashMap<>(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("localhost:9092")); + + SenderOptions senderOptions = SenderOptions.create(producerProperties); + String bootstrapServers = senderOptions.bootstrapServers(); + + assertEquals("localhost:9092", bootstrapServers); + } + + @Test + public void getBootstrapServersFromMultipleServersList() { + Map producerProperties = new HashMap<>(); + List serverList = Arrays.asList("localhost:9092", "localhost:9093", "localhost:9094"); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList); + + SenderOptions senderOptions = SenderOptions.create(producerProperties); + String bootstrapServers = senderOptions.bootstrapServers(); + + assertEquals("localhost:9092,localhost:9093,localhost:9094", bootstrapServers); + } + + @Test + public void getBootstrapServersFromString() { + Map producerProperties = new HashMap<>(); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + + SenderOptions senderOptions = SenderOptions.create(producerProperties); + String bootstrapServers = senderOptions.bootstrapServers(); + + assertEquals("localhost:9092", bootstrapServers); + } + } From d441c545edaddee67d0922b0496cd06cf5ca251d Mon Sep 17 00:00:00 2001 From: Emmanuel Silva Date: Wed, 10 Jan 2024 09:34:02 -0300 Subject: [PATCH 2/2] update copyright year of the affected classes to 2024 --- src/main/java/reactor/kafka/receiver/ReceiverOptions.java | 2 +- src/main/java/reactor/kafka/sender/SenderOptions.java | 2 +- src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java | 2 +- src/test/java/reactor/kafka/sender/SenderOptionsTest.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/reactor/kafka/receiver/ReceiverOptions.java b/src/main/java/reactor/kafka/receiver/ReceiverOptions.java index b443e975..478dd8c8 100644 --- a/src/main/java/reactor/kafka/receiver/ReceiverOptions.java +++ b/src/main/java/reactor/kafka/receiver/ReceiverOptions.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/java/reactor/kafka/sender/SenderOptions.java b/src/main/java/reactor/kafka/sender/SenderOptions.java index fbe75952..d98e64be 100644 --- a/src/main/java/reactor/kafka/sender/SenderOptions.java +++ b/src/main/java/reactor/kafka/sender/SenderOptions.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java b/src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java index 6191dc08..b04199e0 100644 --- a/src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java +++ b/src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/reactor/kafka/sender/SenderOptionsTest.java b/src/test/java/reactor/kafka/sender/SenderOptionsTest.java index 3a13133c..d11d66fa 100644 --- a/src/test/java/reactor/kafka/sender/SenderOptionsTest.java +++ b/src/test/java/reactor/kafka/sender/SenderOptionsTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.