Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-9137: FETCH_SESSION_ID_NOT_FOUND caused by incorrect FetchSessionCache eviction #7640

Merged
merged 4 commits into from
Nov 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions core/src/main/scala/kafka/server/FetchSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ case class FetchSession(val id: Int,
", privileged=" + privileged +
", partitionMap.size=" + partitionMap.size +
", creationMs=" + creationMs +
", creationMs=" + lastUsedMs +
", lastUsedMs=" + lastUsedMs +
", epoch=" + epoch + ")"
}
}
Expand Down Expand Up @@ -781,11 +781,7 @@ class FetchManager(private val time: Time,
cache.remove(session)
new SessionlessFetchContext(fetchData)
} else {
if (session.size != session.cachedSize) {
// If the number of partitions in the session changed, update the session's
// position in the cache.
cache.touch(session, session.lastUsedMs)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently supplies the session.lastUsedMs which means that lastUsedMs will be set to the current lastUsedMs.

}
cache.touch(session, time.milliseconds())
session.epoch = JFetchMetadata.nextEpoch(session.epoch)
debug(s"Created a new incremental FetchContext for session id ${session.id}, " +
s"epoch ${session.epoch}: added ${partitionsToLogString(added)}, " +
Expand Down
194 changes: 194 additions & 0 deletions core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,200 @@ class FetchSessionTest {
assertTrue(resp2.sessionId > 0)
}

@Test
def testFetchSessionExpiration(): Unit = {
val time = new MockTime()
// set maximum entries to 2 to allow for eviction later
val cache = new FetchSessionCache(2, 1000)
val fetchManager = new FetchManager(time, cache)

// Create a new fetch session, session 1
val session1req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session1req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session1req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session1context1 = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session1context1.getClass)
val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session1resp = session1context1.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session1resp.error())
assertTrue(session1resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session1resp.responseData().size())

// check session entered into case
assertTrue(cache.get(session1resp.sessionId()).isDefined)
time.sleep(500)

// Create a second new fetch session
val session2req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session2req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session2req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session2context.getClass)
val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
session2RespData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
session2RespData.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session2resp = session2context.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session2resp.error())
assertTrue(session2resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session2resp.responseData().size())

// both newly created entries are present in cache
assertTrue(cache.get(session1resp.sessionId()).isDefined)
assertTrue(cache.get(session2resp.sessionId()).isDefined)
time.sleep(500)

// Create an incremental fetch request for session 1
val context1v2 = fetchManager.newContext(
new JFetchMetadata(session1resp.sessionId(), 1),
new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData],
new util.ArrayList[TopicPartition], false)
assertEquals(classOf[IncrementalFetchContext], context1v2.getClass)

// total sleep time will now be large enough that fetch session 1 will be evicted if not correctly touched
time.sleep(501)

// create one final session to test that the least recently used entry is evicted
// the second session should be evicted because the first session was incrementally fetched
// more recently than the second session was created
val session3req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session3req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session3req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
val session3context = fetchManager.newContext(JFetchMetadata.INITIAL, session3req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session3context.getClass)
val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData3.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData3.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session3resp = session3context.updateAndGenerateResponseData(respData3)
assertEquals(Errors.NONE, session3resp.error())
assertTrue(session3resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session3resp.responseData().size())

assertTrue(cache.get(session1resp.sessionId()).isDefined)
assertFalse("session 2 should have been evicted by latest session, as session 1 was used more recently",
cache.get(session2resp.sessionId()).isDefined)
assertTrue(cache.get(session3resp.sessionId()).isDefined)
}

@Test
def testPrivilegedSessionHandling(): Unit = {
val time = new MockTime()
// set maximum entries to 2 to allow for eviction later
val cache = new FetchSessionCache(2, 1000)
val fetchManager = new FetchManager(time, cache)

// Create a new fetch session, session 1
val session1req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session1req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session1req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session1context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, true)
assertEquals(classOf[FullFetchContext], session1context.getClass)
val respData1 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData1.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session1resp = session1context.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session1resp.error())
assertTrue(session1resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session1resp.responseData().size())
assertEquals(1, cache.size)

// move time forward to age session 1 a little compared to session 2
time.sleep(500)

// Create a second new fetch session, unprivileged
val session2req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session2req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session2req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(10, 0, 100,
Optional.empty()))
val session2context = fetchManager.newContext(JFetchMetadata.INITIAL, session1req, EMPTY_PART_LIST, false)
assertEquals(classOf[FullFetchContext], session2context.getClass)
val session2RespData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
session2RespData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
session2RespData.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session2resp = session2context.updateAndGenerateResponseData(respData1)
assertEquals(Errors.NONE, session2resp.error())
assertTrue(session2resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session2resp.responseData().size())

// both newly created entries are present in cache
assertTrue(cache.get(session1resp.sessionId()).isDefined)
assertTrue(cache.get(session2resp.sessionId()).isDefined)
assertEquals(2, cache.size)
time.sleep(500)

// create a session to test session1 privileges mean that session 1 is retained and session 2 is evicted
val session3req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session3req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session3req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
val session3context = fetchManager.newContext(JFetchMetadata.INITIAL, session3req, EMPTY_PART_LIST, true)
assertEquals(classOf[FullFetchContext], session3context.getClass)
val respData3 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData3.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData3.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session3resp = session3context.updateAndGenerateResponseData(respData3)
assertEquals(Errors.NONE, session3resp.error())
assertTrue(session3resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session3resp.responseData().size())

assertTrue(cache.get(session1resp.sessionId()).isDefined)
// even though session 2 is more recent than session 1, and has not reached expiry time, it is less
// privileged than session 2, and thus session 3 should be entered and session 2 evicted.
assertFalse("session 2 should have been evicted by session 3",
cache.get(session2resp.sessionId()).isDefined)
assertTrue(cache.get(session3resp.sessionId()).isDefined)
assertEquals(2, cache.size)

time.sleep(501)

// create a final session to test whether session1 can be evicted due to age even though it is privileged
val session4req = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
session4req.put(new TopicPartition("foo", 0), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
session4req.put(new TopicPartition("foo", 1), new FetchRequest.PartitionData(0, 0, 100,
Optional.empty()))
val session4context = fetchManager.newContext(JFetchMetadata.INITIAL, session4req, EMPTY_PART_LIST, true)
assertEquals(classOf[FullFetchContext], session4context.getClass)
val respData4 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
respData4.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData(
Errors.NONE, 100, 100, 100, null, null))
respData4.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
Errors.NONE, 10, 10, 10, null, null))
val session4resp = session3context.updateAndGenerateResponseData(respData4)
assertEquals(Errors.NONE, session4resp.error())
assertTrue(session4resp.sessionId() != INVALID_SESSION_ID)
assertEquals(2, session4resp.responseData().size())

assertFalse("session 1 should have been evicted by session 4 even though it is privileged as it has hit eviction time",
cache.get(session1resp.sessionId()).isDefined)
assertTrue(cache.get(session3resp.sessionId()).isDefined)
assertTrue(cache.get(session4resp.sessionId()).isDefined)
assertEquals(2, cache.size)
}

@Test
def testZeroSizeFetchSession(): Unit = {
val time = new MockTime()
Expand Down