diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java index e044fd48ee3e1..3e161b1e9939b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java @@ -71,6 +71,17 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long throw new IOException("Connection to " + node + " failed."); } long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow + + // If the network client is waiting to send data for some reason (e.g. throttling or retry backoff), + // polling longer than that is potentially dangerous as the producer will not attempt to send + // any pending requests. The delay is measured from attemptStartTime, the same reading passed to poll(). + long waitingTime = client.pollDelayMs(node, attemptStartTime); + if (waitingTime > 0 && pollTimeout > waitingTime) { + // Block only until the next scheduled time at which it is okay to send data to the node, + // then wake up and try again.
+ pollTimeout = waitingTime; + } + client.poll(pollTimeout, attemptStartTime); if (client.authenticationException(node) != null) throw client.authenticationException(node); diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 86d1ddf5f41c4..3e50d5abb07dc 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -71,6 +71,7 @@ public FutureResponse(Node node, private int correlation; private Runnable wakeupHook; + private boolean advanceTimeDuringPoll; private final Time time; private final MockMetadataUpdater metadataUpdater; private final Map connections = new HashMap<>(); @@ -138,7 +139,11 @@ public long connectionDelay(Node node, long now) { @Override public long pollDelayMs(Node node, long now) { - return connectionDelay(node, now); + return connectionState(node.idString()).pollDelayMs(now); + } + + public void advanceTimeDuringPoll(boolean advanceTimeDuringPoll) { + this.advanceTimeDuringPoll = advanceTimeDuringPoll; } public void backoff(Node node, long durationMs) { @@ -335,6 +340,12 @@ public List poll(long timeoutMs, long now) { copy.add(response); } + // A real poll() that reaches this point with no responses would have blocked for timeoutMs, + // so when advanceTimeDuringPoll is set, sleep for timeoutMs on the client's Time.
+ if (advanceTimeDuringPoll) { + time.sleep(timeoutMs); + } + return copy; } @@ -794,6 +805,13 @@ long connectionDelay(long now) { return 0; } + long pollDelayMs(long now) { + if (notThrottled(now)) + return connectionDelay(now); + + return throttledUntilMs - now; // time left until the throttle ends + } + boolean ready(long now) { switch (state) { case CONNECTED: diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 2297109b14836..a399357780204 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -567,6 +567,46 @@ public void testMetadataTopicExpiry() throws Exception { assertTrue(future.isDone(), "Request should be completed"); } + @Test + public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() { + // Make MockClient#poll() advance time so that the throttle applied below eventually expires. + try { + client.advanceTimeDuringPoll(true); + + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); + TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions); + + setupWithTransactionState(txnManager); + doInitTransactions(txnManager, producerIdAndEpoch); + + int throttleTimeMs = 1000; + long startTime = time.milliseconds(); + Node nodeToThrottle = metadata.fetch().nodeById(0); + client.throttle(nodeToThrottle, throttleTimeMs); + + // Verify that the node is now throttled. In real-life Apache Kafka, a node can become temporarily + // unavailable for sends like this either by throttling (as above) or by a disconnect / backoff.
+ long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime); + assertEquals(throttleTimeMs, currentPollDelay); + + txnManager.beginTransaction(); + txnManager.maybeAddPartition(tp0); + + assertFalse(txnManager.hasInFlightRequest()); + sender.runOnce(); + assertTrue(txnManager.hasInFlightRequest()); + + long totalTimeToRunOnce = time.milliseconds() - startTime; + + // runOnce() should have blocked for roughly throttleTimeMs plus some slack, not the full request timeout. + assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT); + + } finally { + client.advanceTimeDuringPoll(false); + } + } + @Test public void testNodeLatencyStats() throws Exception { try (Metrics m = new Metrics()) {