From d0956ba4517ff00b0abe27bdd7d889e84d8b5676 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Thu, 17 Oct 2024 08:50:43 -0700 Subject: [PATCH 1/8] KAFKA-17455: fixes stuck producer by polling again after pollDelayMs in NetworkClientUtils#awaitReady() --- .../org/apache/kafka/clients/NetworkClientUtils.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java index e044fd48ee3e1..610ab9c89d487 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java @@ -71,6 +71,15 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long throw new IOException("Connection to " + node + " failed."); } long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow + + // If the network client is delayed for some reason (eg. throttling or retry backoff), polling longer than + // that is potentially dangerous as the producer will get stuck waiting with potential for some pending + // requests to just not get sent. This fixes KAFKA-17455. This is the way. 
+ long timeUntilCanSendDataAgain = client.pollDelayMs(node, startTime); + if (timeUntilCanSendDataAgain > 0 && pollTimeout > timeUntilCanSendDataAgain) { + pollTimeout = timeUntilCanSendDataAgain; + } + client.poll(pollTimeout, attemptStartTime); if (client.authenticationException(node) != null) throw client.authenticationException(node); From d1d7d94e2633c9ac0614ba7787559e5a82c0ea26 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Thu, 17 Oct 2024 12:26:48 -0700 Subject: [PATCH 2/8] clarifies comments --- .../apache/kafka/clients/NetworkClientUtils.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java index 610ab9c89d487..3e161b1e9939b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java @@ -72,12 +72,14 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long } long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow - // If the network client is delayed for some reason (eg. throttling or retry backoff), polling longer than - // that is potentially dangerous as the producer will get stuck waiting with potential for some pending - // requests to just not get sent. This fixes KAFKA-17455. This is the way. - long timeUntilCanSendDataAgain = client.pollDelayMs(node, startTime); - if (timeUntilCanSendDataAgain > 0 && pollTimeout > timeUntilCanSendDataAgain) { - pollTimeout = timeUntilCanSendDataAgain; + // If the network client is waiting to send data for some reason (eg. throttling or retry backoff), + // polling longer than that is potentially dangerous as the producer will not attempt to send + // any pending requests. 
+ long waitingTime = client.pollDelayMs(node, startTime); + if (waitingTime > 0 && pollTimeout > waitingTime) { + // Block only until the next-scheduled time that it's okay to send data to the node, + // wake up, and try again. This is the way. + pollTimeout = waitingTime; } client.poll(pollTimeout, attemptStartTime); From 54507dab95d947fc9a036e471ccb3fa7d60a9271 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Thu, 17 Oct 2024 12:27:07 -0700 Subject: [PATCH 3/8] attempts to add test --- .../producer/internals/SenderTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 2297109b14836..a458f06d00fcc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -567,6 +567,37 @@ public void testMetadataTopicExpiry() throws Exception { assertTrue(future.isDone(), "Request should be completed"); } + @Test + public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() { + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); + TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions); + + setupWithTransactionState(txnManager); + doInitTransactions(txnManager, producerIdAndEpoch); + + int backoffTimeMs = 10; + long now = time.milliseconds(); + Node nodeToThrottle = metadata.fetch().nodeById(0); + // client.throttle(nodeToThrottle, backoffTimeMs); + client.backoff(nodeToThrottle, backoffTimeMs); + assertTrue(client.isConnected(nodeToThrottle.idString())); + client.disconnect(nodeToThrottle.idString()); + 
assertFalse(client.isConnected(nodeToThrottle.idString())); + + // Verify node is throttled about 10ms. In real-life Apache Kafka, we observe that this can happen + // as done above (with a disconnect and a backoff) or if the producer receives a response with + // throttleTimeMs > 0. + long currentPollDelay = client.pollDelayMs(nodeToThrottle, now); + assertTrue(currentPollDelay > 0); + assertTrue(currentPollDelay <= backoffTimeMs); + + txnManager.beginTransaction(); + txnManager.maybeAddPartition(tp0); + + // sender.runOnce(); + } + @Test public void testNodeLatencyStats() throws Exception { try (Metrics m = new Metrics()) { From 292d1ef4314af0a14087eba0f9774c46d08c75f4 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Thu, 17 Oct 2024 14:16:53 -0700 Subject: [PATCH 4/8] Adds a test but my changes to MockClient.java broke all sorts of stuff --- .../org/apache/kafka/clients/MockClient.java | 15 ++++++++++++- .../producer/internals/SenderTest.java | 22 ++++++++++--------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 8a195184e937c..d43ddf0ffd45d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -138,7 +138,7 @@ public long connectionDelay(Node node, long now) { @Override public long pollDelayMs(Node node, long now) { - return connectionDelay(node, now); + return connectionState(node.idString()).pollDelayMs(now); } public void backoff(Node node, long durationMs) { @@ -336,6 +336,12 @@ public List poll(long timeoutMs, long now) { copy.add(response); } + if (copy.isEmpty()) { + // Simulate time advancing. If no responses are received, then we know that + // we waited for the whole timeoutMs. 
+ time.sleep(timeoutMs); + } + return copy; } @@ -795,6 +801,13 @@ long connectionDelay(long now) { return 0; } + long pollDelayMs(long now) { + if (notThrottled(now)) + return connectionDelay(now); + + return throttledUntilMs - now; + } + boolean ready(long now) { switch (state) { case CONNECTED: diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index a458f06d00fcc..2010afa058107 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -577,25 +577,27 @@ public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() { doInitTransactions(txnManager, producerIdAndEpoch); int backoffTimeMs = 10; - long now = time.milliseconds(); + long startTime = time.milliseconds(); Node nodeToThrottle = metadata.fetch().nodeById(0); - // client.throttle(nodeToThrottle, backoffTimeMs); - client.backoff(nodeToThrottle, backoffTimeMs); - assertTrue(client.isConnected(nodeToThrottle.idString())); - client.disconnect(nodeToThrottle.idString()); - assertFalse(client.isConnected(nodeToThrottle.idString())); + client.throttle(nodeToThrottle, backoffTimeMs); // Verify node is throttled about 10ms. In real-life Apache Kafka, we observe that this can happen - // as done above (with a disconnect and a backoff) or if the producer receives a response with - // throttleTimeMs > 0. - long currentPollDelay = client.pollDelayMs(nodeToThrottle, now); + // as done above by throttling or with a disconnect / backoff. 
+ long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime); assertTrue(currentPollDelay > 0); assertTrue(currentPollDelay <= backoffTimeMs); txnManager.beginTransaction(); txnManager.maybeAddPartition(tp0); - // sender.runOnce(); + assertFalse(txnManager.hasInFlightRequest()); + sender.runOnce(); + assertTrue(txnManager.hasInFlightRequest()); + + long totalTimeToRunOnce = time.milliseconds() - startTime; + + // It should have blocked roughly only the backoffTimeMs and some change. + assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT); } @Test From 1bc3443deca5b2273f560b4439300fd764945041 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Sat, 19 Oct 2024 08:45:53 -0700 Subject: [PATCH 5/8] test that passes on my branch and fails on trunk --- .../java/org/apache/kafka/clients/MockClient.java | 11 ++++++++--- .../clients/producer/internals/SenderTest.java | 15 ++++++++++----- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index d43ddf0ffd45d..39998992d66c0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -71,6 +71,7 @@ public FutureResponse(Node node, private int correlation; private Runnable wakeupHook; + private boolean advanceTimeDuringPoll; private final Time time; private final MockMetadataUpdater metadataUpdater; private final Map connections = new HashMap<>(); @@ -141,6 +142,10 @@ public long pollDelayMs(Node node, long now) { return connectionState(node.idString()).pollDelayMs(now); } + public void setAdvanceTimeDuringPoll(boolean advanceTimeDuringPoll) { + this.advanceTimeDuringPoll = advanceTimeDuringPoll; + } + public void backoff(Node node, long durationMs) { connectionState(node.idString()).backoff(time.milliseconds() + durationMs); } @@ -336,9 +341,9 @@ public List poll(long timeoutMs, long now) { 
copy.add(response); } - if (copy.isEmpty()) { - // Simulate time advancing. If no responses are received, then we know that - // we waited for the whole timeoutMs. + // In real life, if poll() is called and we get to the end with no responses, + // time equal to timeoutMs would have passed. + if (advanceTimeDuringPoll) { time.sleep(timeoutMs); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 2010afa058107..ee2645f2675fb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -568,7 +568,10 @@ public void testMetadataTopicExpiry() throws Exception { } @Test - public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() { + public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() { + // We want MockClient#poll() to advance time so that eventually the backoff expires. + client.setAdvanceTimeDuringPoll(true); + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions); @@ -576,16 +579,16 @@ public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() { setupWithTransactionState(txnManager); doInitTransactions(txnManager, producerIdAndEpoch); - int backoffTimeMs = 10; + int throttleTimeMs = 1000; long startTime = time.milliseconds(); Node nodeToThrottle = metadata.fetch().nodeById(0); - client.throttle(nodeToThrottle, backoffTimeMs); + client.throttle(nodeToThrottle, throttleTimeMs); - // Verify node is throttled about 10ms. 
In real-life Apache Kafka, we observe that this can happen + // Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen // as done above by throttling or with a disconnect / backoff. long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime); assertTrue(currentPollDelay > 0); - assertTrue(currentPollDelay <= backoffTimeMs); + assertTrue(currentPollDelay <= throttleTimeMs); txnManager.beginTransaction(); txnManager.maybeAddPartition(tp0); @@ -598,6 +601,8 @@ public void senderThreadShouldNotBlockWhenBackingOffAndAddingPartitionsToTxn() { // It should have blocked roughly only the backoffTimeMs and some change. assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT); + + client.setAdvanceTimeDuringPoll(false); } @Test From 3da6309fcc50b01020ca5a73e87202da8ac6069a Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Sun, 10 Nov 2024 11:44:55 -0800 Subject: [PATCH 6/8] addresses PR feedback: rename MockClient#setAdvanceTimeDuringPoll to advanceTimeDuringPoll() --- .../src/test/java/org/apache/kafka/clients/MockClient.java | 2 +- .../apache/kafka/clients/producer/internals/SenderTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 39998992d66c0..fca0a9ca2121b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -142,7 +142,7 @@ public long pollDelayMs(Node node, long now) { return connectionState(node.idString()).pollDelayMs(now); } - public void setAdvanceTimeDuringPoll(boolean advanceTimeDuringPoll) { + public void advanceTimeDuringPoll(boolean advanceTimeDuringPoll) { this.advanceTimeDuringPoll = advanceTimeDuringPoll; } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index ee2645f2675fb..75305ea7f041e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -570,7 +570,7 @@ public void testMetadataTopicExpiry() throws Exception { @Test public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() { // We want MockClient#poll() to advance time so that eventually the backoff expires. - client.setAdvanceTimeDuringPoll(true); + client.advanceTimeDuringPoll(true); ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); @@ -602,7 +602,7 @@ public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() // It should have blocked roughly only the backoffTimeMs and some change. assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT); - client.setAdvanceTimeDuringPoll(false); + client.advanceTimeDuringPoll(false); } @Test From e56a5630e281a9edaba610bf981dadbcd49843eb Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Wed, 8 Jan 2025 18:04:34 -0800 Subject: [PATCH 7/8] feedback from pull request --- .../producer/internals/SenderTest.java | 51 ++++++++++--------- 1 file changed, 27 insertions(+), 24 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 75305ea7f041e..f228761f1e144 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -570,39 +570,42 @@ public void testMetadataTopicExpiry() throws Exception { @Test public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() { // We want 
MockClient#poll() to advance time so that eventually the backoff expires. - client.advanceTimeDuringPoll(true); + try { + client.advanceTimeDuringPoll(true); - ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); - apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); - TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions); + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); + TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions); - setupWithTransactionState(txnManager); - doInitTransactions(txnManager, producerIdAndEpoch); + setupWithTransactionState(txnManager); + doInitTransactions(txnManager, producerIdAndEpoch); - int throttleTimeMs = 1000; - long startTime = time.milliseconds(); - Node nodeToThrottle = metadata.fetch().nodeById(0); - client.throttle(nodeToThrottle, throttleTimeMs); + int throttleTimeMs = 1000; + long startTime = time.milliseconds(); + Node nodeToThrottle = metadata.fetch().nodeById(0); + client.throttle(nodeToThrottle, throttleTimeMs); - // Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen - // as done above by throttling or with a disconnect / backoff. - long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime); - assertTrue(currentPollDelay > 0); - assertTrue(currentPollDelay <= throttleTimeMs); + // Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen + // as done above by throttling or with a disconnect / backoff. 
+ long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime); + assertTrue(currentPollDelay > 0); + assertTrue(currentPollDelay == throttleTimeMs); - txnManager.beginTransaction(); - txnManager.maybeAddPartition(tp0); + txnManager.beginTransaction(); + txnManager.maybeAddPartition(tp0); - assertFalse(txnManager.hasInFlightRequest()); - sender.runOnce(); - assertTrue(txnManager.hasInFlightRequest()); + assertFalse(txnManager.hasInFlightRequest()); + sender.runOnce(); + assertTrue(txnManager.hasInFlightRequest()); - long totalTimeToRunOnce = time.milliseconds() - startTime; + long totalTimeToRunOnce = time.milliseconds() - startTime; - // It should have blocked roughly only the backoffTimeMs and some change. - assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT); + // It should have blocked roughly only the backoffTimeMs and some change. + assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT); - client.advanceTimeDuringPoll(false); + } finally { + client.advanceTimeDuringPoll(false); + } } @Test From 344a5198f249dee5a1b954f0fe9e5087b4630851 Mon Sep 17 00:00:00 2001 From: Colt McNealy Date: Thu, 9 Jan 2025 07:49:36 -0800 Subject: [PATCH 8/8] final PR feedback --- .../apache/kafka/clients/producer/internals/SenderTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index f228761f1e144..a399357780204 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -588,8 +588,7 @@ public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() // Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen // as done above by throttling or with a disconnect / backoff. 
long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime); - assertTrue(currentPollDelay > 0); - assertTrue(currentPollDelay == throttleTimeMs); + assertEquals(currentPollDelay, throttleTimeMs); txnManager.beginTransaction(); txnManager.maybeAddPartition(tp0);