KAFKA-17455: fix stuck producer when throttling or retrying #17527

Merged on Jan 9, 2025 (8 commits)
@@ -71,6 +71,17 @@ public static boolean awaitReady(KafkaClient client, Node node, Time time, long
throw new IOException("Connection to " + node + " failed.");
}
long pollTimeout = timeoutMs - (attemptStartTime - startTime); // initialize in this order to avoid overflow

// If the network client is waiting to send data for some reason (eg. throttling or retry backoff),
// polling longer than that is potentially dangerous as the producer will not attempt to send
// any pending requests.
long waitingTime = client.pollDelayMs(node, startTime);
if (waitingTime > 0 && pollTimeout > waitingTime) {
// Block only until the next-scheduled time that it's okay to send data to the producer,
// wake up, and try again. This is the way.
pollTimeout = waitingTime;
}

client.poll(pollTimeout, attemptStartTime);
if (client.authenticationException(node) != null)
throw client.authenticationException(node);
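
The capping rule in this hunk can be looked at in isolation. The following is a minimal sketch, not the Kafka implementation: PollTimeoutSketch and cappedPollTimeout are hypothetical names, and the poll delay is passed in as a plain number instead of being read from client.pollDelayMs(...).

```java
final class PollTimeoutSketch {

    // Hypothetical helper: the remaining time budget, capped by the node's poll delay.
    static long cappedPollTimeout(long timeoutMs, long startTime, long attemptStartTime, long pollDelayMs) {
        // Same ordering as the hunk: compute elapsed time first, then subtract it from the budget.
        long pollTimeout = timeoutMs - (attemptStartTime - startTime);
        // Shorten the poll only when the node reports a positive delay smaller than the budget.
        if (pollDelayMs > 0 && pollTimeout > pollDelayMs) {
            pollTimeout = pollDelayMs;
        }
        return pollTimeout;
    }

    public static void main(String[] args) {
        // Throttled for 1 s with a 30 s budget: wake up after 1 s instead of 30 s.
        System.out.println(cappedPollTimeout(30_000L, 0L, 0L, 1_000L));
        // No delay reported: the full budget is used.
        System.out.println(cappedPollTimeout(30_000L, 0L, 0L, 0L));
        // Delay longer than the remaining budget: the budget wins.
        System.out.println(cappedPollTimeout(500L, 0L, 100L, 1_000L));
    }
}
```

The point of the cap is that a throttled or backing-off node bounds how long the caller blocks, so pending requests get another chance as soon as the delay expires.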
20 changes: 19 additions & 1 deletion clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -71,6 +71,7 @@ public FutureResponse(Node node,

private int correlation;
private Runnable wakeupHook;
private boolean advanceTimeDuringPoll;
private final Time time;
private final MockMetadataUpdater metadataUpdater;
private final Map<String, ConnectionState> connections = new HashMap<>();
@@ -138,7 +139,11 @@ public long connectionDelay(Node node, long now) {

@Override
public long pollDelayMs(Node node, long now) {
- return connectionDelay(node, now);
+ return connectionState(node.idString()).pollDelayMs(now);
}

public void advanceTimeDuringPoll(boolean advanceTimeDuringPoll) {
this.advanceTimeDuringPoll = advanceTimeDuringPoll;
}

public void backoff(Node node, long durationMs) {
@@ -336,6 +341,12 @@ public List<ClientResponse> poll(long timeoutMs, long now) {
copy.add(response);
}

// In real life, if poll() is called and we get to the end with no responses,
// time equal to timeoutMs would have passed.
if (advanceTimeDuringPoll) {
time.sleep(timeoutMs);
Contributor Author

I noticed when developing my test that repeated calls to poll() with timeoutMs == 10 never resulted in time.milliseconds() advancing, so the fictitious Node in my test never became ready (time never advanced past the throttled time). That's why I made this change; however, it broke all sorts of things.
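
The symptom described in this comment can be reproduced without any Kafka classes. This is a rough sketch under stated assumptions: FakeClock and pollsUntilReady are hypothetical names, and the loop only models "poll until the throttle expires", not the real Sender or MockClient.

```java
final class FrozenClockSketch {

    // Hypothetical stand-in for a manually driven test clock.
    static final class FakeClock {
        private long nowMs = 0L;

        long milliseconds() {
            return nowMs;
        }

        void sleep(long ms) {
            nowMs += ms;
        }
    }

    // Counts polls until the throttle expires, giving up after maxPolls.
    static int pollsUntilReady(FakeClock clock, long throttledUntilMs, long timeoutMs,
                               boolean advanceTimeDuringPoll, int maxPolls) {
        int polls = 0;
        while (clock.milliseconds() < throttledUntilMs && polls < maxPolls) {
            polls++;
            // A mock poll() returns immediately; time only moves if the mock advances it.
            if (advanceTimeDuringPoll) {
                clock.sleep(timeoutMs);
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // Clock advances by the poll timeout: the throttle expires after 100 polls of 10 ms.
        System.out.println(pollsUntilReady(new FakeClock(), 1_000L, 10L, true, 500));
        // Clock frozen: the loop only stops because of the maxPolls safety limit.
        System.out.println(pollsUntilReady(new FakeClock(), 1_000L, 10L, false, 500));
    }
}
```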

Member

I guess most tests don't expect time to advance "automatically"... I think we need to remove this.

I think there are two possibilities: start a background thread in your test before you call runOnce() and let the background thread advance mockTime (maybe too complex?), or make the MockTime object more advanced and add some "auto-time-advance" feature, i.e., each time milliseconds() is called, advance time by 1ms (or some other value passed into MockTime). Only your test would enable this feature.
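
The second option, a clock that advances on every read, might look roughly like the sketch below. AutoTickClockSketch is a hypothetical class written for illustration; it is not Kafka's MockTime, and the choice to advance before returning the value is an assumption of this sketch.

```java
final class AutoTickClockSketch {
    private final long autoTickMs;
    private long nowMs = 0L;

    // autoTickMs == 0 gives a clock that only moves when sleep() is called.
    AutoTickClockSketch(long autoTickMs) {
        this.autoTickMs = autoTickMs;
    }

    // Every read moves the clock forward by autoTickMs, then returns the new time.
    long milliseconds() {
        nowMs += autoTickMs;
        return nowMs;
    }

    // Manual advance, as a test would do explicitly.
    void sleep(long ms) {
        nowMs += ms;
    }

    public static void main(String[] args) {
        AutoTickClockSketch clock = new AutoTickClockSketch(1L);
        clock.milliseconds();
        clock.milliseconds();
        // Third read: three ticks of 1 ms have elapsed.
        System.out.println(clock.milliseconds());
    }
}
```

With such a clock, code that polls in a loop and re-reads the time on each iteration makes progress even though no test code calls sleep().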

Contributor Author

Thanks Matthias. I did something somewhat inspired by your comment. Ready for review.

Member

Should work, too, but why don't you just create new MockTime(1L) and use the already existing "auto-tick" feature?

Contributor Author

@mjsax the problem is that the following test still passes on trunk using new MockTime(1L). However, the test that I have in the PR as it stands passes on my branch but fails on trunk:

    @Test
    public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
        // We want MockClient#poll() to advance time so that eventually the backoff expires.
        // client.advanceTimeDuringPoll(true);
        time = new MockTime(1L);

        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
        TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);

        setupWithTransactionState(txnManager);
        doInitTransactions(txnManager, producerIdAndEpoch);

        int throttleTimeMs = 1000;
        long startTime = time.milliseconds();
        Node nodeToThrottle = metadata.fetch().nodeById(0);
        client.throttle(nodeToThrottle, throttleTimeMs);

        // Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen
        // as done above by throttling or with a disconnect / backoff.
        long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
        assertTrue(currentPollDelay > 0);
        assertTrue(currentPollDelay <= throttleTimeMs);

        txnManager.beginTransaction();
        txnManager.maybeAddPartition(tp0);

        assertFalse(txnManager.hasInFlightRequest());
        sender.runOnce();
        assertTrue(txnManager.hasInFlightRequest());

        long totalTimeToRunOnce = time.milliseconds() - startTime;

        // It should have blocked roughly only the backoffTimeMs and some change.
        assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);
    }

Member

I suppose that the test passes with new MockTime(1L) because the new time is not used by the client. The client must be created after the time is set. Would that work? I would also prefer this over adding the sleep in the mock client.
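
The hypothesis in this comment is a plain Java reference issue: reassigning a field does not change the object that an already-constructed collaborator holds. The sketch below illustrates only that mechanism. Clock, FixedClock, TickingClock and Client are hypothetical stand-ins, and whether this is what actually happens in the test is the reviewer's supposition, not something the thread confirms.

```java
final class StaleClockReferenceSketch {

    interface Clock {
        long milliseconds();
    }

    // Never advances on its own.
    static final class FixedClock implements Clock {
        public long milliseconds() {
            return 0L;
        }
    }

    // Advances by 1 ms on every read.
    static final class TickingClock implements Clock {
        private long nowMs = 0L;

        public long milliseconds() {
            nowMs += 1;
            return nowMs;
        }
    }

    // Captures its clock at construction time, like a client built in test setup.
    static final class Client {
        private final Clock clock;

        Client(Clock clock) {
            this.clock = clock;
        }

        long now() {
            return clock.milliseconds();
        }
    }

    // Reads the client's clock twice and returns the second reading.
    static long secondReading(Client client) {
        client.now();
        return client.now();
    }

    public static void main(String[] args) {
        Clock time = new FixedClock();
        Client early = new Client(time);   // built before the clock is swapped
        time = new TickingClock();         // reassigning the variable does not touch 'early'
        Client late = new Client(time);    // built after the swap

        System.out.println(secondReading(early)); // still reads the fixed clock
        System.out.println(secondReading(late));  // reads the ticking clock
    }
}
```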

}

return copy;
}

@@ -795,6 +806,13 @@ long connectionDelay(long now) {
return 0;
}

long pollDelayMs(long now) {
if (notThrottled(now))
return connectionDelay(now);

return throttledUntilMs - now;
}

boolean ready(long now) {
switch (state) {
case CONNECTED:
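
The throttle-aware delay added to the mock's connection state in this hunk can be sketched on its own. ThrottleStateSketch is a hypothetical class, the fixed connectionDelayMs is a simplification, and the exact comparison used in notThrottled is an assumption of this sketch, not a copy of MockClient.

```java
final class ThrottleStateSketch {
    private final long connectionDelayMs; // simplification: a fixed connection delay
    private long throttledUntilMs = 0L;

    ThrottleStateSketch(long connectionDelayMs) {
        this.connectionDelayMs = connectionDelayMs;
    }

    // Mark the node as throttled for durationMs starting at 'now'.
    void throttle(long now, long durationMs) {
        throttledUntilMs = now + durationMs;
    }

    // Assumption: the throttle is over once 'now' reaches throttledUntilMs.
    boolean notThrottled(long now) {
        return now >= throttledUntilMs;
    }

    // While throttled, report the remaining throttle time; otherwise the connection delay.
    long pollDelayMs(long now) {
        if (notThrottled(now)) {
            return connectionDelayMs;
        }
        return throttledUntilMs - now;
    }

    public static void main(String[] args) {
        ThrottleStateSketch state = new ThrottleStateSketch(0L);
        state.throttle(0L, 1_000L);
        System.out.println(state.pollDelayMs(0L));     // full throttle remaining
        System.out.println(state.pollDelayMs(400L));   // 600 ms remaining
        System.out.println(state.pollDelayMs(1_500L)); // throttle expired
    }
}
```

This mirrors why the test expects the poll delay to equal throttleTimeMs when it is read at the same instant the throttle was applied.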
@@ -567,6 +567,47 @@ public void testMetadataTopicExpiry() throws Exception {
assertTrue(future.isDone(), "Request should be completed");
}

@Test
public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() {
// We want MockClient#poll() to advance time so that eventually the backoff expires.
try {
client.advanceTimeDuringPoll(true);

ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

int throttleTimeMs = 1000;
long startTime = time.milliseconds();
Node nodeToThrottle = metadata.fetch().nodeById(0);
client.throttle(nodeToThrottle, throttleTimeMs);

// Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen
// as done above by throttling or with a disconnect / backoff.
long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime);
assertTrue(currentPollDelay > 0);
assertTrue(currentPollDelay == throttleTimeMs);
Member

nit: We could use assertEquals(throttleTimeMs, currentPollDelay) instead of those two lines.


txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);

assertFalse(txnManager.hasInFlightRequest());
sender.runOnce();
assertTrue(txnManager.hasInFlightRequest());

long totalTimeToRunOnce = time.milliseconds() - startTime;

// It should have blocked roughly only the backoffTimeMs and some change.
assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT);

} finally {
client.advanceTimeDuringPoll(false);
}
}

@Test
public void testNodeLatencyStats() throws Exception {
try (Metrics m = new Metrics()) {