-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17455: fix stuck producer when throttling or retrying #17527
Changes from 6 commits
d0956ba
d1d7d94
54507da
292d1ef
1bc3443
3da6309
e56a563
344a519
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -567,6 +567,44 @@ public void testMetadataTopicExpiry() throws Exception { | |
assertTrue(future.isDone(), "Request should be completed"); | ||
} | ||
|
||
@Test | ||
public void senderThreadShouldNotGetStuckWhenThrottledAndAddingPartitionsToTxn() { | ||
// We want MockClient#poll() to advance time so that eventually the backoff expires. | ||
client.advanceTimeDuringPoll(true); | ||
|
||
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); | ||
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3)); | ||
TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100, apiVersions); | ||
|
||
setupWithTransactionState(txnManager); | ||
doInitTransactions(txnManager, producerIdAndEpoch); | ||
|
||
int throttleTimeMs = 1000; | ||
long startTime = time.milliseconds(); | ||
Node nodeToThrottle = metadata.fetch().nodeById(0); | ||
client.throttle(nodeToThrottle, throttleTimeMs); | ||
|
||
// Verify node is throttled a little bit. In real-life Apache Kafka, we observe that this can happen | ||
// as done above by throttling or with a disconnect / backoff. | ||
long currentPollDelay = client.pollDelayMs(nodeToThrottle, startTime); | ||
assertTrue(currentPollDelay > 0); | ||
assertTrue(currentPollDelay <= throttleTimeMs); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I wonder if we could just assert that currentPollDelay equals to throttleTimeMs. The time does not seem to advance before pollDelayMs is called. Would it work? |
||
|
||
txnManager.beginTransaction(); | ||
txnManager.maybeAddPartition(tp0); | ||
|
||
assertFalse(txnManager.hasInFlightRequest()); | ||
sender.runOnce(); | ||
assertTrue(txnManager.hasInFlightRequest()); | ||
|
||
long totalTimeToRunOnce = time.milliseconds() - startTime; | ||
|
||
// It should have blocked roughly only the backoffTimeMs and some change. | ||
assertTrue(totalTimeToRunOnce < REQUEST_TIMEOUT); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Same question here. Could we assert that it equals to currentPollDelay? |
||
|
||
client.advanceTimeDuringPoll(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that this line won't be called if any of the previous code throws (e.g. failed assertions). We should use a try..finally to ensure that we restore the state. An alternative would be to create the client in the setup in order to ensure a clean state for each test. |
||
} | ||
|
||
@Test | ||
public void testNodeLatencyStats() throws Exception { | ||
try (Metrics m = new Metrics()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed when developing my test that repeated calls to
poll()
withtimeoutMs == 10
never resulted intime.milliseconds()
advancing, so the fictitiousNode
in my test never became ready (time never advanced past the throttled time). That's why I made this change; however, it broke all sorts of things.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess most test don't expect that time advanced "automatically"... It think we need to remove this.
I think there two possibilities: start a background thread in your test before you call
runOnce()
and let the background thread advance mockTime (maybe too complex?), or, make theMockTime
object more advanced and add some "auto-time-advance" feature, ie, each timemilliseconds()
is called, advance time by 1ms (or some other value passed into MockTime -- only your test would enable this feature.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Matthias. I did something somewhat inspired by your comment. Ready for review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should work, too, but why don't you just create
new MockTime(1L)
and use the already existing "auto-tick" feature?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mjsax the problem is that the following test still passes on
trunk
usingnew MockTime(1L)
. However, the test that I have in the PR as it stands passes on my branch but fails on trunk:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose that the test passes with
new MockTime(1L)
because the newtime
is not used by theclient
. The client must be created after thetime
is set. Would it work? I would also prefer this over adding the sleep in the mock client.