diff --git a/src/main/java/reactor/kafka/receiver/internals/ConsumerHandler.java b/src/main/java/reactor/kafka/receiver/internals/ConsumerHandler.java index 68115548..4260bfb6 100644 --- a/src/main/java/reactor/kafka/receiver/internals/ConsumerHandler.java +++ b/src/main/java/reactor/kafka/receiver/internals/ConsumerHandler.java @@ -65,7 +65,8 @@ class ConsumerHandler { "resume", "offsetsForTimes", "beginningOffsets", - "endOffsets" + "endOffsets", + "currentLag" )); private static final AtomicInteger COUNTER = new AtomicInteger(); diff --git a/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java b/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java index 00fe028b..c4dba3e7 100644 --- a/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java +++ b/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java @@ -59,6 +59,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.OptionalLong; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -288,12 +289,17 @@ public void manualAssignmentWithCommit() throws Exception { receiverOptions = receiverOptions.commitInterval(Duration.ZERO) .commitBatchSize(0) .assignment(getTopicPartitions()); + KafkaReceiver receiver = KafkaReceiver.create(receiverOptions); Flux> kafkaFlux = - KafkaReceiver.create(receiverOptions) + receiver .receive() .delayUntil(r -> r.receiverOffset().commit()) .doOnSubscribe(s -> assignSemaphore.release()); sendReceiveWithSendDelay(kafkaFlux, Duration.ofMillis(1000), 0, 10); + OptionalLong lag = receiver + .doOnConsumer(consumer -> consumer.currentLag(getTopicPartitions().iterator().next())) + .block(Duration.ofSeconds(10)); + assertThat(lag.isPresent()).isTrue(); } @Test