KAFKA-9137: FETCH_SESSION_ID_NOT_FOUND caused by incorrect FetchSessionCache eviction #7640
if (session.size != session.cachedSize) {
  // If the number of partitions in the session changed, update the session's
  // position in the cache.
  cache.touch(session, session.lastUsedMs)
This currently passes session.lastUsedMs as the timestamp, which means that lastUsedMs is just set to its existing value rather than to the current time.
I think it'd probably be good to add another test of this behavior. I'll think about any other tests we may be missing for the integration between the FetchSessionCache and the FetchSession code.
@@ -289,6 +289,94 @@ class FetchSessionTest {
    assertTrue(resp2.sessionId > 0)
  }

  @Test
  def testFetchManagerCacheIntegration(): Unit = {
It seems like this is more of a test of correctly updating lastUsedMs, right? "integration" is kind of vague (and also suggests that this is an integration test rather than unit test.)
@cmccabe yeah, the name is bad and this is the second time I've changed it. I'll think of something else. I wanted to imply that it tests the interaction between the two, but you're right that there's a better name out there.
Thanks for this, @lbradstreet. I think this is a long-standing bug that led to us not properly prioritizing larger fetch requests and replica fetch requests.
@cmccabe I renamed the test and have also added a test for the privileged session management which I thought needed more testing. Let me know what you think and whether you think anything else is required.
LGTM
KAFKA-9137 describes a case where fetch sessions appear to be prematurely evicted from the FetchSessionCache, causing frequent FETCH_SESSION_ID_NOT_FOUND errors and fetch session restarts.
I believe the handling in kafka.server.FetchSession is incorrect in two ways. First, it only touches the cache entry if the number of partitions changed between fetches, so a fetch session that is in steady state never has its last used time updated. Second, it supplies the session's existing lastUsedMs as the current time when touching the entry, so even when the cache entry is touched, the time is not updated. Overall, I believe this means that every entry's lastUsedMs will be the time it was created. When the fetch session cache becomes full, the oldest entry will be evicted, not the least recently used entry. Note that this also breaks the privileged session semantics: recently used ReplicaFetcher sessions will appear to have been unused for longer than MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS and will therefore be considered lost.
The cache itself is currently tested reasonably well, but the interaction between FetchSession and FetchSessionCache is not. I have added a new test that covers the integration point between FetchSession and FetchSessionCache by adding new sessions and incrementally fetching them to check whether the right session is evicted.
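To make the eviction problem concrete, here is a minimal sketch of the behavior described above. It is a toy model, not Kafka's code: ToySession, ToyCache, and EvictionDemo are hypothetical names invented for illustration, and the eviction rule is simplified to "evict the entry with the smallest lastUsedMs", ignoring session size and privilege. It only shows why touching with the stale timestamp evicts by creation order, while touching with the current time evicts the least recently used entry.

```scala
import scala.collection.mutable

// Hypothetical toy model; these are NOT Kafka's FetchSession/FetchSessionCache.
final class ToySession(val id: Int, var lastUsedMs: Long)

final class ToyCache(maxEntries: Int) {
  private val sessions = mutable.Map.empty[Int, ToySession]

  // Record that `session` was used at time `nowMs`.
  def touch(session: ToySession, nowMs: Long): Unit =
    session.lastUsedMs = nowMs

  // Add a session. If the cache is full, evict the entry with the smallest
  // lastUsedMs first and return its id.
  def add(session: ToySession): Option[Int] = {
    val evicted =
      if (sessions.size >= maxEntries) {
        val victim = sessions.values.minBy(_.lastUsedMs)
        sessions -= victim.id
        Some(victim.id)
      } else None
    sessions(session.id) = session
    evicted
  }
}

object EvictionDemo {
  // Runs one scenario and returns the id of the session evicted by the third add.
  // useCurrentTime = false mimics passing the session's own lastUsedMs to touch.
  def evictedWhen(useCurrentTime: Boolean): Option[Int] = {
    val cache = new ToyCache(maxEntries = 2)
    val s1 = new ToySession(1, lastUsedMs = 0L)
    val s2 = new ToySession(2, lastUsedMs = 10L)
    cache.add(s1)
    cache.add(s2)

    // Session 1 fetches again at t = 20, so it is now the most recently used.
    val nowMs = 20L
    cache.touch(s1, if (useCurrentTime) nowMs else s1.lastUsedMs)

    // A third session arrives at t = 30 and forces an eviction.
    cache.add(new ToySession(3, lastUsedMs = 30L))
  }
}
```

With the stale timestamp, `EvictionDemo.evictedWhen(useCurrentTime = false)` evicts session 1 even though it was just used; with the current time, `EvictionDemo.evictedWhen(useCurrentTime = true)` evicts session 2, the least recently used entry.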