Skip to content

Commit

Permalink
feat(test): add a unittest for leadership tracking
Browse files Browse the repository at this point in the history
TestConsumeMessagesTrackLeader ensures that in the event that leadership
of a topicPartition changes and no preferredReadReplica is specified,
the consumer connects back to the new leader to resume consumption and
doesn't continue consuming from the follower.

See #1927
  • Loading branch information
dnwe committed May 7, 2021
1 parent 6798d09 commit 73be95a
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 2 deletions.
122 changes: 121 additions & 1 deletion consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,125 @@ func TestConsumeMessagesFromReadReplicaErrorUnknown(t *testing.T) {
leader.Close()
}

// TestConsumeMessagesTrackLeader ensures that in the event that leadership of
// a topicPartition changes and no preferredReadReplica is specified, the
// consumer connects back to the new leader to resume consumption and doesn't
// continue consuming from the follower.
//
// See https://github.com/Shopify/sarama/issues/1927
func TestConsumeMessagesTrackLeader(t *testing.T) {
prevLogger := Logger
defer func() { Logger = prevLogger }()
Logger = &testLogger{t}

cfg := NewConfig()
cfg.ClientID = t.Name()
cfg.Metadata.RefreshFrequency = time.Millisecond * 50
cfg.Net.MaxOpenRequests = 1
cfg.Version = V2_1_0_0

leader1 := NewMockBroker(t, 1)
leader2 := NewMockBroker(t, 2)

mockMetadataResponse1 := NewMockMetadataResponse(t).
SetBroker(leader1.Addr(), leader1.BrokerID()).
SetBroker(leader2.Addr(), leader2.BrokerID()).
SetLeader("my_topic", 0, leader1.BrokerID())
mockMetadataResponse2 := NewMockMetadataResponse(t).
SetBroker(leader1.Addr(), leader1.BrokerID()).
SetBroker(leader2.Addr(), leader2.BrokerID()).
SetLeader("my_topic", 0, leader2.BrokerID())
mockMetadataResponse3 := NewMockMetadataResponse(t).
SetBroker(leader1.Addr(), leader1.BrokerID()).
SetBroker(leader2.Addr(), leader2.BrokerID()).
SetLeader("my_topic", 0, leader1.BrokerID())

leader1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": mockMetadataResponse1,
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(1).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockFetchResponse(t, 1).
SetVersion(10).
SetMessage("my_topic", 0, 1, testMsg).
SetMessage("my_topic", 0, 2, testMsg),
})

client, err := NewClient([]string{leader1.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

consumer, err := NewConsumerFromClient(client)
if err != nil {
t.Fatal(err)
}

pConsumer, err := consumer.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

assertMessageOffset(t, <-pConsumer.Messages(), 1)
assertMessageOffset(t, <-pConsumer.Messages(), 2)

fetchEmptyResponse := &FetchResponse{Version: 10}
fetchEmptyResponse.AddError("my_topic", 0, ErrNoError)
leader1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": mockMetadataResponse2,
"FetchRequest": NewMockWrapper(fetchEmptyResponse),
})
leader2.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": mockMetadataResponse2,
"FetchRequest": NewMockFetchResponse(t, 1).
SetVersion(10).
SetMessage("my_topic", 0, 3, testMsg).
SetMessage("my_topic", 0, 4, testMsg),
})

// wait for client to be aware that leadership has changed
for {
b, _ := client.Leader("my_topic", 0)
if b.ID() == int32(2) {
break
}
time.Sleep(time.Millisecond * 50)
}

assertMessageOffset(t, <-pConsumer.Messages(), 3)
assertMessageOffset(t, <-pConsumer.Messages(), 4)

leader1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": mockMetadataResponse3,
"FetchRequest": NewMockFetchResponse(t, 1).
SetVersion(10).
SetMessage("my_topic", 0, 5, testMsg).
SetMessage("my_topic", 0, 6, testMsg),
})
leader2.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": mockMetadataResponse3,
"FetchRequest": NewMockWrapper(fetchEmptyResponse),
})

// wait for client to be aware that leadership has changed back again
for {
b, _ := client.Leader("my_topic", 0)
if b.ID() == int32(1) {
break
}
time.Sleep(time.Millisecond * 50)
}

assertMessageOffset(t, <-pConsumer.Messages(), 5)
assertMessageOffset(t, <-pConsumer.Messages(), 6)

safeClose(t, pConsumer)
safeClose(t, consumer)
leader1.Close()
leader2.Close()
}

// It is fine if offsets of fetched messages are not sequential (although
// strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
Expand Down Expand Up @@ -1523,8 +1642,9 @@ func TestExcludeUncommitted(t *testing.T) {
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
t.Helper()
if msg.Offset != expectedOffset {
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
t.Fatalf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
}
}

Expand Down
33 changes: 33 additions & 0 deletions logger_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sarama

import "testing"

// testLogger implements the StdLogger interface and records the text in the
// logs of the given T passed from Test functions.
// and records the text in the error log.
//
// nolint
type testLogger struct {
t *testing.T
}

func (l *testLogger) Print(v ...interface{}) {
if l.t != nil {
l.t.Helper()
l.t.Log(v...)
}
}

func (l *testLogger) Printf(format string, v ...interface{}) {
if l.t != nil {
l.t.Helper()
l.t.Logf(format, v...)
}
}

func (l *testLogger) Println(v ...interface{}) {
if l.t != nil {
l.t.Helper()
l.t.Log(v...)
}
}
9 changes: 8 additions & 1 deletion mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
defer func() {
_ = conn.Close()
}()
s := spew.NewDefaultConfig()
s.MaxDepth = 1
Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
var err error

Expand Down Expand Up @@ -264,7 +266,12 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
continue
}
Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
Logger.Printf(
"*** mockbroker/%d/%d: replied to %T with %T\n-> %s\n-> %s",
b.brokerID, idx, req.body, res,
s.Sprintf("%#v", req.body),
s.Sprintf("%#v", res),
)

encodedRes, err := encode(res, nil)
if err != nil {
Expand Down

0 comments on commit 73be95a

Please sign in to comment.