Skip to content

Commit

Permalink
Address code inspection comments
Browse files Browse the repository at this point in the history
  • Loading branch information
horkhe committed Aug 6, 2015
1 parent b3fa33f commit b26b611
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
default: fmt vet errcheck test

test:
go test -v -timeout 2m -race ./...
go test -v -timeout 60s -race ./...

vet:
go vet ./...
Expand Down
75 changes: 31 additions & 44 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ var testMsg = StringEncoder("Foo")
func TestConsumerOffsetManual(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
defer broker0.Close()

mockFetchResponse := newMockFetchResponse(t, 1)
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -53,16 +52,15 @@ func TestConsumerOffsetManual(t *testing.T) {

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

// If `OffsetNewest` is passed as the initial offset then the first consumed
// message is indeed corresponds to the offset that broker claims to be the
// newest in his metadata response.
// newest in its metadata response.
func TestConsumerOffsetNewest(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
Expand Down Expand Up @@ -96,14 +94,13 @@ func TestConsumerOffsetNewest(t *testing.T) {

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

// It is possible to close a partition consumer and create the same anew.
func TestConsumerRecreate(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
Expand Down Expand Up @@ -138,14 +135,13 @@ func TestConsumerRecreate(t *testing.T) {

safeClose(t, pc)
safeClose(t, c)
broker0.Close()
}

// An attempt to consume the same partition twice should fail.
func TestConsumerDuplicate(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
Expand Down Expand Up @@ -178,14 +174,14 @@ func TestConsumerDuplicate(t *testing.T) {

safeClose(t, pc1)
safeClose(t, c)
broker0.Close()
}

// If consumer fails to refresh metadata it keeps retrying every with frequency
// given in `Config.Consumer.Retry.Backoff`.
// If consumer fails to refresh metadata it keeps retrying with frequency
// specified by `Config.Consumer.Retry.Backoff`.
func TestConsumerLeaderRefreshError(t *testing.T) {
// Given
broker0 := newMockBroker(t, 100)
defer broker0.Close()

// Stage 1: my_topic/0 served by broker0
Logger.Printf(" STAGE 1")
Expand All @@ -203,7 +199,8 @@ func TestConsumerLeaderRefreshError(t *testing.T) {

config := NewConfig()
config.Net.ReadTimeout = 100 * time.Millisecond
config.Consumer.Retry.Backoff = 500 * time.Millisecond
config.Consumer.Retry.Backoff = 200 * time.Millisecond
config.Consumer.Return.Errors = true
config.Metadata.Retry.Max = 0
c, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
Expand All @@ -212,7 +209,7 @@ func TestConsumerLeaderRefreshError(t *testing.T) {

pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
if err != nil {
t.Errorf("Failed to create a partition consumer, err=%v", err)
t.Fatal(err)
}

assertMessageOffset(t, <-pc.Messages(), 123)
Expand All @@ -228,17 +225,16 @@ func TestConsumerLeaderRefreshError(t *testing.T) {
"FetchRequest": newMockWrapper(fetchResponse2),
})

if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
t.Errorf("Unexpected error: %v", consErr.Err)
}

// Stage 3: finally the metadata returned by broker0 tells that broker1 is
// a new leader for my_topic/0. Consumption resumes.

// Unfortunately consumer does not propagate `ErrNotLeaderForPartition`
// error to PartitionConsumer.Errors() channel. So there is no other way to
// synchronize here by sleep.
time.Sleep(300 * time.Millisecond)
Logger.Printf(" STAGE 3")

broker1 := newMockBroker(t, 101)
defer broker1.Close()

broker1.SetHandlerByMap(map[string]MockResponse{
"FetchRequest": newMockFetchResponse(t, 1).
Expand All @@ -255,13 +251,13 @@ func TestConsumerLeaderRefreshError(t *testing.T) {

safeClose(t, pc)
safeClose(t, c)
broker1.Close()
broker0.Close()
}

func TestConsumerInvalidTopic(t *testing.T) {
// Given
broker0 := newMockBroker(t, 100)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()),
Expand All @@ -281,15 +277,14 @@ func TestConsumerInvalidTopic(t *testing.T) {
}

safeClose(t, c)
broker0.Close()
}

// Nothing bad happens if a partition consumer that has no leader assigned at
// the moment is closed.
func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
// Given
broker0 := newMockBroker(t, 100)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
Expand All @@ -304,6 +299,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
config := NewConfig()
config.Net.ReadTimeout = 100 * time.Millisecond
config.Consumer.Retry.Backoff = 100 * time.Millisecond
config.Consumer.Return.Errors = true
config.Metadata.Retry.Max = 0
c, err := NewConsumer([]string{broker0.Addr()}, config)
if err != nil {
Expand All @@ -312,7 +308,7 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {

pc, err := c.ConsumePartition("my_topic", 0, OffsetOldest)
if err != nil {
t.Errorf("Failed to create a partition consumer, err=%v", err)
t.Fatal(err)
}

assertMessageOffset(t, <-pc.Messages(), 123)
Expand All @@ -323,22 +319,18 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
fetchResponse2.AddError("my_topic", 0, ErrNotLeaderForPartition)

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"FetchRequest": newMockWrapper(fetchResponse2),
})

// When

// Unfortunately consumer does not propagate `ErrNotLeaderForPartition`
// error to PartitionConsumer.Errors() channel. So there is no other way to
// synchronize here by sleep.
time.Sleep(200 * time.Millisecond)
if consErr := <-pc.Errors(); consErr.Err != ErrOutOfBrokers {
t.Errorf("Unexpected error: %v", consErr.Err)
}

// Then: the partition consumer can be closed without any problem.
safeClose(t, pc)
safeClose(t, c)
broker0.Close()
}

// If the initial offset passed on partition consumer creation is out of the
Expand All @@ -347,8 +339,6 @@ func TestConsumerClosePartitionWithoutLeader(t *testing.T) {
func TestConsumerShutsDownOutOfRange(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
defer broker0.Close()

broker0.SetHandler(func(req *request) (res encoder) {
switch reqBody := req.body.(type) {
case *MetadataRequest:
Expand Down Expand Up @@ -387,15 +377,14 @@ func TestConsumerShutsDownOutOfRange(t *testing.T) {
safeClose(t, consumer)

safeClose(t, master)
broker0.Close()
}

// If a fetch response contains messages with offsets that are smaller then
// requested, then such messages are ignored.
func TestConsumerExtraOffsets(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
defer broker0.Close()

called := 0
broker0.SetHandler(func(req *request) (res encoder) {
switch req.body.(type) {
Expand Down Expand Up @@ -441,15 +430,14 @@ func TestConsumerExtraOffsets(t *testing.T) {

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

// It is fine if offsets of fetched messages are not sequential (although
// strictly increasing!).
func TestConsumerNonSequentialOffsets(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
defer broker0.Close()

called := 0
broker0.SetHandler(func(req *request) (res encoder) {
switch req.body.(type) {
Expand Down Expand Up @@ -495,18 +483,16 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}

// If leadership for a partition is changing then consumer resolves the new
// leader and switches to it.
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
// initial setup
seedBroker := newMockBroker(t, 10)
defer seedBroker.Close()
leader0 := newMockBroker(t, 0)
defer leader0.Close()
leader1 := newMockBroker(t, 1)
defer leader1.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
Expand Down Expand Up @@ -665,6 +651,9 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {

wg.Wait()
safeClose(t, master)
leader1.Close()
leader0.Close()
seedBroker.Close()
}

// When two partitions have the same broker as the leader, if one partition
Expand All @@ -673,8 +662,6 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
func TestConsumerInterleavedClose(t *testing.T) {
// Given
broker0 := newMockBroker(t, 0)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
Expand Down Expand Up @@ -717,6 +704,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
safeClose(t, c1)
safeClose(t, c0)
safeClose(t, master)
broker0.Close()
}

func TestConsumerBounceWithReferenceOpen(t *testing.T) {
Expand Down Expand Up @@ -820,8 +808,6 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
func TestConsumerOffsetOutOfRange(t *testing.T) {
// Given
broker0 := newMockBroker(t, 2)
defer broker0.Close()

broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": newMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
Expand All @@ -848,6 +834,7 @@ func TestConsumerOffsetOutOfRange(t *testing.T) {
}

safeClose(t, master)
broker0.Close()
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
Expand Down
2 changes: 1 addition & 1 deletion mockbroker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

const (
expectationTimeout = 250 * time.Millisecond
expectationTimeout = 500 * time.Millisecond
)

type requestHandlerFunc func(req *request) (res encoder)
Expand Down

0 comments on commit b26b611

Please sign in to comment.