From 9597c9388f33b03817c439159a81807ab6300123 Mon Sep 17 00:00:00 2001
From: mohamed-gara
Date: Mon, 11 Sep 2023 17:44:10 +0200
Subject: [PATCH] Add receive batch to receiver (#359)

* Upgrade confluentinc/cp-kafka to support running tests on arm
* Add receiveBatch in KafkaReceiver (#261)
* Add receiveBatch documentation (#261)
* Fix receiveAutoAck documentation (#261)
---
 src/docs/asciidoc/api-guide.adoc              | 35 ++++++++++++++++-
 .../reactor/kafka/receiver/KafkaReceiver.java | 31 ++++++++++++++-
 .../internals/DefaultKafkaReceiver.java       | 17 ++++++++
 .../java/reactor/kafka/AbstractKafkaTest.java |  4 +-
 .../kafka/receiver/KafkaReceiverTest.java     | 39 +++++++++++++++++++
 5 files changed, 121 insertions(+), 5 deletions(-)

diff --git a/src/docs/asciidoc/api-guide.adoc b/src/docs/asciidoc/api-guide.adoc
index 5ea032d7..f1653632 100644
--- a/src/docs/asciidoc/api-guide.adoc
+++ b/src/docs/asciidoc/api-guide.adoc
@@ -455,7 +455,38 @@ The maximum number of records in each batch can be controlled using the `KafkaConsumer` property
 KafkaConsumer to control the amount of data fetched from Kafka brokers in each poll. Each batch is
 returned as a Flux that is acknowledged after the Flux terminates. Acknowledged records are committed
 periodically based on the configured commit interval and batch size. This mode is simple to use since applications
-do not need to perform any acknowledge or commit actions. It is efficient as well and can be used
+do not need to perform any acknowledge or commit actions. It is efficient as well, but cannot be used
+for at-least-once delivery of messages.
+
+==== Manual acknowledgement of batches of records
+
+`KafkaReceiver#receiveBatch` returns a `Flux` of batches of records, one batch per `KafkaConsumer#poll()`.
+The records in each batch should be manually acknowledged or committed.
+[source,java]
+--------
+KafkaReceiver.create(receiverOptions)
+    .receiveBatch()
+    .concatMap(b -> b) // <1>
+    .subscribe(r -> {
+        System.out.println("Received message: " + r); // <2>
+        r.receiverOffset().acknowledge(); // <3>
+    });
+--------
+<1> Concatenate the batches in order
+<2> Print out each consumer record received
+<3> Explicitly acknowledge each record
+
+As with the `KafkaReceiver#receiveAutoAck` method, the maximum number of records in each batch can be controlled
+using the `KafkaConsumer` property `MAX_POLL_RECORDS`. This is used together with the fetch size and wait times
+configured on the KafkaConsumer to control the amount of data fetched from Kafka brokers in each poll.
+Unlike `KafkaReceiver#receiveAutoAck`, however, each batch is returned as a Flux whose records should be acknowledged
+or committed using `ReceiverOffset`.
+
+As with the `KafkaReceiver#receive` method, each message in the batch is represented as a `ReceiverRecord`
+which has a committable `ReceiverOffset` instance.
+
+`KafkaReceiver#receiveBatch` combines the batch consumption mode of `KafkaReceiver#receiveAutoAck` with the manual
+acknowledgement/commit mode of `KafkaReceiver#receive`. This batching mode is efficient and is easy to use
 for at-least-once delivery of messages.
 
 ==== Disabling automatic commits
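A complementary sketch (an illustration, not part of this patch's documentation changes): instead of committing explicitly, an application can acknowledge records inside each batch and let the configured commit interval and commit batch size flush the offsets; `process(r)` here is a hypothetical application callback:

[source,java]
--------
KafkaReceiver.create(receiverOptions)
    .receiveBatch()
    .concatMap(batch -> batch
        .doOnNext(r -> {
            process(r);                       // hypothetical per-record processing
            r.receiverOffset().acknowledge(); // mark the record as consumed
        })
        .then())                              // complete one batch before subscribing to the next
    .subscribe();
--------

Because `concatMap` subscribes to one inner batch at a time, batches are processed strictly in the order they were polled.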
@@ -526,7 +557,7 @@
 By default, receivers start consuming records from the last committed offset of each assigned partition.
 If a committed offset is not available, the offset reset strategy `ConsumerConfig#AUTO_OFFSET_RESET_CONFIG`
 configured for the `KafkaConsumer` is used to set the start offset to the earliest or latest offset on the partition.
 Applications can override offsets by seeking to new offsets in an assignment listener. Methods are provided on
-`ReceiverPartition` to seek to the earliest, latest, a specific offset in the partition, or to a record with
+`ReceiverPartition` to seek to the earliest, latest, a specific offset in the partition, or to a record with a timestamp later than a point in time.

diff --git a/src/main/java/reactor/kafka/receiver/KafkaReceiver.java b/src/main/java/reactor/kafka/receiver/KafkaReceiver.java
index 447a4d82..4fbb42cb 100644
--- a/src/main/java/reactor/kafka/receiver/KafkaReceiver.java
+++ b/src/main/java/reactor/kafka/receiver/KafkaReceiver.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -97,6 +97,35 @@ default Flux<ReceiverRecord<K, V>> receive() {
         return receive(null);
     }
 
+    /**
+     * Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
+     * The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
+     * the consumer property {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}. Each batch is returned as one Flux.
+     * Every record must be acknowledged using ReceiverOffset.acknowledge() in order to commit the offset
+     * corresponding to the record. Acknowledged records are committed based on the configured commit interval
+     * and commit batch size in ReceiverOptions. Records may also be committed manually using ReceiverOffset.commit().
+     *
+     * @param prefetch number of prefetched batches
+     * @return Flux of consumer record batches from Kafka that are committed only after acknowledgement
+     * @since 1.3.21
+     */
+    Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch);
+
+    /**
+     * Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
+     * The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
+     * the consumer property {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG}. Each batch is returned as one Flux.
+     * Every record must be acknowledged using ReceiverOffset.acknowledge() in order to commit the offset
+     * corresponding to the record. Acknowledged records are committed based on the configured commit interval
+     * and commit batch size in ReceiverOptions. Records may also be committed manually using ReceiverOffset.commit().
+     *
+     * @return Flux of consumer record batches from Kafka that are committed only after acknowledgement
+     * @since 1.3.21
+     */
+    default Flux<Flux<ReceiverRecord<K, V>>> receiveBatch() {
+        return receiveBatch(null);
+    }
+
     /**
      * Returns a {@link Flux} containing each batch of consumer records returned by {@link Consumer#poll(long)}.
      * The maximum number of records returned in each batch can be configured on {@link ReceiverOptions} by setting
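The `prefetch` argument feeds the `publishOn` queue size in the implementation (see `DefaultKafkaReceiver#receiveBatch` below), so it bounds how many polled batches may be buffered ahead of the subscriber. A minimal usage sketch under that reading, assuming `receiverOptions` is configured elsewhere:

[source,java]
--------
KafkaReceiver.create(receiverOptions)
    .receiveBatch(1)                                      // buffer at most one undelivered batch
    .concatMap(batch -> batch
        .doOnNext(r -> r.receiverOffset().acknowledge())
        .then(),                                          // drain the whole batch...
        1)                                                // ...before requesting the next one
    .subscribe();
--------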

diff --git a/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java b/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java
index 891be8d1..3f370608 100644
--- a/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java
+++ b/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java
@@ -69,6 +69,23 @@ public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
         });
     }
 
+    @Override
+    public Flux<Flux<ReceiverRecord<K, V>>> receiveBatch(Integer prefetch) {
+        return withHandler(AckMode.MANUAL_ACK, (scheduler, handler) -> {
+            int prefetchCalculated = preparePublishOnQueueSize(prefetch);
+            return handler
+                .receive()
+                .filter(it -> !it.isEmpty())
+                .publishOn(scheduler, prefetchCalculated)
+                .map(records -> Flux.fromIterable(records)
+                    .map(record -> new ReceiverRecord<>(
+                        record,
+                        handler.toCommittableOffset(record)
+                    ))
+                );
+        });
+    }
+
     @Override
     public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
         return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
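Since the implementation filters out empty polls (`filter(it -> !it.isEmpty())` above), every inner Flux it emits corresponds to a non-empty `poll()` result. A sketch (illustration only, given a `receiver` created from `KafkaReceiver.create(receiverOptions)`) that makes those batch boundaries visible by logging the size of each batch:

[source,java]
--------
receiver.receiveBatch()
    .concatMap(batch -> batch
        .doOnNext(r -> r.receiverOffset().acknowledge())
        .count()                                          // records in this poll's batch
        .doOnNext(size -> System.out.println("batch size: " + size)))
    .subscribe();
--------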

diff --git a/src/test/java/reactor/kafka/AbstractKafkaTest.java b/src/test/java/reactor/kafka/AbstractKafkaTest.java
index 6e65db8e..7b2ff35c 100644
--- a/src/test/java/reactor/kafka/AbstractKafkaTest.java
+++ b/src/test/java/reactor/kafka/AbstractKafkaTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
+ * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -65,7 +65,7 @@ public abstract class AbstractKafkaTest {
 
     public static final int DEFAULT_TEST_TIMEOUT = 60_000;
 
-    private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+    private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
         .withNetwork(null)
         .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
         .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")

diff --git a/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java b/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
index 1a334047..efa4e1fd 100644
--- a/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
+++ b/src/test/java/reactor/kafka/receiver/KafkaReceiverTest.java
@@ -517,6 +517,45 @@ public void manualCommitSync() throws Exception {
         checkCommitCallbacks(commitLatch, committedOffsets);
     }
 
+    @Test
+    public void manualCommitBatchSync() throws Exception {
+        int count = 10;
+        CountDownLatch commitLatch = new CountDownLatch(count);
+        long[] committedOffsets = new long[partitions];
+        for (int i = 0; i < committedOffsets.length; i++)
+            committedOffsets[i] = 0;
+        receiverOptions = receiverOptions.commitInterval(Duration.ZERO).commitBatchSize(0);
+        KafkaReceiver<Integer, String> receiver = createReceiver();
+        Flux<ReceiverRecord<Integer, String>> kafkaFlux = receiver.receiveBatch()
+            .flatMap(v -> v)
+            .delayUntil(record -> {
+                assertEquals(committedOffsets[record.partition()], record.offset());
+                return record.receiverOffset().commit()
+                    .doOnSuccess(i -> onCommit(record, commitLatch, committedOffsets));
+            })
+            .doOnError(e -> log.error("KafkaFlux exception", e));
+
+        sendAndWaitForMessages(kafkaFlux, count);
+        checkCommitCallbacks(commitLatch, committedOffsets);
+    }
+
+    @Test
+    public void batchRecordsShouldNotBeAutoCommitted() throws Exception {
+        receiverOptions =
+            receiverOptions.closeTimeout(Duration.ofMillis(1000))
+                .commitBatchSize(10)
+                .commitInterval(Duration.ofMillis(10))
+                .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        KafkaReceiver<Integer, String> receiver = createReceiver();
+        Flux<ReceiverRecord<Integer, String>> firstReceiveBatch = receiver.receiveBatch().flatMap(v -> v);
+        sendReceive(firstReceiveBatch, 0, 100, 0, 100);
+
+        // Check that close commits ack'ed records, does not commit un-ack'ed records
+        cancelSubscriptions(true);
+        clearReceivedMessages();
+        Flux<ReceiverRecord<Integer, String>> secondReceiveBatch = createReceiver().receiveBatch().flatMap(r -> r);
+        sendReceive(secondReceiveBatch, 100, 100, 0, 200);
+    }
+
     @Test
     public void manualCommitSyncNoPoll() throws Exception {
         CountDownLatch commitLatch = new CountDownLatch(1);
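A note on the `commitInterval(Duration.ZERO)`/`commitBatchSize(0)` pairing used by `manualCommitBatchSync` above: with both set to zero, time-based and size-based commit flushes are disabled, so offsets reach the broker only through the explicit `receiverOffset().commit()` calls. A sketch of equivalent receiver options outside the test harness (broker address, group id, and topic name are illustrative; imports from `java.util`, `java.time`, and `org.apache.kafka.*` omitted for brevity):

[source,java]
--------
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");           // illustrative broker
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-demo");                        // illustrative group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

ReceiverOptions<Integer, String> options = ReceiverOptions.<Integer, String>create(props)
    .commitInterval(Duration.ZERO)                       // no time-based commit flushes
    .commitBatchSize(0)                                  // no size-based commit flushes
    .subscription(Collections.singleton("demo-topic")); // illustrative topic
--------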