Skip to content

Commit

Permalink
fix(citrus-kafka): delombok module
Browse files Browse the repository at this point in the history
  • Loading branch information
bbortt committed Oct 1, 2024
1 parent c6d6f3e commit 10f2c13
Show file tree
Hide file tree
Showing 10 changed files with 417 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.citrusframework.kafka.endpoint;

import lombok.Getter;
import lombok.Setter;
import org.citrusframework.context.TestContext;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.AbstractSelectiveMessageConsumer;
Expand All @@ -41,8 +39,6 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.citrusframework.kafka.message.KafkaMessageHeaders.KAFKA_PREFIX;

@Setter
@Getter
public class KafkaConsumer extends AbstractSelectiveMessageConsumer {

private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
Expand All @@ -57,6 +53,14 @@ public KafkaConsumer(String name, KafkaEndpointConfiguration endpointConfigurati
this.consumer = createConsumer();
}

public org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> getConsumer() {
return consumer;
}

public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer) {
this.consumer = consumer;
}

@Override
public Message receive(TestContext testContext, long timeout) {
logger.debug("Receiving single message");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@

package org.citrusframework.kafka.endpoint;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import lombok.Builder;
import lombok.Getter;
import org.apache.commons.lang3.RandomStringUtils;
import org.citrusframework.actions.ReceiveMessageAction;
import org.citrusframework.common.ShutdownPhase;
Expand All @@ -29,7 +26,6 @@

import static java.lang.Boolean.TRUE;
import static java.util.Objects.nonNull;
import static lombok.AccessLevel.PACKAGE;
import static org.citrusframework.actions.ReceiveMessageAction.Builder.receive;
import static org.citrusframework.kafka.endpoint.selector.KafkaMessageByHeaderSelector.kafkaHeaderEquals;
import static org.citrusframework.kafka.message.KafkaMessageHeaders.KAFKA_PREFIX;
Expand All @@ -42,7 +38,6 @@
*
* @since 2.8
*/
@Getter(PACKAGE)
public class KafkaEndpoint extends AbstractEndpoint implements ShutdownPhase {

/**
Expand All @@ -51,6 +46,10 @@ public class KafkaEndpoint extends AbstractEndpoint implements ShutdownPhase {
private @Nullable KafkaProducer kafkaProducer;
private @Nullable KafkaConsumer kafkaConsumer;

public static SimpleKafkaEndpointBuilder builder() {
return new SimpleKafkaEndpointBuilder();
}

/**
* Default constructor initializing endpoint configuration.
*/
Expand All @@ -65,8 +64,7 @@ public KafkaEndpoint(KafkaEndpointConfiguration endpointConfiguration) {
super(endpointConfiguration);
}

@Builder
public static @Nonnull KafkaEndpoint newKafkaEndpoint(
static KafkaEndpoint newKafkaEndpoint(
@Nullable org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer,
@Nullable org.apache.kafka.clients.producer.KafkaProducer<Object, Object> kafkaProducer,
@Nullable Boolean randomConsumerGroup,
Expand Down Expand Up @@ -101,6 +99,16 @@ public KafkaEndpoint(KafkaEndpointConfiguration endpointConfiguration) {
return kafkaEndpoint;
}

@Nullable
KafkaProducer getKafkaProducer() {
return kafkaProducer;
}

@Nullable
KafkaConsumer getKafkaConsumer() {
return kafkaConsumer;
}

@Override
public KafkaConsumer createConsumer() {
if (kafkaConsumer == null) {
Expand Down Expand Up @@ -141,4 +149,48 @@ public ReceiveMessageAction.ReceiveMessageActionBuilderSupport findKafkaEventHea
)
.getMessageBuilderSupport();
}

public static class SimpleKafkaEndpointBuilder {

private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer;
private org.apache.kafka.clients.producer.KafkaProducer<Object, Object> kafkaProducer;
private Boolean randomConsumerGroup;
private String server;
private Long timeout;
private String topic;

public SimpleKafkaEndpointBuilder kafkaConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> kafkaConsumer) {
this.kafkaConsumer = kafkaConsumer;
return this;
}

public SimpleKafkaEndpointBuilder kafkaProducer(org.apache.kafka.clients.producer.KafkaProducer<Object, Object> kafkaProducer) {
this.kafkaProducer = kafkaProducer;
return this;
}

public SimpleKafkaEndpointBuilder randomConsumerGroup(Boolean randomConsumerGroup) {
this.randomConsumerGroup = randomConsumerGroup;
return this;
}

public SimpleKafkaEndpointBuilder server(String server) {
this.server = server;
return this;
}

public SimpleKafkaEndpointBuilder timeout(Long timeout) {
this.timeout = timeout;
return this;
}

public SimpleKafkaEndpointBuilder topic(String topic) {
this.topic = topic;
return this;
}

public KafkaEndpoint build() {
return KafkaEndpoint.newKafkaEndpoint(kafkaConsumer, kafkaProducer, randomConsumerGroup, server, timeout, topic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package org.citrusframework.kafka.endpoint;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand All @@ -30,8 +28,6 @@
import java.util.HashMap;
import java.util.Map;

@Getter
@Setter
public class KafkaEndpointConfiguration extends AbstractPollableEndpointConfiguration {

/**
Expand Down Expand Up @@ -97,4 +93,132 @@ public class KafkaEndpointConfiguration extends AbstractPollableEndpointConfigur
* Topic partition
*/
private int partition = 0;

public String getClientId() {
return clientId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
}

public String getConsumerGroup() {
return consumerGroup;
}

public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public String getServer() {
return server;
}

public void setServer(String server) {
this.server = server;
}

public KafkaMessageHeaderMapper getHeaderMapper() {
return headerMapper;
}

public void setHeaderMapper(KafkaMessageHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}

public KafkaMessageConverter getMessageConverter() {
return messageConverter;
}

public void setMessageConverter(KafkaMessageConverter messageConverter) {
this.messageConverter = messageConverter;
}

public Class<? extends Serializer> getKeySerializer() {
return keySerializer;
}

public void setKeySerializer(Class<? extends Serializer> keySerializer) {
this.keySerializer = keySerializer;
}

public Class<? extends Serializer> getValueSerializer() {
return valueSerializer;
}

public void setValueSerializer(Class<? extends Serializer> valueSerializer) {
this.valueSerializer = valueSerializer;
}

public Class<? extends Deserializer> getKeyDeserializer() {
return keyDeserializer;
}

public void setKeyDeserializer(Class<? extends Deserializer> keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}

public Class<? extends Deserializer> getValueDeserializer() {
return valueDeserializer;
}

public void setValueDeserializer(Class<? extends Deserializer> valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}

public Map<String, Object> getConsumerProperties() {
return consumerProperties;
}

public void setConsumerProperties(Map<String, Object> consumerProperties) {
this.consumerProperties = consumerProperties;
}

public Map<String, Object> getProducerProperties() {
return producerProperties;
}

public void setProducerProperties(Map<String, Object> producerProperties) {
this.producerProperties = producerProperties;
}

public boolean isAutoCommit() {
return autoCommit;
}

public void setAutoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
}

public int getAutoCommitInterval() {
return autoCommitInterval;
}

public void setAutoCommitInterval(int autoCommitInterval) {
this.autoCommitInterval = autoCommitInterval;
}

public String getOffsetReset() {
return offsetReset;
}

public void setOffsetReset(String offsetReset) {
this.offsetReset = offsetReset;
}

public int getPartition() {
return partition;
}

public void setPartition(int partition) {
this.partition = partition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.citrusframework.kafka.endpoint;

import lombok.NoArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.citrusframework.context.TestContext;
import org.citrusframework.exceptions.CitrusRuntimeException;
Expand All @@ -27,9 +26,6 @@
import java.util.List;
import java.util.Optional;

import static lombok.AccessLevel.PRIVATE;

@NoArgsConstructor(access = PRIVATE)
final class KafkaMessageConsumerUtils {

private static final Logger logger = LoggerFactory.getLogger(KafkaMessageConsumerUtils.class);
Expand Down Expand Up @@ -67,4 +63,8 @@ record -> logger.debug("Received message: ({}, {}) at offset {}", record.key(),

return received;
}

private KafkaMessageConsumerUtils() {
// Static utility class
}
}
Loading

0 comments on commit 10f2c13

Please sign in to comment.