From 0c4ec764f4916b451ff71b05e16535bd4d25119f Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Wed, 18 Mar 2020 16:01:15 +1100 Subject: [PATCH] Fix brokers continually allocating new Session IDs KIP-227 says that if a client provides a session ID of 0 and a session epoch of 0 in a FetchRequest, the broker should allocate a new sessionID. Then, the consumer can use this ID to avoid repeating the list of partitions to consume on every call. Sarama is currently sending 0 for both values, but totally ignoring the session ID generated by the broker. This means that the broker is allocating a new session ID on every request, filling its session ID cache and leading to FETCH_SESSION_ID_NOT_FOUND logs being written by the broker because other _actual_ session IDs are evicted. Instead, we should set the epoch to -1, which instructs the broker not to allocate a new session ID. --- consumer.go | 8 ++++++++ consumer_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/consumer.go b/consumer.go index e311c7e89..e16d08aa9 100644 --- a/consumer.go +++ b/consumer.go @@ -887,6 +887,14 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { request.Version = 4 request.Isolation = bc.consumer.conf.Consumer.IsolationLevel } + if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) { + request.Version = 7 + // We do not currently implement KIP-227 FetchSessions. Setting the id to 0 + // and the epoch to -1 tells the broker not to generate as session ID we're going + // to just ignore anyway. + request.SessionID = 0 + request.SessionEpoch = -1 + } if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) { request.Version = 10 } diff --git a/consumer_test.go b/consumer_test.go index 7b29ebce5..ae2306526 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -488,7 +488,7 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { cfg := NewConfig() cfg.Consumer.Return.Errors = true - cfg.Version = V1_1_0_0 + cfg.Version = V0_11_0_0 broker0 := NewMockBroker(t, 0) @@ -569,6 +569,55 @@ func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { broker0.Close() } +func TestConsumeMessageWithSessionIDs(t *testing.T) { + // Given + fetchResponse1 := &FetchResponse{Version: 7} + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) + fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) + + cfg := NewConfig() + cfg.Version = V1_1_0_0 + + broker0 := NewMockBroker(t, 0) + fetchResponse2 := &FetchResponse{} + fetchResponse2.Version = 7 + fetchResponse2.AddError("my_topic", 0, ErrNoError) + + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(1). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + // When + consumer, err := master.ConsumePartition("my_topic", 0, 1) + if err != nil { + t.Fatal(err) + } + + assertMessageOffset(t, <-consumer.Messages(), 1) + assertMessageOffset(t, <-consumer.Messages(), 2) + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() + + fetchReq := broker0.History()[3].Request.(*FetchRequest) + if fetchReq.SessionID != 0 || fetchReq.SessionEpoch != -1 { + t.Error("Expected session ID to be zero & Epoch to be -1") + } +} + // It is fine if offsets of fetched messages are not sequential (although // strictly increasing!). func TestConsumerNonSequentialOffsets(t *testing.T) {