Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9137: FETCH_SESSION_ID_NOT_FOUND caused by incorrect FetchSessionCache eviction #7640

Merged
merged 4 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions core/src/main/scala/kafka/server/FetchSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ case class FetchSession(val id: Int,
", privileged=" + privileged +
", partitionMap.size=" + partitionMap.size +
", creationMs=" + creationMs +
", creationMs=" + lastUsedMs +
", lastUsedMs=" + lastUsedMs +
", epoch=" + epoch + ")"
}
}
Expand Down Expand Up @@ -781,11 +781,7 @@ class FetchManager(private val time: Time,
cache.remove(session)
new SessionlessFetchContext(fetchData)
} else {
if (session.size != session.cachedSize) {
// If the number of partitions in the session changed, update the session's
// position in the cache.
cache.touch(session, session.lastUsedMs)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently supplies the session.lastUsedMs which means that lastUsedMs will be set to the current lastUsedMs.

}
cache.touch(session, time.milliseconds())
session.epoch = JFetchMetadata.nextEpoch(session.epoch)
debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,78 @@ class FetchSessionTest {
assertTrue(resp2.sessionId > 0)
}

@Test
def testFetchManagerCacheUsage(): Unit = {
val time = new MockTime()
// set maximum entries to 2 to allow for eviction later
val cache = new FetchSessionCache(2, 1000)
val fetchManager = new FetchManager(time, cache)

// Create a new fetch session, session 1
val session1req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session1req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session1req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session1context1 = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session1context1.getClass)
val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session1resp = session1context1.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session1resp.error())
assertTrue(session1resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session1resp.responseData().size())

// check session entered into case
assertTrue(cache.get(session1resp.sessionId()).isDefined)
time.sleep(500)

// Create a second new fetch session
val session2req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session2req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session2req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session2context.getClass)
val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
session2RespData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
session2RespData.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session2resp = session2context.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session2resp.error())
assertTrue(session2resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session2resp.responseData().size())

// both newly created entries are present in cache
assertTrue(cache.get(session1resp.sessionId()).isDefined)
assertTrue(cache.get(session2resp.sessionId()).isDefined)
time.sleep(500)

// Create an incremental fetch request for session 1
val context1v2 = fetchManager.newContext(
new JFetchMetadata(session1resp.sessionId(), 1),
new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData],
new util.ArrayList[TopicPartition], false)
assertEquals(classOf[IncrementalFetchContext], context1v2.getClass)

// total sleep time will now be large enough that fetch session 1 will be evicted if not correctly touched
time.sleep(501)

// create one final session directly in cache to test that the least recently used entry is evicted
// the second session should be evicted because the first session was incrementally fetched more recently than
// the second session was created
val latestSessionId = cache.maybeCreateSession(time.milliseconds(), false, 40, () => dummyCreate(40))
assertTrue(cache.get(session1resp.sessionId()).isDefined)
assertFalse("session 2 should have been evicted by latest session, as session 1 was used more recently",
cache.get(session2resp.sessionId()).isDefined)
assertTrue(cache.get(latestSessionId).isDefined)
}

@Test
def testZeroSizeFetchSession(): Unit = {
val time = new MockTime()
Expand Down