Skip to content

Commit

Permalink
GH-374: Support bootstrap server property as a list
Browse files Browse the repository at this point in the history
Resolves #374

* support read bootstrap server config when it is defined as list

* update copyright year of the affected classes to 2024
  • Loading branch information
emmanuelsilva authored Jan 10, 2024
1 parent 57b7085 commit 9b09225
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 5 deletions.
12 changes: 10 additions & 2 deletions src/main/java/reactor/kafka/receiver/ReceiverOptions.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -570,7 +570,15 @@ default String clientId() {
*/
@NonNull
default String bootstrapServers() {
return (String) Objects.requireNonNull(consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
Object bootstrapServers = Objects.requireNonNull(consumerProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));

if (bootstrapServers instanceof List) {
@SuppressWarnings("unchecked")
List<String> listOfBootstrapServers = (List<String>) bootstrapServers;
return String.join(",", listOfBootstrapServers);
}

return (String) bootstrapServers;
}

/**
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/reactor/kafka/sender/SenderOptions.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.List;
import java.util.Properties;

import javax.naming.AuthenticationException;
Expand Down Expand Up @@ -286,7 +287,15 @@ default String transactionalId() {
*/
@NonNull
default String bootstrapServers() {
return (String) Objects.requireNonNull(producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
Object bootstrapServers = Objects.requireNonNull(producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

if (bootstrapServers instanceof List) {
@SuppressWarnings("unchecked")
List<String> listOfBootstrapServers = (List<String>) bootstrapServers;
return String.join(",", listOfBootstrapServers);
}

return (String) bootstrapServers;
}

@NonNull
Expand Down
66 changes: 66 additions & 0 deletions src/test/java/reactor/kafka/receiver/ReceiverOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.kafka.receiver;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

public class ReceiverOptionsTest {

@Test
public void getBootstrapServersFromSingleServerList() {
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("localhost:9092"));

ReceiverOptions<Integer, String> senderOptions = ReceiverOptions.create(producerProperties);
String bootstrapServers = senderOptions.bootstrapServers();

assertEquals("localhost:9092", bootstrapServers);
}

@Test
public void getBootstrapServersFromMultipleServersList() {
Map<String, Object> producerProperties = new HashMap<>();
List<String> serverList = Arrays.asList("localhost:9092", "localhost:9093", "localhost:9094");
producerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);

ReceiverOptions<Integer, String> senderOptions = ReceiverOptions.create(producerProperties);
String bootstrapServers = senderOptions.bootstrapServers();

assertEquals("localhost:9092,localhost:9093,localhost:9094", bootstrapServers);
}

@Test
public void getBootstrapServersFromString() {
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

ReceiverOptions<Integer, String> senderOptions = ReceiverOptions.create(producerProperties);
String bootstrapServers = senderOptions.bootstrapServers();

assertEquals("localhost:9092", bootstrapServers);
}

}
40 changes: 39 additions & 1 deletion src/test/java/reactor/kafka/sender/SenderOptionsTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,14 @@

package reactor.kafka.sender;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;
Expand All @@ -33,4 +37,38 @@ public void senderOptionsCloseTimeout() {
assertEquals(Duration.ofMillis(100), senderOptions.closeTimeout(Duration.ofMillis(100)).closeTimeout());
}

@Test
public void getBootstrapServersFromSingleServerList() {
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Collections.singletonList("localhost:9092"));

SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProperties);
String bootstrapServers = senderOptions.bootstrapServers();

assertEquals("localhost:9092", bootstrapServers);
}

@Test
public void getBootstrapServersFromMultipleServersList() {
Map<String, Object> producerProperties = new HashMap<>();
List<String> serverList = Arrays.asList("localhost:9092", "localhost:9093", "localhost:9094");
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverList);

SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProperties);
String bootstrapServers = senderOptions.bootstrapServers();

assertEquals("localhost:9092,localhost:9093,localhost:9094", bootstrapServers);
}

@Test
public void getBootstrapServersFromString() {
Map<String, Object> producerProperties = new HashMap<>();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProperties);
String bootstrapServers = senderOptions.bootstrapServers();

assertEquals("localhost:9092", bootstrapServers);
}

}

0 comments on commit 9b09225

Please sign in to comment.