Skip to content

Commit

Permalink
Merge pull request #503 from mailgun/maxim/moremocks
Browse files Browse the repository at this point in the history
Add mock responses for OffsetManager testing
  • Loading branch information
eapache committed Aug 11, 2015
2 parents 8f2b0fb + ea0b355 commit ba01d50
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 138 deletions.
29 changes: 22 additions & 7 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
}
r.Err = KError(tmp)

r.Coordinator = new(Broker)
if err := r.Coordinator.decode(pd); err != nil {
coordinator := new(Broker)
if err := coordinator.decode(pd); err != nil {
return err
}
if coordinator.addr == ":0" {
return nil
}
r.Coordinator = coordinator

// this can all go away in 2.0, but we have to fill in deprecated fields to maintain
// backwards compatibility
Expand All @@ -43,16 +47,27 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
}

func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {

pe.putInt16(int16(r.Err))

if r.Coordinator != nil {
host, portstr, err := net.SplitHostPort(r.Coordinator.Addr())
if err != nil {
return err
}
port, err := strconv.ParseInt(portstr, 10, 32)
if err != nil {
return err
}
pe.putInt32(r.Coordinator.ID())
if err := pe.putString(host); err != nil {
return err
}
pe.putInt32(int32(port))
return nil
}
pe.putInt32(r.CoordinatorID)

if err := pe.putString(r.CoordinatorHost); err != nil {
return err
}

pe.putInt32(r.CoordinatorPort)

return nil
}
58 changes: 12 additions & 46 deletions consumer_metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,19 @@ var (
)

func TestConsumerMetadataResponseError(t *testing.T) {
response := ConsumerMetadataResponse{}

testDecodable(t, "error", &response, consumerMetadataResponseError)

if response.Err != ErrOffsetsLoadInProgress {
t.Error("Decoding produced incorrect error value.")
}

if response.CoordinatorID != 0 {
t.Error("Decoding produced incorrect ID.")
}

if len(response.CoordinatorHost) != 0 {
t.Error("Decoding produced incorrect host.")
}

if response.CoordinatorPort != 0 {
t.Error("Decoding produced incorrect port.")
}
response := ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
testResponse(t, "error", &response, consumerMetadataResponseError)
}

func TestConsumerMetadataResponseSuccess(t *testing.T) {
response := ConsumerMetadataResponse{}

testDecodable(t, "success", &response, consumerMetadataResponseSuccess)

if response.Err != ErrNoError {
t.Error("Decoding produced error value where there was none.")
}

if response.CoordinatorID != 0xAB {
t.Error("Decoding produced incorrect coordinator ID.")
}

if response.CoordinatorHost != "foo" {
t.Error("Decoding produced incorrect coordinator host.")
}

if response.CoordinatorPort != 0xCCDD {
t.Error("Decoding produced incorrect coordinator port.")
}

if response.Coordinator.ID() != 0xAB {
t.Error("Decoding produced incorrect coordinator ID.")
}

if response.Coordinator.Addr() != "foo:52445" {
t.Error("Decoding produced incorrect coordinator address.")
}
broker := NewBroker("foo:52445")
broker.id = 0xAB
response := ConsumerMetadataResponse{
Coordinator: broker,
CoordinatorID: 0xAB,
CoordinatorHost: "foo",
CoordinatorPort: 0xCCDD,
Err: ErrNoError,
}
testResponse(t, "success", &response, consumerMetadataResponseSuccess)
}
133 changes: 133 additions & 0 deletions mockresponses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,136 @@ func (mfr *mockFetchResponse) getHighWaterMark(topic string, partition int32) in
}
return partitions[partition]
}

// mockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
type mockConsumerMetadataResponse struct {
coordinators map[string]interface{}
t *testing.T
}

func newMockConsumerMetadataResponse(t *testing.T) *mockConsumerMetadataResponse {
return &mockConsumerMetadataResponse{
coordinators: make(map[string]interface{}),
t: t,
}
}

func (mr *mockConsumerMetadataResponse) SetCoordinator(group string, broker *mockBroker) *mockConsumerMetadataResponse {
mr.coordinators[group] = broker
return mr
}

func (mr *mockConsumerMetadataResponse) SetError(group string, kerror KError) *mockConsumerMetadataResponse {
mr.coordinators[group] = kerror
return mr
}

func (mr *mockConsumerMetadataResponse) For(reqBody decoder) encoder {
req := reqBody.(*ConsumerMetadataRequest)
group := req.ConsumerGroup
res := &ConsumerMetadataResponse{}
v := mr.coordinators[group]
switch v := v.(type) {
case *mockBroker:
res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
case KError:
res.Err = v
}
return res
}

// mockOffsetCommitResponse is a `OffsetCommitResponse` builder.
type mockOffsetCommitResponse struct {
errors map[string]map[string]map[int32]KError
t *testing.T
}

func newMockOffsetCommitResponse(t *testing.T) *mockOffsetCommitResponse {
return &mockOffsetCommitResponse{t: t}
}

func (mr *mockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *mockOffsetCommitResponse {
if mr.errors == nil {
mr.errors = make(map[string]map[string]map[int32]KError)
}
topics := mr.errors[group]
if topics == nil {
topics = make(map[string]map[int32]KError)
mr.errors[group] = topics
}
partitions := topics[topic]
if partitions == nil {
partitions = make(map[int32]KError)
topics[topic] = partitions
}
partitions[partition] = kerror
return mr
}

func (mr *mockOffsetCommitResponse) For(reqBody decoder) encoder {
req := reqBody.(*OffsetCommitRequest)
group := req.ConsumerGroup
res := &OffsetCommitResponse{}
for topic, partitions := range req.blocks {
for partition := range partitions {
res.AddError(topic, partition, mr.getError(group, topic, partition))
}
}
return res
}

func (mr *mockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
topics := mr.errors[group]
if topics == nil {
return ErrNoError
}
partitions := topics[topic]
if partitions == nil {
return ErrNoError
}
kerror, ok := partitions[partition]
if !ok {
return ErrNoError
}
return kerror
}

// mockOffsetFetchResponse is a `OffsetFetchResponse` builder.
type mockOffsetFetchResponse struct {
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
t *testing.T
}

func newMockOffsetFetchResponse(t *testing.T) *mockOffsetFetchResponse {
return &mockOffsetFetchResponse{t: t}
}

func (mr *mockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *mockOffsetFetchResponse {
if mr.offsets == nil {
mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
}
topics := mr.offsets[group]
if topics == nil {
topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
mr.offsets[group] = topics
}
partitions := topics[topic]
if partitions == nil {
partitions = make(map[int32]*OffsetFetchResponseBlock)
topics[topic] = partitions
}
partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
return mr
}

func (mr *mockOffsetFetchResponse) For(reqBody decoder) encoder {
req := reqBody.(*OffsetFetchRequest)
group := req.ConsumerGroup
res := &OffsetFetchResponse{}
for topic, partitions := range mr.offsets[group] {
for partition, block := range partitions {
res.AddBlock(topic, partition, block)
}
}
return res
}
33 changes: 32 additions & 1 deletion offset_commit_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,40 @@ type OffsetCommitResponse struct {
Errors map[string]map[int32]KError
}

func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
if r.Errors == nil {
r.Errors = make(map[string]map[int32]KError)
}
partitions := r.Errors[topic]
if partitions == nil {
partitions = make(map[int32]KError)
r.Errors[topic] = partitions
}
partitions[partition] = kerror
}

func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(r.Errors)); err != nil {
return err
}
for topic, partitions := range r.Errors {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putArrayLength(len(partitions)); err != nil {
return err
}
for partition, kerror := range partitions {
pe.putInt32(partition)
pe.putInt16(int16(kerror))
}
}
return nil
}

func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
numTopics, err := pd.getArrayLength()
if err != nil {
if err != nil || numTopics == 0 {
return err
}

Expand Down
46 changes: 9 additions & 37 deletions offset_commit_response_test.go
Original file line number Diff line number Diff line change
@@ -1,52 +1,24 @@
package sarama

import "testing"
import (
"testing"
)

var (
emptyOffsetCommitResponse = []byte{
0x00, 0x00, 0x00, 0x00}

normalOffsetCommitResponse = []byte{
0x00, 0x00, 0x00, 0x02,

0x00, 0x01, 'm',
0x00, 0x00, 0x00, 0x00,

0x00, 0x01, 't',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x00,
0x00, 0x06}
)

func TestEmptyOffsetCommitResponse(t *testing.T) {
response := OffsetCommitResponse{}

testDecodable(t, "empty", &response, emptyOffsetCommitResponse)

if len(response.Errors) != 0 {
t.Error("Decoding produced errors where there were none.")
}
testResponse(t, "empty", &response, emptyOffsetCommitResponse)
}

func TestNormalOffsetCommitResponse(t *testing.T) {
response := OffsetCommitResponse{}

testDecodable(t, "normal", &response, normalOffsetCommitResponse)

if len(response.Errors) != 2 {
t.Fatal("Decoding produced wrong number of errors.")
}

if len(response.Errors["m"]) != 0 {
t.Error("Decoding produced errors for topic 'm' where there were none.")
}

if len(response.Errors["t"]) != 1 {
t.Fatal("Decoding produced wrong number of errors for topic 't'.")
}

if response.Errors["t"][0] != ErrNotLeaderForPartition {
t.Error("Decoding produced wrong error for topic 't' partition 0.")
}

response.AddError("t", 0, ErrNotLeaderForPartition)
response.Errors["m"] = make(map[int32]KError)
// The response encoded form cannot be checked for it varies due to
// unpredictable map traversal order.
testResponse(t, "normal", &response, nil)
}
Loading

0 comments on commit ba01d50

Please sign in to comment.